cloudwatchlogs.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs
  3. import (
  4. "bytes"
  5. "fmt"
  6. "os"
  7. "regexp"
  8. "runtime"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/Sirupsen/logrus"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "github.com/aws/aws-sdk-go/aws/ec2metadata"
  18. "github.com/aws/aws-sdk-go/aws/request"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  21. "github.com/docker/docker/daemon/logger"
  22. "github.com/docker/docker/daemon/logger/loggerutils"
  23. "github.com/docker/docker/dockerversion"
  24. "github.com/docker/docker/pkg/templates"
  25. "github.com/pkg/errors"
  26. )
  27. const (
  28. name = "awslogs"
  29. regionKey = "awslogs-region"
  30. regionEnvKey = "AWS_REGION"
  31. logGroupKey = "awslogs-group"
  32. logStreamKey = "awslogs-stream"
  33. logCreateGroupKey = "awslogs-create-group"
  34. tagKey = "tag"
  35. datetimeFormatKey = "awslogs-datetime-format"
  36. multilinePatternKey = "awslogs-multiline-pattern"
  37. batchPublishFrequency = 5 * time.Second
  38. // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  39. perEventBytes = 26
  40. maximumBytesPerPut = 1048576
  41. maximumLogEventsPerPut = 10000
  42. // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
  43. maximumBytesPerEvent = 262144 - perEventBytes
  44. resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
  45. dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
  46. invalidSequenceTokenCode = "InvalidSequenceTokenException"
  47. resourceNotFoundCode = "ResourceNotFoundException"
  48. userAgentHeader = "User-Agent"
  49. )
  50. type logStream struct {
  51. logStreamName string
  52. logGroupName string
  53. logCreateGroup bool
  54. multilinePattern *regexp.Regexp
  55. client api
  56. messages chan *logger.Message
  57. lock sync.RWMutex
  58. closed bool
  59. sequenceToken *string
  60. }
  61. type api interface {
  62. CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
  63. CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
  64. PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
  65. }
  66. type regionFinder interface {
  67. Region() (string, error)
  68. }
  69. type wrappedEvent struct {
  70. inputLogEvent *cloudwatchlogs.InputLogEvent
  71. insertOrder int
  72. }
  73. type byTimestamp []wrappedEvent
  74. // init registers the awslogs driver
  75. func init() {
  76. if err := logger.RegisterLogDriver(name, New); err != nil {
  77. logrus.Fatal(err)
  78. }
  79. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  80. logrus.Fatal(err)
  81. }
  82. }
  83. // New creates an awslogs logger using the configuration passed in on the
  84. // context. Supported context configuration variables are awslogs-region,
  85. // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
  86. // and awslogs-datetime-format. When available, configuration is
  87. // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
  88. // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
  89. // the EC2 Instance Metadata Service.
  90. func New(info logger.Info) (logger.Logger, error) {
  91. logGroupName := info.Config[logGroupKey]
  92. logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
  93. if err != nil {
  94. return nil, err
  95. }
  96. logCreateGroup := false
  97. if info.Config[logCreateGroupKey] != "" {
  98. logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
  99. if err != nil {
  100. return nil, err
  101. }
  102. }
  103. if info.Config[logStreamKey] != "" {
  104. logStreamName = info.Config[logStreamKey]
  105. }
  106. multilinePattern, err := parseMultilineOptions(info)
  107. if err != nil {
  108. return nil, err
  109. }
  110. client, err := newAWSLogsClient(info)
  111. if err != nil {
  112. return nil, err
  113. }
  114. containerStream := &logStream{
  115. logStreamName: logStreamName,
  116. logGroupName: logGroupName,
  117. logCreateGroup: logCreateGroup,
  118. multilinePattern: multilinePattern,
  119. client: client,
  120. messages: make(chan *logger.Message, 4096),
  121. }
  122. err = containerStream.create()
  123. if err != nil {
  124. return nil, err
  125. }
  126. go containerStream.collectBatch()
  127. return containerStream, nil
  128. }
  129. // Parses awslogs-multiline-pattern and awslogs-datetime-format options
  130. // If awslogs-datetime-format is present, convert the format from strftime
  131. // to regexp and return.
  132. // If awslogs-multiline-pattern is present, compile regexp and return
  133. func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
  134. dateTimeFormat := info.Config[datetimeFormatKey]
  135. multilinePatternKey := info.Config[multilinePatternKey]
  136. // strftime input is parsed into a regular expression
  137. if dateTimeFormat != "" {
  138. // %. matches each strftime format sequence and ReplaceAllStringFunc
  139. // looks up each format sequence in the conversion table strftimeToRegex
  140. // to replace with a defined regular expression
  141. r := regexp.MustCompile("%.")
  142. multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
  143. return strftimeToRegex[s]
  144. })
  145. }
  146. if multilinePatternKey != "" {
  147. multilinePattern, err := regexp.Compile(multilinePatternKey)
  148. if err != nil {
  149. return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
  150. }
  151. return multilinePattern, nil
  152. }
  153. return nil, nil
  154. }
  155. // Maps strftime format strings to regex
  156. var strftimeToRegex = map[string]string{
  157. /*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
  158. /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
  159. /*weekdayZeroIndex */ `%w`: `[0-6]`,
  160. /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`,
  161. /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
  162. /*monthFull */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
  163. /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`,
  164. /*yearCentury */ `%Y`: `\d{4}`,
  165. /*yearZeroPadded */ `%y`: `\d{2}`,
  166. /*hour24ZeroPadded */ `%H`: `(?:[0,1][0-9]|2[0-3])`,
  167. /*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`,
  168. /*AM or PM */ `%p`: "[A,P]M",
  169. /*minuteZeroPadded */ `%M`: `[0-5][0-9]`,
  170. /*secondZeroPadded */ `%S`: `[0-5][0-9]`,
  171. /*microsecondZeroPadded */ `%f`: `\d{6}`,
  172. /*utcOffset */ `%z`: `[+-]\d{4}`,
  173. /*tzName */ `%Z`: `[A-Z]{1,4}T`,
  174. /*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[1,2][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
  175. /*milliseconds */ `%L`: `\.\d{3}`,
  176. }
  177. func parseLogGroup(info logger.Info, groupTemplate string) (string, error) {
  178. tmpl, err := templates.NewParse("log-group", groupTemplate)
  179. if err != nil {
  180. return "", err
  181. }
  182. buf := new(bytes.Buffer)
  183. if err := tmpl.Execute(buf, &info); err != nil {
  184. return "", err
  185. }
  186. return buf.String(), nil
  187. }
  188. // newRegionFinder is a variable such that the implementation
  189. // can be swapped out for unit tests.
  190. var newRegionFinder = func() regionFinder {
  191. return ec2metadata.New(session.New())
  192. }
  193. // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
  194. // Customizations to the default client from the SDK include a Docker-specific
  195. // User-Agent string and automatic region detection using the EC2 Instance
  196. // Metadata Service when region is otherwise unspecified.
  197. func newAWSLogsClient(info logger.Info) (api, error) {
  198. var region *string
  199. if os.Getenv(regionEnvKey) != "" {
  200. region = aws.String(os.Getenv(regionEnvKey))
  201. }
  202. if info.Config[regionKey] != "" {
  203. region = aws.String(info.Config[regionKey])
  204. }
  205. if region == nil || *region == "" {
  206. logrus.Info("Trying to get region from EC2 Metadata")
  207. ec2MetadataClient := newRegionFinder()
  208. r, err := ec2MetadataClient.Region()
  209. if err != nil {
  210. logrus.WithFields(logrus.Fields{
  211. "error": err,
  212. }).Error("Could not get region from EC2 metadata, environment, or log option")
  213. return nil, errors.New("Cannot determine region for awslogs driver")
  214. }
  215. region = &r
  216. }
  217. logrus.WithFields(logrus.Fields{
  218. "region": *region,
  219. }).Debug("Created awslogs client")
  220. client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
  221. client.Handlers.Build.PushBackNamed(request.NamedHandler{
  222. Name: "DockerUserAgentHandler",
  223. Fn: func(r *request.Request) {
  224. currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
  225. r.HTTPRequest.Header.Set(userAgentHeader,
  226. fmt.Sprintf("Docker %s (%s) %s",
  227. dockerversion.Version, runtime.GOOS, currentAgent))
  228. },
  229. })
  230. return client, nil
  231. }
  232. // Name returns the name of the awslogs logging driver
  233. func (l *logStream) Name() string {
  234. return name
  235. }
  236. // Log submits messages for logging by an instance of the awslogs logging driver
  237. func (l *logStream) Log(msg *logger.Message) error {
  238. l.lock.RLock()
  239. defer l.lock.RUnlock()
  240. if !l.closed {
  241. l.messages <- msg
  242. }
  243. return nil
  244. }
  245. // Close closes the instance of the awslogs logging driver
  246. func (l *logStream) Close() error {
  247. l.lock.Lock()
  248. defer l.lock.Unlock()
  249. if !l.closed {
  250. close(l.messages)
  251. }
  252. l.closed = true
  253. return nil
  254. }
  255. // create creates log group and log stream for the instance of the awslogs logging driver
  256. func (l *logStream) create() error {
  257. if err := l.createLogStream(); err != nil {
  258. if l.logCreateGroup {
  259. if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
  260. if err := l.createLogGroup(); err != nil {
  261. return err
  262. }
  263. return l.createLogStream()
  264. }
  265. }
  266. return err
  267. }
  268. return nil
  269. }
  270. // createLogGroup creates a log group for the instance of the awslogs logging driver
  271. func (l *logStream) createLogGroup() error {
  272. if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
  273. LogGroupName: aws.String(l.logGroupName),
  274. }); err != nil {
  275. if awsErr, ok := err.(awserr.Error); ok {
  276. fields := logrus.Fields{
  277. "errorCode": awsErr.Code(),
  278. "message": awsErr.Message(),
  279. "origError": awsErr.OrigErr(),
  280. "logGroupName": l.logGroupName,
  281. "logCreateGroup": l.logCreateGroup,
  282. }
  283. if awsErr.Code() == resourceAlreadyExistsCode {
  284. // Allow creation to succeed
  285. logrus.WithFields(fields).Info("Log group already exists")
  286. return nil
  287. }
  288. logrus.WithFields(fields).Error("Failed to create log group")
  289. }
  290. return err
  291. }
  292. return nil
  293. }
  294. // createLogStream creates a log stream for the instance of the awslogs logging driver
  295. func (l *logStream) createLogStream() error {
  296. input := &cloudwatchlogs.CreateLogStreamInput{
  297. LogGroupName: aws.String(l.logGroupName),
  298. LogStreamName: aws.String(l.logStreamName),
  299. }
  300. _, err := l.client.CreateLogStream(input)
  301. if err != nil {
  302. if awsErr, ok := err.(awserr.Error); ok {
  303. fields := logrus.Fields{
  304. "errorCode": awsErr.Code(),
  305. "message": awsErr.Message(),
  306. "origError": awsErr.OrigErr(),
  307. "logGroupName": l.logGroupName,
  308. "logStreamName": l.logStreamName,
  309. }
  310. if awsErr.Code() == resourceAlreadyExistsCode {
  311. // Allow creation to succeed
  312. logrus.WithFields(fields).Info("Log stream already exists")
  313. return nil
  314. }
  315. logrus.WithFields(fields).Error("Failed to create log stream")
  316. }
  317. }
  318. return err
  319. }
  320. // newTicker is used for time-based batching. newTicker is a variable such
  321. // that the implementation can be swapped out for unit tests.
  322. var newTicker = func(freq time.Duration) *time.Ticker {
  323. return time.NewTicker(freq)
  324. }
  325. // collectBatch executes as a goroutine to perform batching of log events for
  326. // submission to the log stream. If the awslogs-multiline-pattern or
  327. // awslogs-datetime-format options have been configured, multiline processing
  328. // is enabled, where log messages are stored in an event buffer until a multiline
  329. // pattern match is found, at which point the messages in the event buffer are
  330. // pushed to CloudWatch logs as a single log event. Multline messages are processed
  331. // according to the maximumBytesPerPut constraint, and the implementation only
  332. // allows for messages to be buffered for a maximum of 2*batchPublishFrequency
  333. // seconds. When events are ready to be processed for submission to CloudWatch
  334. // Logs, the processEvents method is called. If a multiline pattern is not
  335. // configured, log events are submitted to the processEvents method immediately.
  336. func (l *logStream) collectBatch() {
  337. timer := newTicker(batchPublishFrequency)
  338. var events []wrappedEvent
  339. var eventBuffer []byte
  340. var eventBufferTimestamp int64
  341. for {
  342. select {
  343. case t := <-timer.C:
  344. // If event buffer is older than batch publish frequency flush the event buffer
  345. if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
  346. eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
  347. eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
  348. eventBufferNegative := eventBufferAge < 0
  349. if eventBufferExpired || eventBufferNegative {
  350. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  351. }
  352. }
  353. l.publishBatch(events)
  354. events = events[:0]
  355. case msg, more := <-l.messages:
  356. if !more {
  357. // Flush event buffer
  358. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  359. l.publishBatch(events)
  360. return
  361. }
  362. if eventBufferTimestamp == 0 {
  363. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  364. }
  365. unprocessedLine := msg.Line
  366. if l.multilinePattern != nil {
  367. if l.multilinePattern.Match(unprocessedLine) {
  368. // This is a new log event so flush the current eventBuffer to events
  369. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  370. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  371. eventBuffer = eventBuffer[:0]
  372. }
  373. // If we will exceed max bytes per event flush the current event buffer before appending
  374. if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
  375. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  376. eventBuffer = eventBuffer[:0]
  377. }
  378. // Append new line
  379. processedLine := append(unprocessedLine, "\n"...)
  380. eventBuffer = append(eventBuffer, processedLine...)
  381. logger.PutMessage(msg)
  382. } else {
  383. events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
  384. logger.PutMessage(msg)
  385. }
  386. }
  387. }
  388. }
  389. // processEvent processes log events that are ready for submission to CloudWatch
  390. // logs. Batching is performed on time- and size-bases. Time-based batching
  391. // occurs at a 5 second interval (defined in the batchPublishFrequency const).
  392. // Size-based batching is performed on the maximum number of events per batch
  393. // (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
  394. // batch (defined in maximumBytesPerPut). Log messages are split by the maximum
  395. // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
  396. // byte overhead (defined in perEventBytes) which is accounted for in split- and
  397. // batch-calculations.
  398. func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
  399. bytes := 0
  400. for len(unprocessedLine) > 0 {
  401. // Split line length so it does not exceed the maximum
  402. lineBytes := len(unprocessedLine)
  403. if lineBytes > maximumBytesPerEvent {
  404. lineBytes = maximumBytesPerEvent
  405. }
  406. line := unprocessedLine[:lineBytes]
  407. unprocessedLine = unprocessedLine[lineBytes:]
  408. if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
  409. // Publish an existing batch if it's already over the maximum number of events or if adding this
  410. // event would push it over the maximum number of total bytes.
  411. l.publishBatch(events)
  412. events = events[:0]
  413. bytes = 0
  414. }
  415. events = append(events, wrappedEvent{
  416. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  417. Message: aws.String(string(line)),
  418. Timestamp: aws.Int64(timestamp),
  419. },
  420. insertOrder: len(events),
  421. })
  422. bytes += (lineBytes + perEventBytes)
  423. }
  424. return events
  425. }
  426. // publishBatch calls PutLogEvents for a given set of InputLogEvents,
  427. // accounting for sequencing requirements (each request must reference the
  428. // sequence token returned by the previous request).
  429. func (l *logStream) publishBatch(events []wrappedEvent) {
  430. if len(events) == 0 {
  431. return
  432. }
  433. // events in a batch must be sorted by timestamp
  434. // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
  435. sort.Sort(byTimestamp(events))
  436. cwEvents := unwrapEvents(events)
  437. nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
  438. if err != nil {
  439. if awsErr, ok := err.(awserr.Error); ok {
  440. if awsErr.Code() == dataAlreadyAcceptedCode {
  441. // already submitted, just grab the correct sequence token
  442. parts := strings.Split(awsErr.Message(), " ")
  443. nextSequenceToken = &parts[len(parts)-1]
  444. logrus.WithFields(logrus.Fields{
  445. "errorCode": awsErr.Code(),
  446. "message": awsErr.Message(),
  447. "logGroupName": l.logGroupName,
  448. "logStreamName": l.logStreamName,
  449. }).Info("Data already accepted, ignoring error")
  450. err = nil
  451. } else if awsErr.Code() == invalidSequenceTokenCode {
  452. // sequence code is bad, grab the correct one and retry
  453. parts := strings.Split(awsErr.Message(), " ")
  454. token := parts[len(parts)-1]
  455. nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
  456. }
  457. }
  458. }
  459. if err != nil {
  460. logrus.Error(err)
  461. } else {
  462. l.sequenceToken = nextSequenceToken
  463. }
  464. }
  465. // putLogEvents wraps the PutLogEvents API
  466. func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
  467. input := &cloudwatchlogs.PutLogEventsInput{
  468. LogEvents: events,
  469. SequenceToken: sequenceToken,
  470. LogGroupName: aws.String(l.logGroupName),
  471. LogStreamName: aws.String(l.logStreamName),
  472. }
  473. resp, err := l.client.PutLogEvents(input)
  474. if err != nil {
  475. if awsErr, ok := err.(awserr.Error); ok {
  476. logrus.WithFields(logrus.Fields{
  477. "errorCode": awsErr.Code(),
  478. "message": awsErr.Message(),
  479. "origError": awsErr.OrigErr(),
  480. "logGroupName": l.logGroupName,
  481. "logStreamName": l.logStreamName,
  482. }).Error("Failed to put log events")
  483. }
  484. return nil, err
  485. }
  486. return resp.NextSequenceToken, nil
  487. }
  488. // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
  489. // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
  490. // awslogs-multiline-pattern
  491. func ValidateLogOpt(cfg map[string]string) error {
  492. for key := range cfg {
  493. switch key {
  494. case logGroupKey:
  495. case logStreamKey:
  496. case logCreateGroupKey:
  497. case regionKey:
  498. case tagKey:
  499. case datetimeFormatKey:
  500. case multilinePatternKey:
  501. default:
  502. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  503. }
  504. }
  505. if cfg[logGroupKey] == "" {
  506. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  507. }
  508. if cfg[logCreateGroupKey] != "" {
  509. if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
  510. return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
  511. }
  512. }
  513. _, datetimeFormatKeyExists := cfg[datetimeFormatKey]
  514. _, multilinePatternKeyExists := cfg[multilinePatternKey]
  515. if datetimeFormatKeyExists && multilinePatternKeyExists {
  516. return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
  517. }
  518. return nil
  519. }
  520. // Len returns the length of a byTimestamp slice. Len is required by the
  521. // sort.Interface interface.
  522. func (slice byTimestamp) Len() int {
  523. return len(slice)
  524. }
  525. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  526. // required by the sort.Interface interface.
  527. func (slice byTimestamp) Less(i, j int) bool {
  528. iTimestamp, jTimestamp := int64(0), int64(0)
  529. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  530. iTimestamp = *slice[i].inputLogEvent.Timestamp
  531. }
  532. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  533. jTimestamp = *slice[j].inputLogEvent.Timestamp
  534. }
  535. if iTimestamp == jTimestamp {
  536. return slice[i].insertOrder < slice[j].insertOrder
  537. }
  538. return iTimestamp < jTimestamp
  539. }
  540. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  541. // required by the sort.Interface interface.
  542. func (slice byTimestamp) Swap(i, j int) {
  543. slice[i], slice[j] = slice[j], slice[i]
  544. }
  545. func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
  546. cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
  547. for i, input := range events {
  548. cwEvents[i] = input.inputLogEvent
  549. }
  550. return cwEvents
  551. }