cloudwatchlogs.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
  3. import (
  4. "context"
  5. "fmt"
  6. "os"
  7. "regexp"
  8. "sort"
  9. "strconv"
  10. "sync"
  11. "time"
  12. "unicode/utf8"
  13. "github.com/aws/aws-sdk-go-v2/aws"
  14. "github.com/aws/aws-sdk-go-v2/aws/middleware"
  15. "github.com/aws/aws-sdk-go-v2/config"
  16. "github.com/aws/aws-sdk-go-v2/credentials/endpointcreds"
  17. "github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
  18. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
  19. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
  20. "github.com/aws/smithy-go"
  21. smithymiddleware "github.com/aws/smithy-go/middleware"
  22. smithyhttp "github.com/aws/smithy-go/transport/http"
  23. "github.com/containerd/containerd/log"
  24. "github.com/docker/docker/daemon/logger"
  25. "github.com/docker/docker/daemon/logger/loggerutils"
  26. "github.com/docker/docker/dockerversion"
  27. "github.com/pkg/errors"
  28. "github.com/sirupsen/logrus"
  29. )
  // Driver name, log-opt keys, defaults, and CloudWatch Logs limits used by the
  // awslogs driver.
  const (
  	// name is the identifier under which the driver and its option validator
  	// are registered.
  	name = "awslogs"

  	// Log option keys read from the container's logging configuration, and
  	// the environment variable consulted for the region.
  	regionKey              = "awslogs-region"
  	endpointKey            = "awslogs-endpoint"
  	regionEnvKey           = "AWS_REGION"
  	logGroupKey            = "awslogs-group"
  	logStreamKey           = "awslogs-stream"
  	logCreateGroupKey      = "awslogs-create-group"
  	logCreateStreamKey     = "awslogs-create-stream"
  	tagKey                 = "tag"
  	datetimeFormatKey      = "awslogs-datetime-format"
  	multilinePatternKey    = "awslogs-multiline-pattern"
  	credentialsEndpointKey = "awslogs-credentials-endpoint" //nolint:gosec // G101: Potential hardcoded credentials
  	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
  	maxBufferedEventsKey   = "awslogs-max-buffered-events"
  	logFormatKey           = "awslogs-format"

  	// Values used when the corresponding log option is not supplied.
  	defaultForceFlushInterval = 5 * time.Second
  	defaultMaxBufferedEvents  = 4096

  	// Batch limits and the fixed per-event byte overhead.
  	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  	perEventBytes          = 26
  	maximumBytesPerPut     = 1048576
  	maximumLogEventsPerPut = 10000

  	// Largest message payload of a single event once the per-event overhead
  	// is subtracted.
  	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
  	//
  	// Events are interpreted as UTF-8 encoded Unicode, so invalid UTF-8 byte
  	// sequences are replaced with the Unicode replacement character (U+FFFD),
  	// a 3-byte sequence in UTF-8. Event lengths are therefore computed as if
  	// that replacement had already happened, which also avoids splitting
  	// valid UTF-8 characters into invalid byte sequences.
  	maximumBytesPerEvent = 262144 - perEventBytes

  	// credentialsEndpoint is the base URL to which the value of
  	// awslogs-credentials-endpoint is appended.
  	credentialsEndpoint = "http://169.254.170.2" //nolint:gosec // G101: Potential hardcoded credentials

  	// Header name and value attached to requests when awslogs-format is set.
  	// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
  	logsFormatHeader = "x-amzn-logs-format"
  	jsonEmfLogFormat = "json/emf"
  )
  63. type logStream struct {
  64. logStreamName string
  65. logGroupName string
  66. logCreateGroup bool
  67. logCreateStream bool
  68. forceFlushInterval time.Duration
  69. multilinePattern *regexp.Regexp
  70. client api
  71. messages chan *logger.Message
  72. lock sync.RWMutex
  73. closed bool
  74. sequenceToken *string
  75. }
  // logStreamConfig is the parsed form of the awslogs log options, produced by
  // newStreamConfig and copied into a logStream by New.
  type logStreamConfig struct {
  	// Destination log stream and log group.
  	logStreamName string
  	logGroupName  string

  	// Whether the group and the stream may be created by the driver.
  	logCreateGroup  bool
  	logCreateStream bool

  	// forceFlushInterval is the flush period derived from
  	// awslogs-force-flush-interval-seconds.
  	forceFlushInterval time.Duration
  	// maxBufferedEvents is the capacity of the message channel.
  	maxBufferedEvents int
  	// multilinePattern is nil unless a multiline or datetime-format option
  	// was supplied.
  	multilinePattern *regexp.Regexp
  }
  85. var _ logger.SizedLogger = &logStream{}
  86. type api interface {
  87. CreateLogGroup(context.Context, *cloudwatchlogs.CreateLogGroupInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error)
  88. CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
  89. PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
  90. }
  91. type regionFinder interface {
  92. GetRegion(context.Context, *imds.GetRegionInput, ...func(*imds.Options)) (*imds.GetRegionOutput, error)
  93. }
  94. type wrappedEvent struct {
  95. inputLogEvent types.InputLogEvent
  96. insertOrder int
  97. }
  98. type byTimestamp []wrappedEvent
  99. // init registers the awslogs driver
  100. func init() {
  101. if err := logger.RegisterLogDriver(name, New); err != nil {
  102. panic(err)
  103. }
  104. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  105. panic(err)
  106. }
  107. }
  108. // eventBatch holds the events that are batched for submission and the
  109. // associated data about it.
  110. //
  111. // Warning: this type is not threadsafe and must not be used
  112. // concurrently. This type is expected to be consumed in a single go
  113. // routine and never concurrently.
  114. type eventBatch struct {
  115. batch []wrappedEvent
  116. bytes int
  117. }
  118. // New creates an awslogs logger using the configuration passed in on the
  119. // context. Supported context configuration variables are awslogs-region,
  120. // awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
  121. // awslogs-multiline-pattern and awslogs-datetime-format.
  122. // When available, configuration is also taken from environment variables
  123. // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
  124. // file (~/.aws/credentials), and the EC2 Instance Metadata Service.
  125. func New(info logger.Info) (logger.Logger, error) {
  126. containerStreamConfig, err := newStreamConfig(info)
  127. if err != nil {
  128. return nil, err
  129. }
  130. client, err := newAWSLogsClient(info)
  131. if err != nil {
  132. return nil, err
  133. }
  134. logNonBlocking := info.Config["mode"] == "non-blocking"
  135. containerStream := &logStream{
  136. logStreamName: containerStreamConfig.logStreamName,
  137. logGroupName: containerStreamConfig.logGroupName,
  138. logCreateGroup: containerStreamConfig.logCreateGroup,
  139. logCreateStream: containerStreamConfig.logCreateStream,
  140. forceFlushInterval: containerStreamConfig.forceFlushInterval,
  141. multilinePattern: containerStreamConfig.multilinePattern,
  142. client: client,
  143. messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
  144. }
  145. creationDone := make(chan bool)
  146. if logNonBlocking {
  147. go func() {
  148. backoff := 1
  149. maxBackoff := 32
  150. for {
  151. // If logger is closed we are done
  152. containerStream.lock.RLock()
  153. if containerStream.closed {
  154. containerStream.lock.RUnlock()
  155. break
  156. }
  157. containerStream.lock.RUnlock()
  158. err := containerStream.create()
  159. if err == nil {
  160. break
  161. }
  162. time.Sleep(time.Duration(backoff) * time.Second)
  163. if backoff < maxBackoff {
  164. backoff *= 2
  165. }
  166. log.G(context.TODO()).
  167. WithError(err).
  168. WithField("container-id", info.ContainerID).
  169. WithField("container-name", info.ContainerName).
  170. Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
  171. }
  172. close(creationDone)
  173. }()
  174. } else {
  175. if err = containerStream.create(); err != nil {
  176. return nil, err
  177. }
  178. close(creationDone)
  179. }
  180. go containerStream.collectBatch(creationDone)
  181. return containerStream, nil
  182. }
  183. // Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream
  184. // It has been formed out to ease Utest of the New above
  185. func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
  186. logGroupName := info.Config[logGroupKey]
  187. logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
  188. if err != nil {
  189. return nil, err
  190. }
  191. logCreateGroup := false
  192. if info.Config[logCreateGroupKey] != "" {
  193. logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
  194. if err != nil {
  195. return nil, err
  196. }
  197. }
  198. forceFlushInterval := defaultForceFlushInterval
  199. if info.Config[forceFlushIntervalKey] != "" {
  200. forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
  201. if err != nil {
  202. return nil, err
  203. }
  204. forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
  205. }
  206. maxBufferedEvents := int(defaultMaxBufferedEvents)
  207. if info.Config[maxBufferedEventsKey] != "" {
  208. maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
  209. if err != nil {
  210. return nil, err
  211. }
  212. }
  213. if info.Config[logStreamKey] != "" {
  214. logStreamName = info.Config[logStreamKey]
  215. }
  216. logCreateStream := true
  217. if info.Config[logCreateStreamKey] != "" {
  218. logCreateStream, err = strconv.ParseBool(info.Config[logCreateStreamKey])
  219. if err != nil {
  220. return nil, err
  221. }
  222. }
  223. multilinePattern, err := parseMultilineOptions(info)
  224. if err != nil {
  225. return nil, err
  226. }
  227. containerStreamConfig := &logStreamConfig{
  228. logStreamName: logStreamName,
  229. logGroupName: logGroupName,
  230. logCreateGroup: logCreateGroup,
  231. logCreateStream: logCreateStream,
  232. forceFlushInterval: forceFlushInterval,
  233. maxBufferedEvents: maxBufferedEvents,
  234. multilinePattern: multilinePattern,
  235. }
  236. return containerStreamConfig, nil
  237. }
  238. // Parses awslogs-multiline-pattern and awslogs-datetime-format options
  239. // If awslogs-datetime-format is present, convert the format from strftime
  240. // to regexp and return.
  241. // If awslogs-multiline-pattern is present, compile regexp and return
  242. func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
  243. dateTimeFormat := info.Config[datetimeFormatKey]
  244. multilinePatternKey := info.Config[multilinePatternKey]
  245. // strftime input is parsed into a regular expression
  246. if dateTimeFormat != "" {
  247. // %. matches each strftime format sequence and ReplaceAllStringFunc
  248. // looks up each format sequence in the conversion table strftimeToRegex
  249. // to replace with a defined regular expression
  250. r := regexp.MustCompile("%.")
  251. multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
  252. return strftimeToRegex[s]
  253. })
  254. }
  255. if multilinePatternKey != "" {
  256. multilinePattern, err := regexp.Compile(multilinePatternKey)
  257. if err != nil {
  258. return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
  259. }
  260. return multilinePattern, nil
  261. }
  262. return nil, nil
  263. }
  // strftimeToRegex maps strftime format sequences to regular expressions that
  // match the text those sequences produce. parseMultilineOptions uses it to
  // turn an awslogs-datetime-format value into a multiline-start pattern.
  //
  // Character classes are written without separators ("[12]", not "[1,2]"): a
  // comma inside a class is a literal member and would let "," match where a
  // digit or letter is required.
  var strftimeToRegex = map[string]string{
  	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
  	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
  	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
  	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
  	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
  	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
  	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
  	/*yearCentury           */ `%Y`: `\d{4}`,
  	/*yearZeroPadded        */ `%y`: `\d{2}`,
  	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
  	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
  	/*AM or PM              */ `%p`: "[AP]M",
  	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
  	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
  	/*microsecondZeroPadded */ `%f`: `\d{6}`,
  	/*utcOffset             */ `%z`: `[+-]\d{4}`,
  	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
  	// 001-009, 010-099, 100-299, 300-359, 360-366. The first two
  	// alternatives together cover every day below 100, including the
  	// multiples of ten.
  	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
  	/*milliseconds          */ `%L`: `\.\d{3}`,
  }
  286. // newRegionFinder is a variable such that the implementation
  287. // can be swapped out for unit tests.
  288. var newRegionFinder = func(ctx context.Context) (regionFinder, error) {
  289. cfg, err := config.LoadDefaultConfig(ctx) // default config, because we don't yet know the region
  290. if err != nil {
  291. return nil, err
  292. }
  293. client := imds.NewFromConfig(cfg)
  294. return client, nil
  295. }
  296. // newSDKEndpoint is a variable such that the implementation
  297. // can be swapped out for unit tests.
  298. var newSDKEndpoint = credentialsEndpoint
  299. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  300. // Customizations to the default client from the SDK include a Docker-specific
  301. // User-Agent string and automatic region detection using the EC2 Instance
  302. // Metadata Service when region is otherwise unspecified.
  303. func newAWSLogsClient(info logger.Info, configOpts ...func(*config.LoadOptions) error) (*cloudwatchlogs.Client, error) {
  304. ctx := context.TODO()
  305. var region, endpoint *string
  306. if os.Getenv(regionEnvKey) != "" {
  307. region = aws.String(os.Getenv(regionEnvKey))
  308. }
  309. if info.Config[regionKey] != "" {
  310. region = aws.String(info.Config[regionKey])
  311. }
  312. if info.Config[endpointKey] != "" {
  313. endpoint = aws.String(info.Config[endpointKey])
  314. }
  315. if region == nil || *region == "" {
  316. log.G(ctx).Info("Trying to get region from IMDS")
  317. regFinder, err := newRegionFinder(context.TODO())
  318. if err != nil {
  319. log.G(ctx).WithError(err).Error("could not create regionFinder")
  320. return nil, errors.Wrap(err, "could not create regionFinder")
  321. }
  322. r, err := regFinder.GetRegion(context.TODO(), &imds.GetRegionInput{})
  323. if err != nil {
  324. log.G(ctx).WithError(err).Error("Could not get region from IMDS, environment, or log option")
  325. return nil, errors.Wrap(err, "cannot determine region for awslogs driver")
  326. }
  327. region = &r.Region
  328. }
  329. configOpts = append(configOpts, config.WithRegion(*region))
  330. if uri, ok := info.Config[credentialsEndpointKey]; ok {
  331. log.G(ctx).Debugf("Trying to get credentials from awslogs-credentials-endpoint")
  332. endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
  333. configOpts = append(configOpts, config.WithCredentialsProvider(endpointcreds.New(endpoint)))
  334. }
  335. cfg, err := config.LoadDefaultConfig(context.TODO(), configOpts...)
  336. if err != nil {
  337. log.G(ctx).WithError(err).Error("Could not initialize AWS SDK config")
  338. return nil, errors.Wrap(err, "could not initialize AWS SDK config")
  339. }
  340. log.G(ctx).WithFields(logrus.Fields{
  341. "region": *region,
  342. }).Debug("Created awslogs client")
  343. var clientOpts []func(*cloudwatchlogs.Options)
  344. if info.Config[logFormatKey] != "" {
  345. logFormatMiddleware := smithymiddleware.BuildMiddlewareFunc("logFormat", func(
  346. ctx context.Context, in smithymiddleware.BuildInput, next smithymiddleware.BuildHandler,
  347. ) (
  348. out smithymiddleware.BuildOutput, metadata smithymiddleware.Metadata, err error,
  349. ) {
  350. switch v := in.Request.(type) {
  351. case *smithyhttp.Request:
  352. v.Header.Add(logsFormatHeader, jsonEmfLogFormat)
  353. }
  354. return next.HandleBuild(ctx, in)
  355. })
  356. clientOpts = append(
  357. clientOpts,
  358. cloudwatchlogs.WithAPIOptions(func(stack *smithymiddleware.Stack) error {
  359. return stack.Build.Add(logFormatMiddleware, smithymiddleware.Before)
  360. }),
  361. )
  362. }
  363. clientOpts = append(
  364. clientOpts,
  365. cloudwatchlogs.WithAPIOptions(middleware.AddUserAgentKeyValue("Docker", dockerversion.Version)),
  366. )
  367. if endpoint != nil {
  368. clientOpts = append(clientOpts, cloudwatchlogs.WithEndpointResolver(cloudwatchlogs.EndpointResolverFromURL(*endpoint)))
  369. }
  370. client := cloudwatchlogs.NewFromConfig(cfg, clientOpts...)
  371. return client, nil
  372. }
  373. // Name returns the name of the awslogs logging driver
  374. func (l *logStream) Name() string {
  375. return name
  376. }
  377. // BufSize returns the maximum bytes CloudWatch can handle.
  378. func (l *logStream) BufSize() int {
  379. return maximumBytesPerEvent
  380. }
  381. // Log submits messages for logging by an instance of the awslogs logging driver
  382. func (l *logStream) Log(msg *logger.Message) error {
  383. l.lock.RLock()
  384. defer l.lock.RUnlock()
  385. if l.closed {
  386. return errors.New("awslogs is closed")
  387. }
  388. l.messages <- msg
  389. return nil
  390. }
  391. // Close closes the instance of the awslogs logging driver
  392. func (l *logStream) Close() error {
  393. l.lock.Lock()
  394. defer l.lock.Unlock()
  395. if !l.closed {
  396. close(l.messages)
  397. }
  398. l.closed = true
  399. return nil
  400. }
  401. // create creates log group and log stream for the instance of the awslogs logging driver
  402. func (l *logStream) create() error {
  403. err := l.createLogStream()
  404. if err == nil {
  405. return nil
  406. }
  407. var apiErr *types.ResourceNotFoundException
  408. if errors.As(err, &apiErr) && l.logCreateGroup {
  409. if err := l.createLogGroup(); err != nil {
  410. return errors.Wrap(err, "failed to create Cloudwatch log group")
  411. }
  412. err = l.createLogStream()
  413. if err == nil {
  414. return nil
  415. }
  416. }
  417. return errors.Wrap(err, "failed to create Cloudwatch log stream")
  418. }
  419. // createLogGroup creates a log group for the instance of the awslogs logging driver
  420. func (l *logStream) createLogGroup() error {
  421. if _, err := l.client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{
  422. LogGroupName: aws.String(l.logGroupName),
  423. }); err != nil {
  424. var apiErr smithy.APIError
  425. if errors.As(err, &apiErr) {
  426. fields := logrus.Fields{
  427. "errorCode": apiErr.ErrorCode(),
  428. "message": apiErr.ErrorMessage(),
  429. "logGroupName": l.logGroupName,
  430. "logCreateGroup": l.logCreateGroup,
  431. }
  432. if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
  433. // Allow creation to succeed
  434. log.G(context.TODO()).WithFields(fields).Info("Log group already exists")
  435. return nil
  436. }
  437. log.G(context.TODO()).WithFields(fields).Error("Failed to create log group")
  438. }
  439. return err
  440. }
  441. return nil
  442. }
  443. // createLogStream creates a log stream for the instance of the awslogs logging driver
  444. func (l *logStream) createLogStream() error {
  445. // Directly return if we do not want to create log stream.
  446. if !l.logCreateStream {
  447. log.G(context.TODO()).WithFields(logrus.Fields{
  448. "logGroupName": l.logGroupName,
  449. "logStreamName": l.logStreamName,
  450. "logCreateStream": l.logCreateStream,
  451. }).Info("Skipping creating log stream")
  452. return nil
  453. }
  454. input := &cloudwatchlogs.CreateLogStreamInput{
  455. LogGroupName: aws.String(l.logGroupName),
  456. LogStreamName: aws.String(l.logStreamName),
  457. }
  458. _, err := l.client.CreateLogStream(context.TODO(), input)
  459. if err != nil {
  460. var apiErr smithy.APIError
  461. if errors.As(err, &apiErr) {
  462. fields := logrus.Fields{
  463. "errorCode": apiErr.ErrorCode(),
  464. "message": apiErr.ErrorMessage(),
  465. "logGroupName": l.logGroupName,
  466. "logStreamName": l.logStreamName,
  467. }
  468. if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
  469. // Allow creation to succeed
  470. log.G(context.TODO()).WithFields(fields).Info("Log stream already exists")
  471. return nil
  472. }
  473. log.G(context.TODO()).WithFields(fields).Error("Failed to create log stream")
  474. }
  475. }
  476. return err
  477. }
  // newTicker creates the ticker that drives time-based batching. It is a
  // variable, bound to time.NewTicker, so that unit tests can substitute their
  // own implementation.
  var newTicker = time.NewTicker
  483. // collectBatch executes as a goroutine to perform batching of log events for
  484. // submission to the log stream. If the awslogs-multiline-pattern or
  485. // awslogs-datetime-format options have been configured, multiline processing
  486. // is enabled, where log messages are stored in an event buffer until a multiline
  487. // pattern match is found, at which point the messages in the event buffer are
  488. // pushed to CloudWatch logs as a single log event. Multiline messages are processed
  489. // according to the maximumBytesPerPut constraint, and the implementation only
  490. // allows for messages to be buffered for a maximum of 2*l.forceFlushInterval
  491. // seconds. If no forceFlushInterval is specified for the log stream, then the default
  492. // of 5 seconds will be used resulting in a maximum of 10 seconds buffer time for multiline
  493. // messages. When events are ready to be processed for submission to CloudWatch
  494. // Logs, the processEvents method is called. If a multiline pattern is not
  495. // configured, log events are submitted to the processEvents method immediately.
  496. func (l *logStream) collectBatch(created chan bool) {
  497. // Wait for the logstream/group to be created
  498. <-created
  499. flushInterval := l.forceFlushInterval
  500. if flushInterval <= 0 {
  501. flushInterval = defaultForceFlushInterval
  502. }
  503. ticker := newTicker(flushInterval)
  504. var eventBuffer []byte
  505. var eventBufferTimestamp int64
  506. batch := newEventBatch()
  507. for {
  508. select {
  509. case t := <-ticker.C:
  510. // If event buffer is older than batch publish frequency flush the event buffer
  511. if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
  512. eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
  513. eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
  514. eventBufferNegative := eventBufferAge < 0
  515. if eventBufferExpired || eventBufferNegative {
  516. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  517. eventBuffer = eventBuffer[:0]
  518. }
  519. }
  520. l.publishBatch(batch)
  521. batch.reset()
  522. case msg, more := <-l.messages:
  523. if !more {
  524. // Flush event buffer and release resources
  525. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  526. l.publishBatch(batch)
  527. batch.reset()
  528. return
  529. }
  530. if eventBufferTimestamp == 0 {
  531. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  532. }
  533. line := msg.Line
  534. if l.multilinePattern != nil {
  535. lineEffectiveLen := effectiveLen(string(line))
  536. if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
  537. // This is a new log event or we will exceed max bytes per event
  538. // so flush the current eventBuffer to events and reset timestamp
  539. l.processEvent(batch, eventBuffer, eventBufferTimestamp)
  540. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  541. eventBuffer = eventBuffer[:0]
  542. }
  543. // Append newline if event is less than max event size
  544. if lineEffectiveLen < maximumBytesPerEvent {
  545. line = append(line, "\n"...)
  546. }
  547. eventBuffer = append(eventBuffer, line...)
  548. logger.PutMessage(msg)
  549. } else {
  550. l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
  551. logger.PutMessage(msg)
  552. }
  553. }
  554. }
  555. }
  556. // processEvent processes log events that are ready for submission to CloudWatch
  557. // logs. Batching is performed on time- and size-bases. Time-based batching occurs
  558. // at the interval defined by awslogs-force-flush-interval-seconds (defaults to 5 seconds).
  559. // Size-based batching is performed on the maximum number of events per batch
  560. // (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
  561. // batch (defined in maximumBytesPerPut). Log messages are split by the maximum
  562. // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
  563. // byte overhead (defined in perEventBytes) which is accounted for in split- and
  564. // batch-calculations. Because the events are interpreted as UTF-8 encoded
  565. // Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
  566. // replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
  567. // compensate for that and to avoid splitting valid UTF-8 characters into
  568. // invalid byte sequences, we calculate the length of each event assuming that
  569. // this replacement happens.
  570. func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
  571. for len(bytes) > 0 {
  572. // Split line length so it does not exceed the maximum
  573. splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
  574. line := bytes[:splitOffset]
  575. event := wrappedEvent{
  576. inputLogEvent: types.InputLogEvent{
  577. Message: aws.String(string(line)),
  578. Timestamp: aws.Int64(timestamp),
  579. },
  580. insertOrder: batch.count(),
  581. }
  582. added := batch.add(event, lineBytes)
  583. if added {
  584. bytes = bytes[splitOffset:]
  585. } else {
  586. l.publishBatch(batch)
  587. batch.reset()
  588. }
  589. }
  590. }
  // effectiveLen returns the number of bytes line would occupy after UTF-8
  // normalization, i.e. once every byte that is not part of a valid UTF-8
  // encoded codepoint has been replaced by the Unicode replacement codepoint
  // U+FFFD (a 3-byte UTF-8 sequence, utf8.RuneError in Go).
  func effectiveLen(line string) int {
  	total := 0
  	// Ranging over a string yields utf8.RuneError for each invalid byte, so
  	// RuneLen accounts for the 3-byte replacement automatically.
  	for _, r := range line {
  		total += utf8.RuneLen(r)
  	}
  	return total
  }
  // findValidSplit locates the byte offset at which line can be cut without
  // breaking a valid Unicode codepoint, such that the part before the cut
  // occupies at most maxBytes after UTF-8 normalization.
  //
  // It returns that offset (usable on the string or the equivalent []byte) and
  // the effective size of the part before it, where every invalid byte counts
  // as the 3-byte Unicode replacement character (utf8.RuneError). When the
  // whole line fits, the offset is len(line).
  func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
  	for i, r := range line {
  		width := utf8.RuneLen(r)
  		if effectiveBytes+width > maxBytes {
  			// This codepoint would overflow the budget: cut right before it.
  			return i, effectiveBytes
  		}
  		effectiveBytes += width
  	}
  	return len(line), effectiveBytes
  }
  620. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  621. // accounting for sequencing requirements (each request must reference the
  622. // sequence token returned by the previous request).
  623. func (l *logStream) publishBatch(batch *eventBatch) {
  624. if batch.isEmpty() {
  625. return
  626. }
  627. cwEvents := unwrapEvents(batch.events())
  628. nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
  629. if err != nil {
  630. if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
  631. // already submitted, just grab the correct sequence token
  632. nextSequenceToken = apiErr.ExpectedSequenceToken
  633. log.G(context.TODO()).WithFields(logrus.Fields{
  634. "errorCode": apiErr.ErrorCode(),
  635. "message": apiErr.ErrorMessage(),
  636. "logGroupName": l.logGroupName,
  637. "logStreamName": l.logStreamName,
  638. }).Info("Data already accepted, ignoring error")
  639. err = nil
  640. } else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
  641. nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
  642. }
  643. }
  644. if err != nil {
  645. log.G(context.TODO()).Error(err)
  646. } else {
  647. l.sequenceToken = nextSequenceToken
  648. }
  649. }
  650. // putLogEvents wraps the PutLogEvents API
  651. func (l *logStream) putLogEvents(events []types.InputLogEvent, sequenceToken *string) (*string, error) {
  652. input := &cloudwatchlogs.PutLogEventsInput{
  653. LogEvents: events,
  654. SequenceToken: sequenceToken,
  655. LogGroupName: aws.String(l.logGroupName),
  656. LogStreamName: aws.String(l.logStreamName),
  657. }
  658. resp, err := l.client.PutLogEvents(context.TODO(), input)
  659. if err != nil {
  660. var apiErr smithy.APIError
  661. if errors.As(err, &apiErr) {
  662. log.G(context.TODO()).WithFields(logrus.Fields{
  663. "errorCode": apiErr.ErrorCode(),
  664. "message": apiErr.ErrorMessage(),
  665. "logGroupName": l.logGroupName,
  666. "logStreamName": l.logStreamName,
  667. }).Error("Failed to put log events")
  668. }
  669. return nil, err
  670. }
  671. return resp.NextSequenceToken, nil
  672. }
  673. // ValidateLogOpt looks for awslogs-specific log options awslogs-region, awslogs-endpoint
  674. // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
  675. // awslogs-multiline-pattern
  676. func ValidateLogOpt(cfg map[string]string) error {
  677. for key := range cfg {
  678. switch key {
  679. case logGroupKey:
  680. case logStreamKey:
  681. case logCreateGroupKey:
  682. case regionKey:
  683. case endpointKey:
  684. case tagKey:
  685. case datetimeFormatKey:
  686. case multilinePatternKey:
  687. case credentialsEndpointKey:
  688. case forceFlushIntervalKey:
  689. case maxBufferedEventsKey:
  690. case logFormatKey:
  691. default:
  692. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  693. }
  694. }
  695. if cfg[logGroupKey] == "" {
  696. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  697. }
  698. if cfg[logCreateGroupKey] != "" {
  699. if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
  700. return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
  701. }
  702. }
  703. if cfg[forceFlushIntervalKey] != "" {
  704. if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
  705. return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
  706. }
  707. }
  708. if cfg[maxBufferedEventsKey] != "" {
  709. if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
  710. return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
  711. }
  712. }
  713. _, datetimeFormatKeyExists := cfg[datetimeFormatKey]
  714. _, multilinePatternKeyExists := cfg[multilinePatternKey]
  715. if datetimeFormatKeyExists && multilinePatternKeyExists {
  716. return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
  717. }
  718. if cfg[logFormatKey] != "" {
  719. // For now, only the "json/emf" log format is supported
  720. if cfg[logFormatKey] != jsonEmfLogFormat {
  721. return fmt.Errorf("unsupported log format '%s'", cfg[logFormatKey])
  722. }
  723. if datetimeFormatKeyExists || multilinePatternKeyExists {
  724. return fmt.Errorf("you cannot configure log opt '%s' or '%s' when log opt '%s' is set to '%s'", datetimeFormatKey, multilinePatternKey, logFormatKey, jsonEmfLogFormat)
  725. }
  726. }
  727. return nil
  728. }
  729. // Len returns the length of a byTimestamp slice. Len is required by the
  730. // sort.Interface interface.
  731. func (slice byTimestamp) Len() int {
  732. return len(slice)
  733. }
  734. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  735. // required by the sort.Interface interface.
  736. func (slice byTimestamp) Less(i, j int) bool {
  737. iTimestamp, jTimestamp := int64(0), int64(0)
  738. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  739. iTimestamp = *slice[i].inputLogEvent.Timestamp
  740. }
  741. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  742. jTimestamp = *slice[j].inputLogEvent.Timestamp
  743. }
  744. if iTimestamp == jTimestamp {
  745. return slice[i].insertOrder < slice[j].insertOrder
  746. }
  747. return iTimestamp < jTimestamp
  748. }
  749. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  750. // required by the sort.Interface interface.
  751. func (slice byTimestamp) Swap(i, j int) {
  752. slice[i], slice[j] = slice[j], slice[i]
  753. }
  754. func unwrapEvents(events []wrappedEvent) []types.InputLogEvent {
  755. cwEvents := make([]types.InputLogEvent, len(events))
  756. for i, input := range events {
  757. cwEvents[i] = input.inputLogEvent
  758. }
  759. return cwEvents
  760. }
  761. func newEventBatch() *eventBatch {
  762. return &eventBatch{
  763. batch: make([]wrappedEvent, 0),
  764. bytes: 0,
  765. }
  766. }
  767. // events returns a slice of wrappedEvents sorted in order of their
  768. // timestamps and then by their insertion order (see `byTimestamp`).
  769. //
  770. // Warning: this method is not threadsafe and must not be used
  771. // concurrently.
  772. func (b *eventBatch) events() []wrappedEvent {
  773. sort.Sort(byTimestamp(b.batch))
  774. return b.batch
  775. }
  776. // add adds an event to the batch of events accounting for the
  777. // necessary overhead for an event to be logged. An error will be
  778. // returned if the event cannot be added to the batch due to service
  779. // limits.
  780. //
  781. // Warning: this method is not threadsafe and must not be used
  782. // concurrently.
  783. func (b *eventBatch) add(event wrappedEvent, size int) bool {
  784. addBytes := size + perEventBytes
  785. // verify we are still within service limits
  786. switch {
  787. case len(b.batch)+1 > maximumLogEventsPerPut:
  788. return false
  789. case b.bytes+addBytes > maximumBytesPerPut:
  790. return false
  791. }
  792. b.bytes += addBytes
  793. b.batch = append(b.batch, event)
  794. return true
  795. }
  796. // count is the number of batched events. Warning: this method
  797. // is not threadsafe and must not be used concurrently.
  798. func (b *eventBatch) count() int {
  799. return len(b.batch)
  800. }
  801. // size is the total number of bytes that the batch represents.
  802. //
  803. // Warning: this method is not threadsafe and must not be used
  804. // concurrently.
  805. func (b *eventBatch) size() int {
  806. return b.bytes
  807. }
  808. func (b *eventBatch) isEmpty() bool {
  809. zeroEvents := b.count() == 0
  810. zeroSize := b.size() == 0
  811. return zeroEvents && zeroSize
  812. }
  813. // reset prepares the batch for reuse.
  814. func (b *eventBatch) reset() {
  815. b.bytes = 0
  816. b.batch = b.batch[:0]
  817. }