cloudwatchlogs.go

// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"

import (
	"fmt"
	"os"
	"regexp"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials/endpointcreds"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/dockerversion"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)
const (
	name                   = "awslogs"
	regionKey              = "awslogs-region"
	endpointKey            = "awslogs-endpoint"
	regionEnvKey           = "AWS_REGION"
	logGroupKey            = "awslogs-group"
	logStreamKey           = "awslogs-stream"
	logCreateGroupKey      = "awslogs-create-group"
	logCreateStreamKey     = "awslogs-create-stream"
	tagKey                 = "tag"
	datetimeFormatKey      = "awslogs-datetime-format"
	multilinePatternKey    = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint"
	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
	maxBufferedEventsKey   = "awslogs-max-buffered-events"
	logFormatKey           = "awslogs-format"

	defaultForceFlushInterval = 5 * time.Second
	defaultMaxBufferedEvents  = 4096

	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
	// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
	// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
	// this replacement happens.
	maximumBytesPerEvent = 262144 - perEventBytes

	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
	dataAlreadyAcceptedCode   = "DataAlreadyAcceptedException"
	invalidSequenceTokenCode  = "InvalidSequenceTokenException"
	resourceNotFoundCode      = "ResourceNotFoundException"
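
	// credentialsEndpoint is the base URL used to build the full URI for the
	// awslogs-credentials-endpoint option (the link-local address of the ECS
	// task credentials relay).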
	credentialsEndpoint = "http://169.254.170.2"

	userAgentHeader = "User-Agent"

	// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
	logsFormatHeader = "x-amzn-logs-format"
	jsonEmfLogFormat = "json/emf"
)
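
// logStream is an instance of the awslogs logging driver that buffers log
// messages and submits them to a single CloudWatch Logs stream.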
type logStream struct {
	logStreamName      string
	logGroupName       string
	logCreateGroup     bool
	logCreateStream    bool
	logNonBlocking     bool
	forceFlushInterval time.Duration
	multilinePattern   *regexp.Regexp
	client             api
	messages           chan *logger.Message
	lock               sync.RWMutex
	closed             bool
	sequenceToken      *string
}
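
// logStreamConfig holds the parsed log-opt configuration used to construct a
// logStream; it is kept separate so option parsing can be tested on its own.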
type logStreamConfig struct {
	logStreamName      string
	logGroupName       string
	logCreateGroup     bool
	logCreateStream    bool
	logNonBlocking     bool
	forceFlushInterval time.Duration
	maxBufferedEvents  int
	multilinePattern   *regexp.Regexp
}

var _ logger.SizedLogger = &logStream{}
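
// api is the subset of the CloudWatch Logs client that the driver uses,
// declared as an interface so the client can be stubbed out in unit tests.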
type api interface {
	CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}
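
// regionFinder abstracts region discovery from the EC2 Instance Metadata
// Service so it can be replaced in unit tests.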
type regionFinder interface {
	Region() (string, error)
}
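
// wrappedEvent pairs an InputLogEvent with its insertion order so that events
// with equal timestamps can be sorted stably.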
type wrappedEvent struct {
	inputLogEvent *cloudwatchlogs.InputLogEvent
	insertOrder   int
}
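
// byTimestamp implements sort.Interface for wrappedEvents, ordering by
// timestamp and breaking ties by insertion order.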
type byTimestamp []wrappedEvent

// init registers the awslogs driver
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}

// eventBatch holds the events that are batched for submission and the
// associated metadata about the batch.
//
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single
// goroutine and never concurrently.
type eventBatch struct {
	batch []wrappedEvent
	bytes int
}

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
// awslogs-create-stream, awslogs-multiline-pattern, awslogs-datetime-format,
// awslogs-credentials-endpoint, awslogs-force-flush-interval-seconds,
// awslogs-max-buffered-events and awslogs-format.
// When available, configuration is also taken from environment variables
// AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
// file (~/.aws/credentials), and the EC2 Instance Metadata Service.
func New(info logger.Info) (logger.Logger, error) {
	containerStreamConfig, err := newStreamConfig(info)
	if err != nil {
		return nil, err
	}
	client, err := newAWSLogsClient(info)
	if err != nil {
		return nil, err
	}

	containerStream := &logStream{
		logStreamName:      containerStreamConfig.logStreamName,
		logGroupName:       containerStreamConfig.logGroupName,
		logCreateGroup:     containerStreamConfig.logCreateGroup,
		logCreateStream:    containerStreamConfig.logCreateStream,
		logNonBlocking:     containerStreamConfig.logNonBlocking,
		forceFlushInterval: containerStreamConfig.forceFlushInterval,
		multilinePattern:   containerStreamConfig.multilinePattern,
		client:             client,
		messages:           make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
	}

	creationDone := make(chan bool)
	if containerStream.logNonBlocking {
		go func() {
			backoff := 1
			maxBackoff := 32
			for {
				// If logger is closed we are done
				containerStream.lock.RLock()
				if containerStream.closed {
					containerStream.lock.RUnlock()
					break
				}
				containerStream.lock.RUnlock()
				err := containerStream.create()
				if err == nil {
					break
				}

				time.Sleep(time.Duration(backoff) * time.Second)
				if backoff < maxBackoff {
					backoff *= 2
				}
				logrus.
					WithError(err).
					WithField("container-id", info.ContainerID).
					WithField("container-name", info.ContainerName).
					Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
			}
			close(creationDone)
		}()
	} else {
		if err = containerStream.create(); err != nil {
			return nil, err
		}
		close(creationDone)
	}
	go containerStream.collectBatch(creationDone)

	return containerStream, nil
}

// newStreamConfig parses most of the awslogs- options and prepares a config
// object for constructing the actual stream. It is factored out of New above
// to ease unit testing.
func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
	logGroupName := info.Config[logGroupKey]
	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
	if err != nil {
		return nil, err
	}
	logCreateGroup := false
	if info.Config[logCreateGroupKey] != "" {
		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
		if err != nil {
			return nil, err
		}
	}

	logNonBlocking := info.Config["mode"] == "non-blocking"

	forceFlushInterval := defaultForceFlushInterval
	if info.Config[forceFlushIntervalKey] != "" {
		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
		if err != nil {
			return nil, err
		}
		forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
	}

	maxBufferedEvents := int(defaultMaxBufferedEvents)
	if info.Config[maxBufferedEventsKey] != "" {
		maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
		if err != nil {
			return nil, err
		}
	}

	if info.Config[logStreamKey] != "" {
		logStreamName = info.Config[logStreamKey]
	}
	logCreateStream := true
	if info.Config[logCreateStreamKey] != "" {
		logCreateStream, err = strconv.ParseBool(info.Config[logCreateStreamKey])
		if err != nil {
			return nil, err
		}
	}

	multilinePattern, err := parseMultilineOptions(info)
	if err != nil {
		return nil, err
	}

	containerStreamConfig := &logStreamConfig{
		logStreamName:      logStreamName,
		logGroupName:       logGroupName,
		logCreateGroup:     logCreateGroup,
		logCreateStream:    logCreateStream,
		logNonBlocking:     logNonBlocking,
		forceFlushInterval: forceFlushInterval,
		maxBufferedEvents:  maxBufferedEvents,
		multilinePattern:   multilinePattern,
	}

	return containerStreamConfig, nil
}

// parseMultilineOptions handles the awslogs-multiline-pattern and
// awslogs-datetime-format options. If awslogs-datetime-format is present, the
// format is converted from strftime notation to a regular expression and
// returned. If awslogs-multiline-pattern is present, the pattern is compiled
// and returned.
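//
// For example, a datetime format of "%Y-%m-%d" yields the pattern
// `\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01])` via the
// strftimeToRegex table below, so each log line beginning with a date like
// "2021-03-04" starts a new event.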
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
	dateTimeFormat := info.Config[datetimeFormatKey]
	multilinePatternValue := info.Config[multilinePatternKey]
	// strftime input is parsed into a regular expression
	if dateTimeFormat != "" {
		// %. matches each strftime format sequence and ReplaceAllStringFunc
		// looks up each format sequence in the conversion table strftimeToRegex
		// to replace with a defined regular expression
		r := regexp.MustCompile("%.")
		multilinePatternValue = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
			return strftimeToRegex[s]
		})
	}
	if multilinePatternValue != "" {
		multilinePattern, err := regexp.Compile(multilinePatternValue)
		if err != nil {
			return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternValue)
		}
		return multilinePattern, nil
	}
	return nil, nil
}

// strftimeToRegex maps strftime format strings to regular expressions.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:0[0-9][1-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}

// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.
var newRegionFinder = func() (regionFinder, error) {
	s, err := session.NewSession()
	if err != nil {
		return nil, err
	}
	return ec2metadata.New(s), nil
}

// newSDKEndpoint is a variable such that the implementation
// can be swapped out for unit tests.
var newSDKEndpoint = credentialsEndpoint

// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when region is otherwise unspecified.
func newAWSLogsClient(info logger.Info) (api, error) {
	var region, endpoint *string
	if os.Getenv(regionEnvKey) != "" {
		region = aws.String(os.Getenv(regionEnvKey))
	}
	if info.Config[regionKey] != "" {
		region = aws.String(info.Config[regionKey])
	}
	if info.Config[endpointKey] != "" {
		endpoint = aws.String(info.Config[endpointKey])
	}
	if region == nil || *region == "" {
		logrus.Info("Trying to get region from EC2 Metadata")
		ec2MetadataClient, err := newRegionFinder()
		if err != nil {
			logrus.WithError(err).Error("could not create EC2 metadata client")
			return nil, errors.Wrap(err, "could not create EC2 metadata client")
		}

		r, err := ec2MetadataClient.Region()
		if err != nil {
			logrus.WithError(err).Error("Could not get region from EC2 metadata, environment, or log option")
			return nil, errors.New("Cannot determine region for awslogs driver")
		}
		region = &r
	}

	sess, err := session.NewSession()
	if err != nil {
		return nil, errors.New("Failed to create a service client session for awslogs driver")
	}

	// attach region to cloudwatchlogs config
	sess.Config.Region = region

	// attach endpoint to cloudwatchlogs config
	if endpoint != nil {
		sess.Config.Endpoint = endpoint
	}

	if uri, ok := info.Config[credentialsEndpointKey]; ok {
		logrus.Debugf("Trying to get credentials from awslogs-credentials-endpoint")

		credsEndpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
		creds := endpointcreds.NewCredentialsClient(*sess.Config, sess.Handlers, credsEndpoint,
			func(p *endpointcreds.Provider) {
				p.ExpiryWindow = 5 * time.Minute
			})

		// attach credentials to cloudwatchlogs config
		sess.Config.Credentials = creds
	}

	logrus.WithFields(logrus.Fields{
		"region": *region,
	}).Debug("Created awslogs client")

	client := cloudwatchlogs.New(sess)

	client.Handlers.Build.PushBackNamed(request.NamedHandler{
		Name: "DockerUserAgentHandler",
		Fn: func(r *request.Request) {
			currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
			r.HTTPRequest.Header.Set(userAgentHeader,
				fmt.Sprintf("Docker %s (%s) %s",
					dockerversion.Version, runtime.GOOS, currentAgent))
		},
	})

	if info.Config[logFormatKey] != "" {
		client.Handlers.Build.PushBackNamed(request.NamedHandler{
			Name: "LogFormatHeaderHandler",
			Fn: func(req *request.Request) {
				req.HTTPRequest.Header.Set(logsFormatHeader, info.Config[logFormatKey])
			},
		})
	}

	return client, nil
}

// Name returns the name of the awslogs logging driver
func (l *logStream) Name() string {
	return name
}

// BufSize returns the maximum bytes CloudWatch can handle for a single log event.
func (l *logStream) BufSize() int {
	return maximumBytesPerEvent
}

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.closed {
		return errors.New("awslogs is closed")
	}
	if l.logNonBlocking {
		select {
		case l.messages <- msg:
			return nil
		default:
			return errors.New("awslogs buffer is full")
		}
	}
	l.messages <- msg
	return nil
}

// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if !l.closed {
		close(l.messages)
	}
	l.closed = true
	return nil
}

// create creates log group and log stream for the instance of the awslogs logging driver
func (l *logStream) create() error {
	err := l.createLogStream()
	if err == nil {
		return nil
	}
	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode && l.logCreateGroup {
		if err := l.createLogGroup(); err != nil {
			return errors.Wrap(err, "failed to create Cloudwatch log group")
		}
		err = l.createLogStream()
		if err == nil {
			return nil
		}
	}
	return errors.Wrap(err, "failed to create Cloudwatch log stream")
}

// createLogGroup creates a log group for the instance of the awslogs logging driver
func (l *logStream) createLogGroup() error {
	if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
		LogGroupName: aws.String(l.logGroupName),
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":      awsErr.Code(),
				"message":        awsErr.Message(),
				"origError":      awsErr.OrigErr(),
				"logGroupName":   l.logGroupName,
				"logCreateGroup": l.logCreateGroup,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log group already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log group")
		}
		return err
	}
	return nil
}

// createLogStream creates a log stream for the instance of the awslogs logging driver
func (l *logStream) createLogStream() error {
	// Directly return if we do not want to create log stream.
	if !l.logCreateStream {
		logrus.WithFields(logrus.Fields{
			"logGroupName":    l.logGroupName,
			"logStreamName":   l.logStreamName,
			"logCreateStream": l.logCreateStream,
		}).Info("Skipping creating log stream")
		return nil
	}

	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}

	_, err := l.client.CreateLogStream(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log stream already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log stream")
		}
	}
	return err
}

// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
	return time.NewTicker(freq)
}

// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. If the awslogs-multiline-pattern or
// awslogs-datetime-format options have been configured, multiline processing
// is enabled, where log messages are stored in an event buffer until a multiline
// pattern match is found, at which point the messages in the event buffer are
// pushed to CloudWatch logs as a single log event. Multiline messages are processed
// according to the maximumBytesPerPut constraint, and the implementation only
// allows for messages to be buffered for a maximum of twice the flush interval
// (awslogs-force-flush-interval-seconds, 5 seconds by default). When events are
// ready to be processed for submission to CloudWatch Logs, the processEvent
// method is called. If a multiline pattern is not configured, log events are
// submitted to the processEvent method immediately.
func (l *logStream) collectBatch(created chan bool) {
	// Wait for the logstream/group to be created
	<-created
	flushInterval := l.forceFlushInterval
	if flushInterval <= 0 {
		flushInterval = defaultForceFlushInterval
	}
	ticker := newTicker(flushInterval)
	var eventBuffer []byte
	var eventBufferTimestamp int64
	var batch = newEventBatch()
	for {
		select {
		case t := <-ticker.C:
			// If the event buffer is older than the flush interval, flush it
			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
				eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
				eventBufferNegative := eventBufferAge < 0
				if eventBufferExpired || eventBufferNegative {
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBuffer = eventBuffer[:0]
				}
			}
			l.publishBatch(batch)
			batch.reset()
		case msg, more := <-l.messages:
			if !more {
				// Flush event buffer and release resources
				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
				l.publishBatch(batch)
				batch.reset()
				// Stop the ticker so its resources are released
				ticker.Stop()
				return
			}
			if eventBufferTimestamp == 0 {
				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
			}
			line := msg.Line
			if l.multilinePattern != nil {
				lineEffectiveLen := effectiveLen(string(line))
				if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
					// This is a new log event or we will exceed max bytes per event
					// so flush the current eventBuffer to events and reset timestamp
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
					eventBuffer = eventBuffer[:0]
				}
				// Append newline if event is less than max event size
				if lineEffectiveLen < maximumBytesPerEvent {
					line = append(line, "\n"...)
				}
				eventBuffer = append(eventBuffer, line...)
				logger.PutMessage(msg)
			} else {
				l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
				logger.PutMessage(msg)
			}
		}
	}
}

// processEvent processes log events that are ready for submission to CloudWatch
// logs. Batching is performed on time- and size-bases. Time-based batching
// occurs at the flush interval (defaulting to defaultForceFlushInterval, 5
// seconds). Size-based batching is performed on the maximum number of events
// per batch (defined in maximumLogEventsPerPut) and the maximum number of
// total bytes in a batch (defined in maximumBytesPerPut). Log messages are
// split by the maximum bytes per event (defined in maximumBytesPerEvent).
// There is a fixed per-event byte overhead (defined in perEventBytes) which is
// accounted for in split- and batch-calculations. Because the events are
// interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are
// replaced with the Unicode replacement character (U+FFFD), which is a 3-byte
// sequence in UTF-8. To compensate for that and to avoid splitting valid
// UTF-8 characters into invalid byte sequences, we calculate the length of
// each event assuming that this replacement happens.
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
	for len(bytes) > 0 {
		// Split line length so it does not exceed the maximum
		splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
		line := bytes[:splitOffset]
		event := wrappedEvent{
			inputLogEvent: &cloudwatchlogs.InputLogEvent{
				Message:   aws.String(string(line)),
				Timestamp: aws.Int64(timestamp),
			},
			insertOrder: batch.count(),
		}

		added := batch.add(event, lineBytes)
		if added {
			bytes = bytes[splitOffset:]
		} else {
			l.publishBatch(batch)
			batch.reset()
		}
	}
}

// effectiveLen counts the effective number of bytes in the string, after
// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do
// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode
// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as
// utf8.RuneError)
func effectiveLen(line string) int {
	effectiveBytes := 0
	for _, r := range line {
		effectiveBytes += utf8.RuneLen(r)
	}
	return effectiveBytes
}

// findValidSplit finds the byte offset to split a string without breaking valid
// Unicode codepoints, given a maximum number of total bytes. findValidSplit
// returns the byte offset for splitting a string or []byte, as well as the
// effective number of bytes if the string were normalized to replace invalid
// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8
// sequence, represented in Go as utf8.RuneError)
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for offset, r := range line {
		splitOffset = offset
		if effectiveBytes+utf8.RuneLen(r) > maxBytes {
			return splitOffset, effectiveBytes
		}
		effectiveBytes += utf8.RuneLen(r)
	}
	splitOffset = len(line)
	return
}
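
// For example, findValidSplit("aé", 2) returns splitOffset 1 and
// effectiveBytes 1: 'é' is a 2-byte rune, so including it would exceed
// maxBytes, and the split falls on the rune boundary after 'a'. An invalid
// byte such as 0xFF counts as 3 effective bytes, the UTF-8 width of U+FFFD.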

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(batch *eventBatch) {
	if batch.isEmpty() {
		return
	}
	cwEvents := unwrapEvents(batch.events())

	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			if awsErr.Code() == dataAlreadyAcceptedCode {
				// already submitted, just grab the correct sequence token
				parts := strings.Split(awsErr.Message(), " ")
				nextSequenceToken = &parts[len(parts)-1]
				logrus.WithFields(logrus.Fields{
					"errorCode":     awsErr.Code(),
					"message":       awsErr.Message(),
					"logGroupName":  l.logGroupName,
					"logStreamName": l.logStreamName,
				}).Info("Data already accepted, ignoring error")
				err = nil
			} else if awsErr.Code() == invalidSequenceTokenCode {
				// sequence code is bad, grab the correct one and retry
				parts := strings.Split(awsErr.Message(), " ")
				token := parts[len(parts)-1]
				nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
			}
		}
	}
	if err != nil {
		logrus.Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}

// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		SequenceToken: sequenceToken,
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	resp, err := l.client.PutLogEvents(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			logrus.WithFields(logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Error("Failed to put log events")
		}
		return nil, err
	}
	return resp.NextSequenceToken, nil
}

// ValidateLogOpt looks for awslogs-specific log options: awslogs-region,
// awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
// awslogs-create-stream, awslogs-datetime-format, awslogs-multiline-pattern,
// awslogs-credentials-endpoint, awslogs-force-flush-interval-seconds,
// awslogs-max-buffered-events and awslogs-format.
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case logGroupKey:
		case logStreamKey:
		case logCreateGroupKey:
		case logCreateStreamKey:
		case regionKey:
		case endpointKey:
		case tagKey:
		case datetimeFormatKey:
		case multilinePatternKey:
		case credentialsEndpointKey:
		case forceFlushIntervalKey:
		case maxBufferedEventsKey:
		case logFormatKey:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[logGroupKey] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
	}
	if cfg[logCreateGroupKey] != "" {
		if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
			return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
		}
	}
	if cfg[forceFlushIntervalKey] != "" {
		if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
		}
	}
	if cfg[maxBufferedEventsKey] != "" {
		if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
		}
	}
	_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
	_, multilinePatternKeyExists := cfg[multilinePatternKey]
	if datetimeFormatKeyExists && multilinePatternKeyExists {
		return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
	}

	if cfg[logFormatKey] != "" {
		// For now, only the "json/emf" log format is supported
		if cfg[logFormatKey] != jsonEmfLogFormat {
			return fmt.Errorf("unsupported log format '%s'", cfg[logFormatKey])
		}
		if datetimeFormatKeyExists || multilinePatternKeyExists {
			return fmt.Errorf("you cannot configure log opt '%s' or '%s' when log opt '%s' is set to '%s'", datetimeFormatKey, multilinePatternKey, logFormatKey, jsonEmfLogFormat)
		}
	}

	return nil
}

// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
	return len(slice)
}

// Less compares two values in a byTimestamp slice by Timestamp. Less is
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
	iTimestamp, jTimestamp := int64(0), int64(0)
	if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
		iTimestamp = *slice[i].inputLogEvent.Timestamp
	}
	if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
		jTimestamp = *slice[j].inputLogEvent.Timestamp
	}
	if iTimestamp == jTimestamp {
		return slice[i].insertOrder < slice[j].insertOrder
	}
	return iTimestamp < jTimestamp
}

// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}
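
// unwrapEvents extracts the underlying InputLogEvents from a slice of
// wrappedEvents.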
func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
	cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
	for i, input := range events {
		cwEvents[i] = input.inputLogEvent
	}
	return cwEvents
}
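
// newEventBatch returns an empty eventBatch ready to accept events.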
func newEventBatch() *eventBatch {
	return &eventBatch{
		batch: make([]wrappedEvent, 0),
		bytes: 0,
	}
}

// events returns a slice of wrappedEvents sorted in order of their
// timestamps and then by their insertion order (see `byTimestamp`).
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) events() []wrappedEvent {
	sort.Sort(byTimestamp(b.batch))
	return b.batch
}

// add adds an event to the batch of events accounting for the
// necessary overhead for an event to be logged. add returns false
// (and leaves the batch unchanged) if the event cannot be added to
// the batch due to service limits.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) add(event wrappedEvent, size int) bool {
	addBytes := size + perEventBytes

	// verify we are still within service limits
	switch {
	case len(b.batch)+1 > maximumLogEventsPerPut:
		return false
	case b.bytes+addBytes > maximumBytesPerPut:
		return false
	}

	b.bytes += addBytes
	b.batch = append(b.batch, event)
	return true
}

// count is the number of batched events. Warning: this method
// is not threadsafe and must not be used concurrently.
func (b *eventBatch) count() int {
	return len(b.batch)
}

// size is the total number of bytes that the batch represents.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) size() int {
	return b.bytes
}
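
// isEmpty reports whether the batch contains no events and no bytes.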
func (b *eventBatch) isEmpty() bool {
	zeroEvents := b.count() == 0
	zeroSize := b.size() == 0
	return zeroEvents && zeroSize
}

// reset prepares the batch for reuse.
func (b *eventBatch) reset() {
	b.bytes = 0
	b.batch = b.batch[:0]
}