// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
package awslogs

import (
	"fmt"
	"os"
	"regexp"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/credentials/endpointcreds"
	"github.com/aws/aws-sdk-go/aws/ec2metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/dockerversion"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const (
	name                   = "awslogs"
	regionKey              = "awslogs-region"
	regionEnvKey           = "AWS_REGION"
	logGroupKey            = "awslogs-group"
	logStreamKey           = "awslogs-stream"
	logCreateGroupKey      = "awslogs-create-group"
	tagKey                 = "tag"
	datetimeFormatKey      = "awslogs-datetime-format"
	multilinePatternKey    = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint"

	batchPublishFrequency = 5 * time.Second

	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	maximumBytesPerEvent = 262144 - perEventBytes

	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
	dataAlreadyAcceptedCode   = "DataAlreadyAcceptedException"
	invalidSequenceTokenCode  = "InvalidSequenceTokenException"
	resourceNotFoundCode      = "ResourceNotFoundException"

	credentialsEndpoint = "http://169.254.170.2"

	userAgentHeader = "User-Agent"
)

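// As a worked example of the limits above: a single PutLogEvents request may
// carry at most 10,000 events and 1,048,576 bytes in total, where each event
// is charged its message length plus 26 bytes of fixed overhead. A single
// event message may therefore be at most 262,144 - 26 = 262,118 bytes.
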
type logStream struct {
	logStreamName    string
	logGroupName     string
	logCreateGroup   bool
	multilinePattern *regexp.Regexp
	client           api
	messages         chan *logger.Message
	lock             sync.RWMutex
	closed           bool
	sequenceToken    *string
}

type api interface {
	CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}

type regionFinder interface {
	Region() (string, error)
}

type wrappedEvent struct {
	inputLogEvent *cloudwatchlogs.InputLogEvent
	insertOrder   int
}

type byTimestamp []wrappedEvent

// init registers the awslogs driver
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}

// eventBatch holds the events that are batched for submission and the
// associated data about it.
//
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single go
// routine and never concurrently.
type eventBatch struct {
	batch []wrappedEvent
	bytes int
}

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
// and awslogs-datetime-format. When available, configuration is
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
// the EC2 Instance Metadata Service.
func New(info logger.Info) (logger.Logger, error) {
	logGroupName := info.Config[logGroupKey]
	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
	if err != nil {
		return nil, err
	}
	logCreateGroup := false
	if info.Config[logCreateGroupKey] != "" {
		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
		if err != nil {
			return nil, err
		}
	}
	if info.Config[logStreamKey] != "" {
		logStreamName = info.Config[logStreamKey]
	}

	multilinePattern, err := parseMultilineOptions(info)
	if err != nil {
		return nil, err
	}

	client, err := newAWSLogsClient(info)
	if err != nil {
		return nil, err
	}
	containerStream := &logStream{
		logStreamName:    logStreamName,
		logGroupName:     logGroupName,
		logCreateGroup:   logCreateGroup,
		multilinePattern: multilinePattern,
		client:           client,
		messages:         make(chan *logger.Message, 4096),
	}
	err = containerStream.create()
	if err != nil {
		return nil, err
	}
	go containerStream.collectBatch()

	return containerStream, nil
}

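// For illustration only: a minimal configuration for New might look like the
// following, where the container ID, group name, and region are hypothetical
// values.
//
//	info := logger.Info{
//		ContainerID: "abcdef123456",
//		Config: map[string]string{
//			"awslogs-region": "us-east-1",
//			"awslogs-group":  "my-group",
//		},
//	}
//	l, err := New(info)
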
// parseMultilineOptions parses the awslogs-multiline-pattern and
// awslogs-datetime-format options.
// If awslogs-datetime-format is present, it converts the format from strftime
// to a regular expression and returns it.
// If awslogs-multiline-pattern is present, it compiles the regular expression
// and returns it.
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
	dateTimeFormat := info.Config[datetimeFormatKey]
	multilinePatternValue := info.Config[multilinePatternKey]
	// strftime input is parsed into a regular expression
	if dateTimeFormat != "" {
		// %. matches each strftime format sequence and ReplaceAllStringFunc
		// looks up each format sequence in the conversion table strftimeToRegex
		// to replace with a defined regular expression
		r := regexp.MustCompile("%.")
		multilinePatternValue = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
			return strftimeToRegex[s]
		})
	}
	if multilinePatternValue != "" {
		multilinePattern, err := regexp.Compile(multilinePatternValue)
		if err != nil {
			return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternValue)
		}
		return multilinePattern, nil
	}
	return nil, nil
}

// strftimeToRegex maps strftime format strings to regular expressions. Note
// that commas inside a character class would match a literal comma (e.g.
// `[1,2]` matches "1", ",", or "2"), so ranges are written without them.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}

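// For example, the strftime format "%Y-%m-%d %H:%M:%S" converts to the
// pattern `\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01]) (?:[01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`,
// which matches timestamps such as "2017-03-09 14:02:33".
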
// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.
var newRegionFinder = func() regionFinder {
	return ec2metadata.New(session.New())
}

// newSDKEndpoint is a variable such that the implementation
// can be swapped out for unit tests.
var newSDKEndpoint = credentialsEndpoint

// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when region is otherwise unspecified.
func newAWSLogsClient(info logger.Info) (api, error) {
	var region *string
	if os.Getenv(regionEnvKey) != "" {
		region = aws.String(os.Getenv(regionEnvKey))
	}
	if info.Config[regionKey] != "" {
		region = aws.String(info.Config[regionKey])
	}
	if region == nil || *region == "" {
		logrus.Info("Trying to get region from EC2 Metadata")
		ec2MetadataClient := newRegionFinder()
		r, err := ec2MetadataClient.Region()
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"error": err,
			}).Error("Could not get region from EC2 metadata, environment, or log option")
			return nil, errors.New("Cannot determine region for awslogs driver")
		}
		region = &r
	}

	sess, err := session.NewSession()
	if err != nil {
		return nil, errors.New("Failed to create a service client session for awslogs driver")
	}

	// attach region to cloudwatchlogs config
	sess.Config.Region = region

	if uri, ok := info.Config[credentialsEndpointKey]; ok {
		logrus.Debug("Trying to get credentials from awslogs-credentials-endpoint")
		endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
		creds := endpointcreds.NewCredentialsClient(*sess.Config, sess.Handlers, endpoint,
			func(p *endpointcreds.Provider) {
				p.ExpiryWindow = 5 * time.Minute
			})

		// attach credentials to cloudwatchlogs config
		sess.Config.Credentials = creds
	}

	logrus.WithFields(logrus.Fields{
		"region": *region,
	}).Debug("Created awslogs client")

	client := cloudwatchlogs.New(sess)

	client.Handlers.Build.PushBackNamed(request.NamedHandler{
		Name: "DockerUserAgentHandler",
		Fn: func(r *request.Request) {
			currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
			r.HTTPRequest.Header.Set(userAgentHeader,
				fmt.Sprintf("Docker %s (%s) %s",
					dockerversion.Version, runtime.GOOS, currentAgent))
		},
	})
	return client, nil
}

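// As an illustration of the credentials-endpoint handling above: with the log
// option awslogs-credentials-endpoint=/creds (a hypothetical value), the
// driver fetches credentials from http://169.254.170.2/creds, the link-local
// address used for container credentials (as on Amazon ECS).
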
// Name returns the name of the awslogs logging driver
func (l *logStream) Name() string {
	return name
}

// BufSize returns the maximum size, in bytes, of the message payload of a
// single log event accepted by CloudWatch Logs.
func (l *logStream) BufSize() int {
	return maximumBytesPerEvent
}

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if !l.closed {
		l.messages <- msg
	}
	return nil
}

// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if !l.closed {
		close(l.messages)
	}
	l.closed = true
	return nil
}

// create creates log group and log stream for the instance of the awslogs logging driver
func (l *logStream) create() error {
	if err := l.createLogStream(); err != nil {
		if l.logCreateGroup {
			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
				if err := l.createLogGroup(); err != nil {
					return err
				}
				return l.createLogStream()
			}
		}
		return err
	}

	return nil
}

// createLogGroup creates a log group for the instance of the awslogs logging driver
func (l *logStream) createLogGroup() error {
	if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
		LogGroupName: aws.String(l.logGroupName),
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":      awsErr.Code(),
				"message":        awsErr.Message(),
				"origError":      awsErr.OrigErr(),
				"logGroupName":   l.logGroupName,
				"logCreateGroup": l.logCreateGroup,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log group already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log group")
		}
		return err
	}
	return nil
}

// createLogStream creates a log stream for the instance of the awslogs logging driver
func (l *logStream) createLogStream() error {
	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}

	_, err := l.client.CreateLogStream(input)

	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log stream already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log stream")
		}
	}
	return err
}

// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
	return time.NewTicker(freq)
}

// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. If the awslogs-multiline-pattern or
// awslogs-datetime-format options have been configured, multiline processing
// is enabled, where log messages are stored in an event buffer until a multiline
// pattern match is found, at which point the messages in the event buffer are
// pushed to CloudWatch logs as a single log event. Multiline messages are processed
// according to the maximumBytesPerPut constraint, and the implementation only
// allows for messages to be buffered for a maximum of 2*batchPublishFrequency
// (10 seconds). When events are ready to be processed for submission to CloudWatch
// Logs, the processEvent method is called. If a multiline pattern is not
// configured, log events are submitted to the processEvent method immediately.
func (l *logStream) collectBatch() {
	ticker := newTicker(batchPublishFrequency)
	var eventBuffer []byte
	var eventBufferTimestamp int64
	var batch = newEventBatch()
	for {
		select {
		case t := <-ticker.C:
			// If event buffer is older than batch publish frequency flush the event buffer
			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
				eventBufferNegative := eventBufferAge < 0
				if eventBufferExpired || eventBufferNegative {
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBuffer = eventBuffer[:0]
				}
			}
			l.publishBatch(batch)
			batch.reset()
		case msg, more := <-l.messages:
			if !more {
				// Flush event buffer and release resources
				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
				eventBuffer = eventBuffer[:0]
				l.publishBatch(batch)
				batch.reset()
				return
			}
			if eventBufferTimestamp == 0 {
				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
			}
			unprocessedLine := msg.Line
			if l.multilinePattern != nil {
				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
					// This is a new log event or we will exceed max bytes per event
					// so flush the current eventBuffer to events and reset timestamp
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
					eventBuffer = eventBuffer[:0]
				}
				// Append new line
				processedLine := append(unprocessedLine, "\n"...)
				eventBuffer = append(eventBuffer, processedLine...)
				logger.PutMessage(msg)
			} else {
				l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
				logger.PutMessage(msg)
			}
		}
	}
}

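// As an example of the multiline handling above: with awslogs-datetime-format
// set to "%Y-%m-%d" (a hypothetical choice), a stack trace whose first line
// begins "2017-03-09 ..." matches the pattern and starts a new event, while
// the indented "at ..." lines that follow contain no date, do not match, and
// are appended to the same event buffer.
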
// processEvent processes log events that are ready for submission to CloudWatch
// logs. Batching is performed on both a time and a size basis. Time-based batching
// occurs at a 5 second interval (defined in the batchPublishFrequency const).
// Size-based batching is performed on the maximum number of events per batch
// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split and
// batch calculations.
func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
	for len(unprocessedLine) > 0 {
		// Split line length so it does not exceed the maximum
		lineBytes := len(unprocessedLine)
		if lineBytes > maximumBytesPerEvent {
			lineBytes = maximumBytesPerEvent
		}
		line := unprocessedLine[:lineBytes]

		event := wrappedEvent{
			inputLogEvent: &cloudwatchlogs.InputLogEvent{
				Message:   aws.String(string(line)),
				Timestamp: aws.Int64(timestamp),
			},
			insertOrder: batch.count(),
		}

		added := batch.add(event, lineBytes)
		if added {
			unprocessedLine = unprocessedLine[lineBytes:]
		} else {
			l.publishBatch(batch)
			batch.reset()
		}
	}
}

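// As a worked example of the splitting above: a 500,000-byte line is emitted
// as two events, one of 262,118 bytes (maximumBytesPerEvent) and one of
// 237,882 bytes, consuming 500,000 + 2*26 bytes of the batch's byte budget.
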
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(batch *eventBatch) {
	if batch.isEmpty() {
		return
	}
	cwEvents := unwrapEvents(batch.events())

	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)

	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			if awsErr.Code() == dataAlreadyAcceptedCode {
				// already submitted, just grab the correct sequence token
				parts := strings.Split(awsErr.Message(), " ")
				nextSequenceToken = &parts[len(parts)-1]
				logrus.WithFields(logrus.Fields{
					"errorCode":     awsErr.Code(),
					"message":       awsErr.Message(),
					"logGroupName":  l.logGroupName,
					"logStreamName": l.logStreamName,
				}).Info("Data already accepted, ignoring error")
				err = nil
			} else if awsErr.Code() == invalidSequenceTokenCode {
				// sequence code is bad, grab the correct one and retry
				parts := strings.Split(awsErr.Message(), " ")
				token := parts[len(parts)-1]
				nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
			}
		}
	}
	if err != nil {
		logrus.Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}

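// The token recovery above relies on the service's error message ending with
// the expected sequence token, e.g. (paraphrased) "The next expected
// sequenceToken is: 49590...", so splitting on spaces and taking the last
// word yields the token to retry with.
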
// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		SequenceToken: sequenceToken,
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	resp, err := l.client.PutLogEvents(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			logrus.WithFields(logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Error("Failed to put log events")
		}
		return nil, err
	}
	return resp.NextSequenceToken, nil
}

// ValidateLogOpt looks for awslogs-specific log options: awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
// awslogs-multiline-pattern and awslogs-credentials-endpoint
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case logGroupKey:
		case logStreamKey:
		case logCreateGroupKey:
		case regionKey:
		case tagKey:
		case datetimeFormatKey:
		case multilinePatternKey:
		case credentialsEndpointKey:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[logGroupKey] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
	}
	if cfg[logCreateGroupKey] != "" {
		if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
			return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
		}
	}
	_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
	_, multilinePatternKeyExists := cfg[multilinePatternKey]
	if datetimeFormatKeyExists && multilinePatternKeyExists {
		return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
	}
	return nil
}

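// For example (hypothetical values), this configuration passes validation:
//
//	err := ValidateLogOpt(map[string]string{
//		"awslogs-group":  "my-group",
//		"awslogs-region": "us-east-1",
//	})
//
// while supplying both awslogs-datetime-format and awslogs-multiline-pattern,
// or omitting awslogs-group, returns an error.
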
// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
	return len(slice)
}

// Less compares two values in a byTimestamp slice by Timestamp. Less is
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
	iTimestamp, jTimestamp := int64(0), int64(0)
	if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
		iTimestamp = *slice[i].inputLogEvent.Timestamp
	}
	if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
		jTimestamp = *slice[j].inputLogEvent.Timestamp
	}
	if iTimestamp == jTimestamp {
		return slice[i].insertOrder < slice[j].insertOrder
	}
	return iTimestamp < jTimestamp
}

// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}

func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
	cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
	for i, input := range events {
		cwEvents[i] = input.inputLogEvent
	}
	return cwEvents
}

func newEventBatch() *eventBatch {
	return &eventBatch{
		batch: make([]wrappedEvent, 0),
		bytes: 0,
	}
}

// events returns a slice of wrappedEvents sorted in order of their
// timestamps and then by their insertion order (see `byTimestamp`).
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) events() []wrappedEvent {
	sort.Sort(byTimestamp(b.batch))
	return b.batch
}

// add adds an event to the batch of events, accounting for the
// necessary overhead for an event to be logged. It returns false if
// the event cannot be added to the batch due to service limits.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) add(event wrappedEvent, size int) bool {
	addBytes := size + perEventBytes

	// verify we are still within service limits
	switch {
	case len(b.batch)+1 > maximumLogEventsPerPut:
		return false
	case b.bytes+addBytes > maximumBytesPerPut:
		return false
	}

	b.bytes += addBytes
	b.batch = append(b.batch, event)
	return true
}

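// For example, adding an event whose message is 1,000 bytes consumes
// 1,000 + 26 = 1,026 bytes of the 1,048,576-byte batch budget; add returns
// false once either the byte budget or the 10,000-event count would be
// exceeded.
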
// count is the number of batched events. Warning: this method
// is not threadsafe and must not be used concurrently.
func (b *eventBatch) count() int {
	return len(b.batch)
}

// size is the total number of bytes that the batch represents.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) size() int {
	return b.bytes
}

func (b *eventBatch) isEmpty() bool {
	zeroEvents := b.count() == 0
	zeroSize := b.size() == 0
	return zeroEvents && zeroSize
}

// reset prepares the batch for reuse.
func (b *eventBatch) reset() {
	b.bytes = 0
	b.batch = b.batch[:0]
}