cloudwatchlogs.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
  3. import (
  4. "context"
  5. "fmt"
  6. "os"
  7. "regexp"
  8. "sort"
  9. "strconv"
  10. "sync"
  11. "time"
  12. "unicode/utf8"
  13. "github.com/aws/aws-sdk-go-v2/aws"
  14. "github.com/aws/aws-sdk-go-v2/aws/middleware"
  15. "github.com/aws/aws-sdk-go-v2/config"
  16. "github.com/aws/aws-sdk-go-v2/credentials/endpointcreds"
  17. "github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
  18. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
  19. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
  20. "github.com/aws/smithy-go"
  21. smithymiddleware "github.com/aws/smithy-go/middleware"
  22. smithyhttp "github.com/aws/smithy-go/transport/http"
  23. "github.com/containerd/log"
  24. "github.com/docker/docker/daemon/logger"
  25. "github.com/docker/docker/daemon/logger/loggerutils"
  26. "github.com/docker/docker/dockerversion"
  27. "github.com/pkg/errors"
  28. )
  // Driver name and the log-opt keys understood by the awslogs driver.
const (
	name                   = "awslogs"
	regionKey              = "awslogs-region"
	endpointKey            = "awslogs-endpoint"
	logGroupKey            = "awslogs-group"
	logStreamKey           = "awslogs-stream"
	logCreateGroupKey      = "awslogs-create-group"
	logCreateStreamKey     = "awslogs-create-stream"
	tagKey                 = "tag"
	datetimeFormatKey      = "awslogs-datetime-format"
	multilinePatternKey    = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint" //nolint:gosec // G101: Potential hardcoded credentials
	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
	maxBufferedEventsKey   = "awslogs-max-buffered-events"
	logFormatKey           = "awslogs-format"
)

// regionEnvKey is the environment variable consulted for the region when the
// awslogs-region option is not set.
const regionEnvKey = "AWS_REGION"

// Defaults used when the corresponding option is absent.
const (
	defaultForceFlushInterval = 5 * time.Second
	defaultMaxBufferedEvents  = 4096
)

// PutLogEvents request limits.
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
const (
	// perEventBytes is the fixed overhead CloudWatch charges per event.
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// maximumBytesPerEvent is the largest message payload for one event once
	// the per-event overhead is subtracted.
	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	// Message sizes are measured as "effective" bytes: invalid UTF-8 bytes
	// are counted as the 3-byte U+FFFD replacement CloudWatch substitutes for
	// them, and splits never cut a valid UTF-8 sequence in half.
	maximumBytesPerEvent = 262144 - perEventBytes
)

// credentialsEndpoint is the base URL to which the awslogs-credentials-endpoint
// path is appended.
const credentialsEndpoint = "http://169.254.170.2" //nolint:gosec // G101: Potential hardcoded credentials

// Header used to request the Embedded Metric Format for ingested events.
// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
const (
	logsFormatHeader = "x-amzn-logs-format"
	jsonEmfLogFormat = "json/emf"
)
  62. type logStream struct {
  63. logStreamName string
  64. logGroupName string
  65. logCreateGroup bool
  66. logCreateStream bool
  67. forceFlushInterval time.Duration
  68. multilinePattern *regexp.Regexp
  69. client api
  70. messages chan *logger.Message
  71. lock sync.RWMutex
  72. closed bool
  73. sequenceToken *string
  74. }
  75. type logStreamConfig struct {
  76. logStreamName string
  77. logGroupName string
  78. logCreateGroup bool
  79. logCreateStream bool
  80. forceFlushInterval time.Duration
  81. maxBufferedEvents int
  82. multilinePattern *regexp.Regexp
  83. }
  84. var _ logger.SizedLogger = &logStream{}
  85. type api interface {
  86. CreateLogGroup(context.Context, *cloudwatchlogs.CreateLogGroupInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error)
  87. CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
  88. PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
  89. }
  90. type regionFinder interface {
  91. GetRegion(context.Context, *imds.GetRegionInput, ...func(*imds.Options)) (*imds.GetRegionOutput, error)
  92. }
  93. type wrappedEvent struct {
  94. inputLogEvent types.InputLogEvent
  95. insertOrder int
  96. }
  97. type byTimestamp []wrappedEvent
  98. // init registers the awslogs driver
  99. func init() {
  100. if err := logger.RegisterLogDriver(name, New); err != nil {
  101. panic(err)
  102. }
  103. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  104. panic(err)
  105. }
  106. }
  107. // eventBatch holds the events that are batched for submission and the
  108. // associated data about it.
  109. //
  110. // Warning: this type is not threadsafe and must not be used
  111. // concurrently. This type is expected to be consumed in a single go
  112. // routine and never concurrently.
  113. type eventBatch struct {
  114. batch []wrappedEvent
  115. bytes int
  116. }
  117. // New creates an awslogs logger using the configuration passed in on the
  118. // context. Supported context configuration variables are awslogs-region,
  119. // awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
  120. // awslogs-multiline-pattern and awslogs-datetime-format.
  121. // When available, configuration is also taken from environment variables
  122. // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
  123. // file (~/.aws/credentials), and the EC2 Instance Metadata Service.
  124. func New(info logger.Info) (logger.Logger, error) {
  125. containerStreamConfig, err := newStreamConfig(info)
  126. if err != nil {
  127. return nil, err
  128. }
  129. client, err := newAWSLogsClient(info)
  130. if err != nil {
  131. return nil, err
  132. }
  133. logNonBlocking := info.Config["mode"] == "non-blocking"
  134. containerStream := &logStream{
  135. logStreamName: containerStreamConfig.logStreamName,
  136. logGroupName: containerStreamConfig.logGroupName,
  137. logCreateGroup: containerStreamConfig.logCreateGroup,
  138. logCreateStream: containerStreamConfig.logCreateStream,
  139. forceFlushInterval: containerStreamConfig.forceFlushInterval,
  140. multilinePattern: containerStreamConfig.multilinePattern,
  141. client: client,
  142. messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
  143. }
  144. creationDone := make(chan bool)
  145. if logNonBlocking {
  146. go func() {
  147. backoff := 1
  148. maxBackoff := 32
  149. for {
  150. // If logger is closed we are done
  151. containerStream.lock.RLock()
  152. if containerStream.closed {
  153. containerStream.lock.RUnlock()
  154. break
  155. }
  156. containerStream.lock.RUnlock()
  157. err := containerStream.create()
  158. if err == nil {
  159. break
  160. }
  161. time.Sleep(time.Duration(backoff) * time.Second)
  162. if backoff < maxBackoff {
  163. backoff *= 2
  164. }
  165. log.G(context.TODO()).
  166. WithError(err).
  167. WithField("container-id", info.ContainerID).
  168. WithField("container-name", info.ContainerName).
  169. Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
  170. }
  171. close(creationDone)
  172. }()
  173. } else {
  174. if err = containerStream.create(); err != nil {
  175. return nil, err
  176. }
  177. close(creationDone)
  178. }
  179. go containerStream.collectBatch(creationDone)
  180. return containerStream, nil
  181. }
  182. // Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream
  183. // It has been formed out to ease Utest of the New above
  184. func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
  185. logGroupName := info.Config[logGroupKey]
  186. logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
  187. if err != nil {
  188. return nil, err
  189. }
  190. logCreateGroup := false
  191. if info.Config[logCreateGroupKey] != "" {
  192. logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
  193. if err != nil {
  194. return nil, err
  195. }
  196. }
  197. forceFlushInterval := defaultForceFlushInterval
  198. if info.Config[forceFlushIntervalKey] != "" {
  199. forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
  200. if err != nil {
  201. return nil, err
  202. }
  203. forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
  204. }
  205. maxBufferedEvents := int(defaultMaxBufferedEvents)
  206. if info.Config[maxBufferedEventsKey] != "" {
  207. maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
  208. if err != nil {
  209. return nil, err
  210. }
  211. }
  212. if info.Config[logStreamKey] != "" {
  213. logStreamName = info.Config[logStreamKey]
  214. }
  215. logCreateStream := true
  216. if info.Config[logCreateStreamKey] != "" {
  217. logCreateStream, err = strconv.ParseBool(info.Config[logCreateStreamKey])
  218. if err != nil {
  219. return nil, err
  220. }
  221. }
  222. multilinePattern, err := parseMultilineOptions(info)
  223. if err != nil {
  224. return nil, err
  225. }
  226. containerStreamConfig := &logStreamConfig{
  227. logStreamName: logStreamName,
  228. logGroupName: logGroupName,
  229. logCreateGroup: logCreateGroup,
  230. logCreateStream: logCreateStream,
  231. forceFlushInterval: forceFlushInterval,
  232. maxBufferedEvents: maxBufferedEvents,
  233. multilinePattern: multilinePattern,
  234. }
  235. return containerStreamConfig, nil
  236. }
  237. // Parses awslogs-multiline-pattern and awslogs-datetime-format options
  238. // If awslogs-datetime-format is present, convert the format from strftime
  239. // to regexp and return.
  240. // If awslogs-multiline-pattern is present, compile regexp and return
  241. func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
  242. dateTimeFormat := info.Config[datetimeFormatKey]
  243. multilinePatternKey := info.Config[multilinePatternKey]
  244. // strftime input is parsed into a regular expression
  245. if dateTimeFormat != "" {
  246. // %. matches each strftime format sequence and ReplaceAllStringFunc
  247. // looks up each format sequence in the conversion table strftimeToRegex
  248. // to replace with a defined regular expression
  249. r := regexp.MustCompile("%.")
  250. multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
  251. return strftimeToRegex[s]
  252. })
  253. }
  254. if multilinePatternKey != "" {
  255. multilinePattern, err := regexp.Compile(multilinePatternKey)
  256. if err != nil {
  257. return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
  258. }
  259. return multilinePattern, nil
  260. }
  261. return nil, nil
  262. }
  // strftimeToRegex maps each supported strftime conversion (as accepted by the
// awslogs-datetime-format option) to an equivalent regular-expression fragment.
//
// Character classes list their members without separators: inside [...] a
// comma is a literal, so a class like "[1,2]" would also accept ",".
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[1-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}
  285. // newRegionFinder is a variable such that the implementation
  286. // can be swapped out for unit tests.
  287. var newRegionFinder = func(ctx context.Context) (regionFinder, error) {
  288. cfg, err := config.LoadDefaultConfig(ctx) // default config, because we don't yet know the region
  289. if err != nil {
  290. return nil, err
  291. }
  292. client := imds.NewFromConfig(cfg)
  293. return client, nil
  294. }
  295. // newSDKEndpoint is a variable such that the implementation
  296. // can be swapped out for unit tests.
  297. var newSDKEndpoint = credentialsEndpoint
  298. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  299. // Customizations to the default client from the SDK include a Docker-specific
  300. // User-Agent string and automatic region detection using the EC2 Instance
  301. // Metadata Service when region is otherwise unspecified.
  302. func newAWSLogsClient(info logger.Info, configOpts ...func(*config.LoadOptions) error) (*cloudwatchlogs.Client, error) {
  303. ctx := context.TODO()
  304. var region, endpoint *string
  305. if os.Getenv(regionEnvKey) != "" {
  306. region = aws.String(os.Getenv(regionEnvKey))
  307. }
  308. if info.Config[regionKey] != "" {
  309. region = aws.String(info.Config[regionKey])
  310. }
  311. if info.Config[endpointKey] != "" {
  312. endpoint = aws.String(info.Config[endpointKey])
  313. }
  314. if region == nil || *region == "" {
  315. log.G(ctx).Info("Trying to get region from IMDS")
  316. regFinder, err := newRegionFinder(context.TODO())
  317. if err != nil {
  318. log.G(ctx).WithError(err).Error("could not create regionFinder")
  319. return nil, errors.Wrap(err, "could not create regionFinder")
  320. }
  321. r, err := regFinder.GetRegion(context.TODO(), &imds.GetRegionInput{})
  322. if err != nil {
  323. log.G(ctx).WithError(err).Error("Could not get region from IMDS, environment, or log option")
  324. return nil, errors.Wrap(err, "cannot determine region for awslogs driver")
  325. }
  326. region = &r.Region
  327. }
  328. configOpts = append(configOpts, config.WithRegion(*region))
  329. if uri, ok := info.Config[credentialsEndpointKey]; ok {
  330. log.G(ctx).Debugf("Trying to get credentials from awslogs-credentials-endpoint")
  331. endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
  332. configOpts = append(configOpts, config.WithCredentialsProvider(endpointcreds.New(endpoint)))
  333. }
  334. cfg, err := config.LoadDefaultConfig(context.TODO(), configOpts...)
  335. if err != nil {
  336. log.G(ctx).WithError(err).Error("Could not initialize AWS SDK config")
  337. return nil, errors.Wrap(err, "could not initialize AWS SDK config")
  338. }
  339. log.G(ctx).WithFields(log.Fields{
  340. "region": *region,
  341. }).Debug("Created awslogs client")
  342. var clientOpts []func(*cloudwatchlogs.Options)
  343. if info.Config[logFormatKey] != "" {
  344. logFormatMiddleware := smithymiddleware.BuildMiddlewareFunc("logFormat", func(
  345. ctx context.Context, in smithymiddleware.BuildInput, next smithymiddleware.BuildHandler,
  346. ) (
  347. out smithymiddleware.BuildOutput, metadata smithymiddleware.Metadata, err error,
  348. ) {
  349. switch v := in.Request.(type) {
  350. case *smithyhttp.Request:
  351. v.Header.Add(logsFormatHeader, jsonEmfLogFormat)
  352. }
  353. return next.HandleBuild(ctx, in)
  354. })
  355. clientOpts = append(
  356. clientOpts,
  357. cloudwatchlogs.WithAPIOptions(func(stack *smithymiddleware.Stack) error {
  358. return stack.Build.Add(logFormatMiddleware, smithymiddleware.Before)
  359. }),
  360. )
  361. }
  362. clientOpts = append(
  363. clientOpts,
  364. cloudwatchlogs.WithAPIOptions(middleware.AddUserAgentKeyValue("Docker", dockerversion.Version)),
  365. )
  366. if endpoint != nil {
  367. clientOpts = append(clientOpts, cloudwatchlogs.WithEndpointResolver(cloudwatchlogs.EndpointResolverFromURL(*endpoint)))
  368. }
  369. client := cloudwatchlogs.NewFromConfig(cfg, clientOpts...)
  370. return client, nil
  371. }
  372. // Name returns the name of the awslogs logging driver
  373. func (l *logStream) Name() string {
  374. return name
  375. }
  376. // BufSize returns the maximum bytes CloudWatch can handle.
  377. func (l *logStream) BufSize() int {
  378. return maximumBytesPerEvent
  379. }
  380. // Log submits messages for logging by an instance of the awslogs logging driver
  381. func (l *logStream) Log(msg *logger.Message) error {
  382. l.lock.RLock()
  383. defer l.lock.RUnlock()
  384. if l.closed {
  385. return errors.New("awslogs is closed")
  386. }
  387. l.messages <- msg
  388. return nil
  389. }
  390. // Close closes the instance of the awslogs logging driver
  391. func (l *logStream) Close() error {
  392. l.lock.Lock()
  393. defer l.lock.Unlock()
  394. if !l.closed {
  395. close(l.messages)
  396. }
  397. l.closed = true
  398. return nil
  399. }
  400. // create creates log group and log stream for the instance of the awslogs logging driver
  401. func (l *logStream) create() error {
  402. err := l.createLogStream()
  403. if err == nil {
  404. return nil
  405. }
  406. var apiErr *types.ResourceNotFoundException
  407. if errors.As(err, &apiErr) && l.logCreateGroup {
  408. if err := l.createLogGroup(); err != nil {
  409. return errors.Wrap(err, "failed to create Cloudwatch log group")
  410. }
  411. err = l.createLogStream()
  412. if err == nil {
  413. return nil
  414. }
  415. }
  416. return errors.Wrap(err, "failed to create Cloudwatch log stream")
  417. }
  418. // createLogGroup creates a log group for the instance of the awslogs logging driver
  419. func (l *logStream) createLogGroup() error {
  420. if _, err := l.client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{
  421. LogGroupName: aws.String(l.logGroupName),
  422. }); err != nil {
  423. var apiErr smithy.APIError
  424. if errors.As(err, &apiErr) {
  425. fields := log.Fields{
  426. "errorCode": apiErr.ErrorCode(),
  427. "message": apiErr.ErrorMessage(),
  428. "logGroupName": l.logGroupName,
  429. "logCreateGroup": l.logCreateGroup,
  430. }
  431. if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
  432. // Allow creation to succeed
  433. log.G(context.TODO()).WithFields(fields).Info("Log group already exists")
  434. return nil
  435. }
  436. log.G(context.TODO()).WithFields(fields).Error("Failed to create log group")
  437. }
  438. return err
  439. }
  440. return nil
  441. }
  442. // createLogStream creates a log stream for the instance of the awslogs logging driver
  443. func (l *logStream) createLogStream() error {
  444. // Directly return if we do not want to create log stream.
  445. if !l.logCreateStream {
  446. log.G(context.TODO()).WithFields(log.Fields{
  447. "logGroupName": l.logGroupName,
  448. "logStreamName": l.logStreamName,
  449. "logCreateStream": l.logCreateStream,
  450. }).Info("Skipping creating log stream")
  451. return nil
  452. }
  453. input := &cloudwatchlogs.CreateLogStreamInput{
  454. LogGroupName: aws.String(l.logGroupName),
  455. LogStreamName: aws.String(l.logStreamName),
  456. }
  457. _, err := l.client.CreateLogStream(context.TODO(), input)
  458. if err != nil {
  459. var apiErr smithy.APIError
  460. if errors.As(err, &apiErr) {
  461. fields := log.Fields{
  462. "errorCode": apiErr.ErrorCode(),
  463. "message": apiErr.ErrorMessage(),
  464. "logGroupName": l.logGroupName,
  465. "logStreamName": l.logStreamName,
  466. }
  467. if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
  468. // Allow creation to succeed
  469. log.G(context.TODO()).WithFields(fields).Info("Log stream already exists")
  470. return nil
  471. }
  472. log.G(context.TODO()).WithFields(fields).Error("Failed to create log stream")
  473. }
  474. }
  475. return err
  476. }
  // newTicker supplies the ticker that drives time-based batching in
// collectBatch. It is a variable so unit tests can substitute a ticker whose
// channel they control.
var newTicker = time.NewTicker
  482. // collectBatch executes as a goroutine to perform batching of log events for
  483. // submission to the log stream. If the awslogs-multiline-pattern or
  484. // awslogs-datetime-format options have been configured, multiline processing
  485. // is enabled, where log messages are stored in an event buffer until a multiline
  486. // pattern match is found, at which point the messages in the event buffer are
  487. // pushed to CloudWatch logs as a single log event. Multiline messages are processed
  488. // according to the maximumBytesPerPut constraint, and the implementation only
  489. // allows for messages to be buffered for a maximum of 2*l.forceFlushInterval
  490. // seconds. If no forceFlushInterval is specified for the log stream, then the default
  491. // of 5 seconds will be used resulting in a maximum of 10 seconds buffer time for multiline
  492. // messages. When events are ready to be processed for submission to CloudWatch
  493. // Logs, the processEvents method is called. If a multiline pattern is not
  494. // configured, log events are submitted to the processEvents method immediately.
  495. func (l *logStream) collectBatch(created chan bool) {
  496. // Wait for the logstream/group to be created
  497. <-created
  498. flushInterval := l.forceFlushInterval
  499. if flushInterval <= 0 {
  500. flushInterval = defaultForceFlushInterval
  501. }
  502. ticker := newTicker(flushInterval)
  503. var eventBuffer []byte
  504. var eventBufferTimestamp int64
  505. batch := newEventBatch()
  506. for {
  507. select {
  508. case t := <-ticker.C:
  509. // If event buffer is older than batch publish frequency flush the event buffer
  510. if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
  511. eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
  512. eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
  513. eventBufferNegative := eventBufferAge < 0
  514. if eventBufferExpired || eventBufferNegative {
  515. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  516. eventBuffer = eventBuffer[:0]
  517. }
  518. }
  519. l.publishBatch(batch)
  520. batch.reset()
  521. case msg, more := <-l.messages:
  522. if !more {
  523. // Flush event buffer and release resources
  524. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  525. l.publishBatch(batch)
  526. batch.reset()
  527. return
  528. }
  529. if eventBufferTimestamp == 0 {
  530. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  531. }
  532. line := msg.Line
  533. if l.multilinePattern != nil {
  534. lineEffectiveLen := effectiveLen(string(line))
  535. if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
  536. // This is a new log event or we will exceed max bytes per event
  537. // so flush the current eventBuffer to events and reset timestamp
  538. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  539. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  540. eventBuffer = eventBuffer[:0]
  541. }
  542. // Append newline if event is less than max event size
  543. if lineEffectiveLen < maximumBytesPerEvent {
  544. line = append(line, "\n"...)
  545. }
  546. eventBuffer = append(eventBuffer, line...)
  547. logger.PutMessage(msg)
  548. } else {
  549. l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
  550. logger.PutMessage(msg)
  551. }
  552. }
  553. }
  554. }
  555. // processEvent processes log events that are ready for submission to CloudWatch
  556. // logs. Batching is performed on time- and size-bases. Time-based batching occurs
  557. // at the interval defined by awslogs-force-flush-interval-seconds (defaults to 5 seconds).
  558. // Size-based batching is performed on the maximum number of events per batch
  559. // (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
  560. // batch (defined in maximumBytesPerPut). Log messages are split by the maximum
  561. // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
  562. // byte overhead (defined in perEventBytes) which is accounted for in split- and
  563. // batch-calculations. Because the events are interpreted as UTF-8 encoded
  564. // Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
  565. // replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
  566. // compensate for that and to avoid splitting valid UTF-8 characters into
  567. // invalid byte sequences, we calculate the length of each event assuming that
  568. // this replacement happens.
  569. func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
  570. for len(bytes) > 0 {
  571. // Split line length so it does not exceed the maximum
  572. splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
  573. line := bytes[:splitOffset]
  574. event := wrappedEvent{
  575. inputLogEvent: types.InputLogEvent{
  576. Message: aws.String(string(line)),
  577. Timestamp: aws.Int64(timestamp),
  578. },
  579. insertOrder: batch.count(),
  580. }
  581. added := batch.add(event, lineBytes)
  582. if added {
  583. bytes = bytes[splitOffset:]
  584. } else {
  585. l.publishBatch(batch)
  586. batch.reset()
  587. }
  588. }
  589. }
  // effectiveLen counts the effective number of bytes in line after UTF-8
// normalization. Normalization replaces each byte that is not part of a valid
// UTF-8 encoded codepoint with U+FFFD (utf8.RuneError), a 3-byte sequence.
// Valid runes count their encoded length and each invalid byte counts as 3.
func effectiveLen(line string) int {
	n := 0
	// The loop variable is r, not rune, so the predeclared type is not shadowed.
	for _, r := range line {
		n += utf8.RuneLen(r)
	}
	return n
}
  // findValidSplit finds the byte offset at which to split line so that the
// prefix does not exceed maxBytes effective bytes and no valid codepoint is
// cut in half.
//
// Effective bytes count each invalid UTF-8 byte as the 3-byte replacement
// character (utf8.RuneError). It returns the split offset into line (a byte
// index usable on the original string or []byte) and the effective size of
// the prefix. If the whole line fits, splitOffset is len(line).
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for offset, r := range line {
		size := utf8.RuneLen(r)
		if effectiveBytes+size > maxBytes {
			return offset, effectiveBytes
		}
		effectiveBytes += size
	}
	return len(line), effectiveBytes
}
  619. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  620. // accounting for sequencing requirements (each request must reference the
  621. // sequence token returned by the previous request).
  622. func (l *logStream) publishBatch(batch *eventBatch) {
  623. if batch.isEmpty() {
  624. return
  625. }
  626. cwEvents := unwrapEvents(batch.events())
  627. nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
  628. if err != nil {
  629. if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
  630. // already submitted, just grab the correct sequence token
  631. nextSequenceToken = apiErr.ExpectedSequenceToken
  632. log.G(context.TODO()).WithFields(log.Fields{
  633. "errorCode": apiErr.ErrorCode(),
  634. "message": apiErr.ErrorMessage(),
  635. "logGroupName": l.logGroupName,
  636. "logStreamName": l.logStreamName,
  637. }).Info("Data already accepted, ignoring error")
  638. err = nil
  639. } else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
  640. nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
  641. }
  642. }
  643. if err != nil {
  644. log.G(context.TODO()).Error(err)
  645. } else {
  646. l.sequenceToken = nextSequenceToken
  647. }
  648. }
  649. // putLogEvents wraps the PutLogEvents API
  650. func (l *logStream) putLogEvents(events []types.InputLogEvent, sequenceToken *string) (*string, error) {
  651. input := &cloudwatchlogs.PutLogEventsInput{
  652. LogEvents: events,
  653. SequenceToken: sequenceToken,
  654. LogGroupName: aws.String(l.logGroupName),
  655. LogStreamName: aws.String(l.logStreamName),
  656. }
  657. resp, err := l.client.PutLogEvents(context.TODO(), input)
  658. if err != nil {
  659. var apiErr smithy.APIError
  660. if errors.As(err, &apiErr) {
  661. log.G(context.TODO()).WithFields(log.Fields{
  662. "errorCode": apiErr.ErrorCode(),
  663. "message": apiErr.ErrorMessage(),
  664. "logGroupName": l.logGroupName,
  665. "logStreamName": l.logStreamName,
  666. }).Error("Failed to put log events")
  667. }
  668. return nil, err
  669. }
  670. return resp.NextSequenceToken, nil
  671. }
  672. // ValidateLogOpt looks for awslogs-specific log options awslogs-region, awslogs-endpoint
  673. // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
  674. // awslogs-multiline-pattern
  675. func ValidateLogOpt(cfg map[string]string) error {
  676. for key := range cfg {
  677. switch key {
  678. case logGroupKey:
  679. case logStreamKey:
  680. case logCreateGroupKey:
  681. case regionKey:
  682. case endpointKey:
  683. case tagKey:
  684. case datetimeFormatKey:
  685. case multilinePatternKey:
  686. case credentialsEndpointKey:
  687. case forceFlushIntervalKey:
  688. case maxBufferedEventsKey:
  689. case logFormatKey:
  690. default:
  691. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  692. }
  693. }
  694. if cfg[logGroupKey] == "" {
  695. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  696. }
  697. if cfg[logCreateGroupKey] != "" {
  698. if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
  699. return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
  700. }
  701. }
  702. if cfg[forceFlushIntervalKey] != "" {
  703. if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
  704. return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
  705. }
  706. }
  707. if cfg[maxBufferedEventsKey] != "" {
  708. if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
  709. return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
  710. }
  711. }
  712. _, datetimeFormatKeyExists := cfg[datetimeFormatKey]
  713. _, multilinePatternKeyExists := cfg[multilinePatternKey]
  714. if datetimeFormatKeyExists && multilinePatternKeyExists {
  715. return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
  716. }
  717. if cfg[logFormatKey] != "" {
  718. // For now, only the "json/emf" log format is supported
  719. if cfg[logFormatKey] != jsonEmfLogFormat {
  720. return fmt.Errorf("unsupported log format '%s'", cfg[logFormatKey])
  721. }
  722. if datetimeFormatKeyExists || multilinePatternKeyExists {
  723. return fmt.Errorf("you cannot configure log opt '%s' or '%s' when log opt '%s' is set to '%s'", datetimeFormatKey, multilinePatternKey, logFormatKey, jsonEmfLogFormat)
  724. }
  725. }
  726. return nil
  727. }
  728. // Len returns the length of a byTimestamp slice. Len is required by the
  729. // sort.Interface interface.
  730. func (slice byTimestamp) Len() int {
  731. return len(slice)
  732. }
  733. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  734. // required by the sort.Interface interface.
  735. func (slice byTimestamp) Less(i, j int) bool {
  736. iTimestamp, jTimestamp := int64(0), int64(0)
  737. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  738. iTimestamp = *slice[i].inputLogEvent.Timestamp
  739. }
  740. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  741. jTimestamp = *slice[j].inputLogEvent.Timestamp
  742. }
  743. if iTimestamp == jTimestamp {
  744. return slice[i].insertOrder < slice[j].insertOrder
  745. }
  746. return iTimestamp < jTimestamp
  747. }
  748. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  749. // required by the sort.Interface interface.
  750. func (slice byTimestamp) Swap(i, j int) {
  751. slice[i], slice[j] = slice[j], slice[i]
  752. }
  753. func unwrapEvents(events []wrappedEvent) []types.InputLogEvent {
  754. cwEvents := make([]types.InputLogEvent, len(events))
  755. for i, input := range events {
  756. cwEvents[i] = input.inputLogEvent
  757. }
  758. return cwEvents
  759. }
  760. func newEventBatch() *eventBatch {
  761. return &eventBatch{
  762. batch: make([]wrappedEvent, 0),
  763. bytes: 0,
  764. }
  765. }
  766. // events returns a slice of wrappedEvents sorted in order of their
  767. // timestamps and then by their insertion order (see `byTimestamp`).
  768. //
  769. // Warning: this method is not threadsafe and must not be used
  770. // concurrently.
  771. func (b *eventBatch) events() []wrappedEvent {
  772. sort.Sort(byTimestamp(b.batch))
  773. return b.batch
  774. }
  775. // add adds an event to the batch of events accounting for the
  776. // necessary overhead for an event to be logged. An error will be
  777. // returned if the event cannot be added to the batch due to service
  778. // limits.
  779. //
  780. // Warning: this method is not threadsafe and must not be used
  781. // concurrently.
  782. func (b *eventBatch) add(event wrappedEvent, size int) bool {
  783. addBytes := size + perEventBytes
  784. // verify we are still within service limits
  785. switch {
  786. case len(b.batch)+1 > maximumLogEventsPerPut:
  787. return false
  788. case b.bytes+addBytes > maximumBytesPerPut:
  789. return false
  790. }
  791. b.bytes += addBytes
  792. b.batch = append(b.batch, event)
  793. return true
  794. }
  795. // count is the number of batched events. Warning: this method
  796. // is not threadsafe and must not be used concurrently.
  797. func (b *eventBatch) count() int {
  798. return len(b.batch)
  799. }
  800. // size is the total number of bytes that the batch represents.
  801. //
  802. // Warning: this method is not threadsafe and must not be used
  803. // concurrently.
  804. func (b *eventBatch) size() int {
  805. return b.bytes
  806. }
  807. func (b *eventBatch) isEmpty() bool {
  808. zeroEvents := b.count() == 0
  809. zeroSize := b.size() == 0
  810. return zeroEvents && zeroSize
  811. }
  812. // reset prepares the batch for reuse.
  813. func (b *eventBatch) reset() {
  814. b.bytes = 0
  815. b.batch = b.batch[:0]
  816. }