cloudwatchlogs.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
  2. package awslogs
  3. import (
  4. "fmt"
  5. "os"
  6. "regexp"
  7. "runtime"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/ec2metadata"
  16. "github.com/aws/aws-sdk-go/aws/request"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  19. "github.com/docker/docker/daemon/logger"
  20. "github.com/docker/docker/daemon/logger/loggerutils"
  21. "github.com/docker/docker/dockerversion"
  22. "github.com/pkg/errors"
  23. "github.com/sirupsen/logrus"
  24. )
const (
	// name is the log-driver name under which this driver registers.
	name = "awslogs"
	// Log-opt keys accepted by ValidateLogOpt / New.
	regionKey           = "awslogs-region"
	regionEnvKey        = "AWS_REGION"
	logGroupKey         = "awslogs-group"
	logStreamKey        = "awslogs-stream"
	logCreateGroupKey   = "awslogs-create-group"
	tagKey              = "tag"
	datetimeFormatKey   = "awslogs-datetime-format"
	multilinePatternKey = "awslogs-multiline-pattern"
	// batchPublishFrequency is how often collectBatch flushes buffered
	// events to CloudWatch Logs.
	batchPublishFrequency = 5 * time.Second

	// Service limits for PutLogEvents.
	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// maximumBytesPerEvent reserves the fixed per-event overhead out of the
	// documented 256 KB event limit.
	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	maximumBytesPerEvent = 262144 - perEventBytes

	// AWS error codes this driver reacts to.
	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
	dataAlreadyAcceptedCode   = "DataAlreadyAcceptedException"
	invalidSequenceTokenCode  = "InvalidSequenceTokenException"
	resourceNotFoundCode      = "ResourceNotFoundException"

	userAgentHeader = "User-Agent"
)
// logStream is the per-container logger instance.  It buffers messages on a
// channel (filled by Log, drained by the collectBatch goroutine) and tracks
// the CloudWatch Logs sequence token between PutLogEvents calls.
type logStream struct {
	logStreamName    string           // CloudWatch Logs stream name
	logGroupName     string           // CloudWatch Logs group name
	logCreateGroup   bool             // create the group on demand if missing
	multilinePattern *regexp.Regexp   // non-nil enables multiline buffering in collectBatch
	client           api              // CloudWatch Logs client (interface for test injection)
	messages         chan *logger.Message // Log -> collectBatch hand-off; closed by Close
	lock             sync.RWMutex     // guards closed; Log holds RLock, Close holds Lock
	closed           bool
	sequenceToken    *string          // token required by the next PutLogEvents call
}
// api is the subset of the CloudWatch Logs SDK client used by this driver,
// declared as an interface so tests can substitute a mock.
type api interface {
	CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}
// regionFinder abstracts EC2 instance-metadata region lookup so it can be
// swapped out in unit tests (see newRegionFinder).
type regionFinder interface {
	Region() (string, error)
}
// wrappedEvent pairs an InputLogEvent with its insertion order so that
// sorting by timestamp (required by PutLogEvents) stays stable for events
// sharing the same timestamp.
type wrappedEvent struct {
	inputLogEvent *cloudwatchlogs.InputLogEvent
	insertOrder   int
}

// byTimestamp implements sort.Interface over wrappedEvents, ordering by
// timestamp and breaking ties by insertOrder.
type byTimestamp []wrappedEvent
// init registers the awslogs driver and its log-opt validator with the
// logger registry; registration failure is fatal at daemon start.
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
// and awslogs-datetime-format. When available, configuration is
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
// the EC2 Instance Metadata Service.
func New(info logger.Info) (logger.Logger, error) {
	logGroupName := info.Config[logGroupKey]
	// Stream name defaults to the container's full ID via the tag template;
	// an explicit awslogs-stream option overrides it below.
	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
	if err != nil {
		return nil, err
	}
	logCreateGroup := false
	if info.Config[logCreateGroupKey] != "" {
		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
		if err != nil {
			return nil, err
		}
	}
	if info.Config[logStreamKey] != "" {
		logStreamName = info.Config[logStreamKey]
	}
	multilinePattern, err := parseMultilineOptions(info)
	if err != nil {
		return nil, err
	}
	client, err := newAWSLogsClient(info)
	if err != nil {
		return nil, err
	}
	containerStream := &logStream{
		logStreamName:    logStreamName,
		logGroupName:     logGroupName,
		logCreateGroup:   logCreateGroup,
		multilinePattern: multilinePattern,
		client:           client,
		// Buffered channel decouples Log callers from the batching goroutine.
		messages: make(chan *logger.Message, 4096),
	}
	// Ensure group/stream exist before accepting any messages.
	err = containerStream.create()
	if err != nil {
		return nil, err
	}
	// collectBatch drains the messages channel until Close closes it.
	go containerStream.collectBatch()
	return containerStream, nil
}
  127. // Parses awslogs-multiline-pattern and awslogs-datetime-format options
  128. // If awslogs-datetime-format is present, convert the format from strftime
  129. // to regexp and return.
  130. // If awslogs-multiline-pattern is present, compile regexp and return
  131. func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
  132. dateTimeFormat := info.Config[datetimeFormatKey]
  133. multilinePatternKey := info.Config[multilinePatternKey]
  134. // strftime input is parsed into a regular expression
  135. if dateTimeFormat != "" {
  136. // %. matches each strftime format sequence and ReplaceAllStringFunc
  137. // looks up each format sequence in the conversion table strftimeToRegex
  138. // to replace with a defined regular expression
  139. r := regexp.MustCompile("%.")
  140. multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
  141. return strftimeToRegex[s]
  142. })
  143. }
  144. if multilinePatternKey != "" {
  145. multilinePattern, err := regexp.Compile(multilinePatternKey)
  146. if err != nil {
  147. return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
  148. }
  149. return multilinePattern, nil
  150. }
  151. return nil, nil
  152. }
  153. // Maps strftime format strings to regex
  154. var strftimeToRegex = map[string]string{
  155. /*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
  156. /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
  157. /*weekdayZeroIndex */ `%w`: `[0-6]`,
  158. /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`,
  159. /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
  160. /*monthFull */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
  161. /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`,
  162. /*yearCentury */ `%Y`: `\d{4}`,
  163. /*yearZeroPadded */ `%y`: `\d{2}`,
  164. /*hour24ZeroPadded */ `%H`: `(?:[0,1][0-9]|2[0-3])`,
  165. /*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`,
  166. /*AM or PM */ `%p`: "[A,P]M",
  167. /*minuteZeroPadded */ `%M`: `[0-5][0-9]`,
  168. /*secondZeroPadded */ `%S`: `[0-5][0-9]`,
  169. /*microsecondZeroPadded */ `%f`: `\d{6}`,
  170. /*utcOffset */ `%z`: `[+-]\d{4}`,
  171. /*tzName */ `%Z`: `[A-Z]{1,4}T`,
  172. /*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[1,2][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
  173. /*milliseconds */ `%L`: `\.\d{3}`,
  174. }
// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.  The default returns an EC2
// instance-metadata client built on a fresh SDK session.
var newRegionFinder = func() regionFinder {
	return ec2metadata.New(session.New())
}
// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when region is otherwise unspecified.
//
// Region precedence: the awslogs-region log option overrides the AWS_REGION
// environment variable, which overrides EC2 metadata lookup.
func newAWSLogsClient(info logger.Info) (api, error) {
	var region *string
	if os.Getenv(regionEnvKey) != "" {
		region = aws.String(os.Getenv(regionEnvKey))
	}
	if info.Config[regionKey] != "" {
		region = aws.String(info.Config[regionKey])
	}
	if region == nil || *region == "" {
		logrus.Info("Trying to get region from EC2 Metadata")
		ec2MetadataClient := newRegionFinder()
		r, err := ec2MetadataClient.Region()
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"error": err,
			}).Error("Could not get region from EC2 metadata, environment, or log option")
			return nil, errors.New("Cannot determine region for awslogs driver")
		}
		region = &r
	}
	logrus.WithFields(logrus.Fields{
		"region": *region,
	}).Debug("Created awslogs client")
	client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
	// Prepend a Docker identifier to the SDK's default User-Agent on every
	// outgoing request via a named build handler.
	client.Handlers.Build.PushBackNamed(request.NamedHandler{
		Name: "DockerUserAgentHandler",
		Fn: func(r *request.Request) {
			currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
			r.HTTPRequest.Header.Set(userAgentHeader,
				fmt.Sprintf("Docker %s (%s) %s",
					dockerversion.Version, runtime.GOOS, currentAgent))
		},
	})
	return client, nil
}
  219. // Name returns the name of the awslogs logging driver
  220. func (l *logStream) Name() string {
  221. return name
  222. }
// Log submits messages for logging by an instance of the awslogs logging driver.
//
// The read lock allows concurrent Log callers while excluding Close (which
// takes the write lock); checking closed under the lock ensures we never send
// on the channel after Close has closed it.  Messages arriving after Close
// are silently dropped.
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if !l.closed {
		l.messages <- msg
	}
	return nil
}
// Close closes the instance of the awslogs logging driver.
//
// Closing the messages channel signals the collectBatch goroutine to flush
// remaining events and exit.  The write lock excludes in-flight Log calls so
// the channel cannot be closed while a send is in progress.  Close is
// idempotent: the channel is only closed on the first call.
func (l *logStream) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if !l.closed {
		close(l.messages)
	}
	l.closed = true
	return nil
}
  242. // create creates log group and log stream for the instance of the awslogs logging driver
  243. func (l *logStream) create() error {
  244. if err := l.createLogStream(); err != nil {
  245. if l.logCreateGroup {
  246. if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
  247. if err := l.createLogGroup(); err != nil {
  248. return err
  249. }
  250. return l.createLogStream()
  251. }
  252. }
  253. return err
  254. }
  255. return nil
  256. }
// createLogGroup creates a log group for the instance of the awslogs logging
// driver.  A ResourceAlreadyExistsException is treated as success (another
// container or process may have created the group concurrently); any other
// AWS error is logged and returned.
func (l *logStream) createLogGroup() error {
	if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
		LogGroupName: aws.String(l.logGroupName),
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":      awsErr.Code(),
				"message":        awsErr.Message(),
				"origError":      awsErr.OrigErr(),
				"logGroupName":   l.logGroupName,
				"logCreateGroup": l.logCreateGroup,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log group already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log group")
		}
		return err
	}
	return nil
}
// createLogStream creates a log stream for the instance of the awslogs
// logging driver.  As with createLogGroup, a ResourceAlreadyExistsException
// is treated as success so an existing stream can be reused; any other AWS
// error is logged and returned.
func (l *logStream) createLogStream() error {
	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	_, err := l.client.CreateLogStream(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			fields := logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}
			if awsErr.Code() == resourceAlreadyExistsCode {
				// Allow creation to succeed
				logrus.WithFields(fields).Info("Log stream already exists")
				return nil
			}
			logrus.WithFields(fields).Error("Failed to create log stream")
		}
	}
	return err
}
// newTicker is used for time-based batching.  newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
	return time.NewTicker(freq)
}
  312. // collectBatch executes as a goroutine to perform batching of log events for
  313. // submission to the log stream. If the awslogs-multiline-pattern or
  314. // awslogs-datetime-format options have been configured, multiline processing
  315. // is enabled, where log messages are stored in an event buffer until a multiline
  316. // pattern match is found, at which point the messages in the event buffer are
  317. // pushed to CloudWatch logs as a single log event. Multiline messages are processed
  318. // according to the maximumBytesPerPut constraint, and the implementation only
  319. // allows for messages to be buffered for a maximum of 2*batchPublishFrequency
  320. // seconds. When events are ready to be processed for submission to CloudWatch
  321. // Logs, the processEvents method is called. If a multiline pattern is not
  322. // configured, log events are submitted to the processEvents method immediately.
  323. func (l *logStream) collectBatch() {
  324. timer := newTicker(batchPublishFrequency)
  325. var events []wrappedEvent
  326. var eventBuffer []byte
  327. var eventBufferTimestamp int64
  328. for {
  329. select {
  330. case t := <-timer.C:
  331. // If event buffer is older than batch publish frequency flush the event buffer
  332. if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
  333. eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
  334. eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
  335. eventBufferNegative := eventBufferAge < 0
  336. if eventBufferExpired || eventBufferNegative {
  337. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  338. eventBuffer = eventBuffer[:0]
  339. }
  340. }
  341. l.publishBatch(events)
  342. events = events[:0]
  343. case msg, more := <-l.messages:
  344. if !more {
  345. // Flush event buffer and release resources
  346. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  347. eventBuffer = eventBuffer[:0]
  348. l.publishBatch(events)
  349. events = events[:0]
  350. return
  351. }
  352. if eventBufferTimestamp == 0 {
  353. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  354. }
  355. unprocessedLine := msg.Line
  356. if l.multilinePattern != nil {
  357. if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
  358. // This is a new log event or we will exceed max bytes per event
  359. // so flush the current eventBuffer to events and reset timestamp
  360. events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
  361. eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
  362. eventBuffer = eventBuffer[:0]
  363. }
  364. // Append new line
  365. processedLine := append(unprocessedLine, "\n"...)
  366. eventBuffer = append(eventBuffer, processedLine...)
  367. logger.PutMessage(msg)
  368. } else {
  369. events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
  370. logger.PutMessage(msg)
  371. }
  372. }
  373. }
  374. }
// processEvent processes log events that are ready for submission to CloudWatch
// logs. Batching is performed on time- and size-bases. Time-based batching
// occurs at a 5 second interval (defined in the batchPublishFrequency const).
// Size-based batching is performed on the maximum number of events per batch
// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
// batch-calculations.
//
// NOTE(review): bytes restarts at 0 on each call, so the running batch size
// is only tracked within a single line's chunks, not across calls — the
// cross-call limit is enforced by the event-count check.  Confirm this is
// intentional against the PutLogEvents batch-size limit.
func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
	bytes := 0
	for len(unprocessedLine) > 0 {
		// Split line length so it does not exceed the maximum
		lineBytes := len(unprocessedLine)
		if lineBytes > maximumBytesPerEvent {
			lineBytes = maximumBytesPerEvent
		}
		line := unprocessedLine[:lineBytes]
		unprocessedLine = unprocessedLine[lineBytes:]
		if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
			// Publish an existing batch if it's already over the maximum number of events or if adding this
			// event would push it over the maximum number of total bytes.
			l.publishBatch(events)
			events = events[:0]
			bytes = 0
		}
		events = append(events, wrappedEvent{
			inputLogEvent: &cloudwatchlogs.InputLogEvent{
				Message:   aws.String(string(line)),
				Timestamp: aws.Int64(timestamp),
			},
			// insertOrder preserves a stable sort for equal timestamps.
			insertOrder: len(events),
		})
		bytes += (lineBytes + perEventBytes)
	}
	return events
}
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
//
// Two sequencing errors are recovered: DataAlreadyAccepted (the batch was
// already stored, so just adopt the token) and InvalidSequenceToken (retry
// once with the token reported by the service).  In both cases the expected
// token is extracted as the last space-separated word of the error message.
func (l *logStream) publishBatch(events []wrappedEvent) {
	if len(events) == 0 {
		return
	}
	// events in a batch must be sorted by timestamp
	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	sort.Sort(byTimestamp(events))
	cwEvents := unwrapEvents(events)
	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			if awsErr.Code() == dataAlreadyAcceptedCode {
				// already submitted, just grab the correct sequence token
				parts := strings.Split(awsErr.Message(), " ")
				nextSequenceToken = &parts[len(parts)-1]
				logrus.WithFields(logrus.Fields{
					"errorCode":     awsErr.Code(),
					"message":       awsErr.Message(),
					"logGroupName":  l.logGroupName,
					"logStreamName": l.logStreamName,
				}).Info("Data already accepted, ignoring error")
				err = nil
			} else if awsErr.Code() == invalidSequenceTokenCode {
				// sequence code is bad, grab the correct one and retry
				parts := strings.Split(awsErr.Message(), " ")
				token := parts[len(parts)-1]
				nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
			}
		}
	}
	// On any unrecovered error the batch is dropped (logged only) and the
	// previous sequence token is kept for the next attempt.
	if err != nil {
		logrus.Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}
// putLogEvents wraps the PutLogEvents API, logging AWS errors with context
// and returning the next sequence token on success for use by the caller's
// subsequent request.
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		SequenceToken: sequenceToken,
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	resp, err := l.client.PutLogEvents(input)
	if err != nil {
		if awsErr, ok := err.(awserr.Error); ok {
			logrus.WithFields(logrus.Fields{
				"errorCode":     awsErr.Code(),
				"message":       awsErr.Message(),
				"origError":     awsErr.OrigErr(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Error("Failed to put log events")
		}
		return nil, err
	}
	return resp.NextSequenceToken, nil
}
  474. // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
  475. // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
  476. // awslogs-multiline-pattern
  477. func ValidateLogOpt(cfg map[string]string) error {
  478. for key := range cfg {
  479. switch key {
  480. case logGroupKey:
  481. case logStreamKey:
  482. case logCreateGroupKey:
  483. case regionKey:
  484. case tagKey:
  485. case datetimeFormatKey:
  486. case multilinePatternKey:
  487. default:
  488. return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
  489. }
  490. }
  491. if cfg[logGroupKey] == "" {
  492. return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
  493. }
  494. if cfg[logCreateGroupKey] != "" {
  495. if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
  496. return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
  497. }
  498. }
  499. _, datetimeFormatKeyExists := cfg[datetimeFormatKey]
  500. _, multilinePatternKeyExists := cfg[multilinePatternKey]
  501. if datetimeFormatKeyExists && multilinePatternKeyExists {
  502. return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
  503. }
  504. return nil
  505. }
  506. // Len returns the length of a byTimestamp slice. Len is required by the
  507. // sort.Interface interface.
  508. func (slice byTimestamp) Len() int {
  509. return len(slice)
  510. }
  511. // Less compares two values in a byTimestamp slice by Timestamp. Less is
  512. // required by the sort.Interface interface.
  513. func (slice byTimestamp) Less(i, j int) bool {
  514. iTimestamp, jTimestamp := int64(0), int64(0)
  515. if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
  516. iTimestamp = *slice[i].inputLogEvent.Timestamp
  517. }
  518. if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
  519. jTimestamp = *slice[j].inputLogEvent.Timestamp
  520. }
  521. if iTimestamp == jTimestamp {
  522. return slice[i].insertOrder < slice[j].insertOrder
  523. }
  524. return iTimestamp < jTimestamp
  525. }
  526. // Swap swaps two values in a byTimestamp slice with each other. Swap is
  527. // required by the sort.Interface interface.
  528. func (slice byTimestamp) Swap(i, j int) {
  529. slice[i], slice[j] = slice[j], slice[i]
  530. }
  531. func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
  532. cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
  533. for i, input := range events {
  534. cwEvents[i] = input.inputLogEvent
  535. }
  536. return cwEvents
  537. }