cloudwatchlogs_test.go 29 KB


  1. package awslogs
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "reflect"
  7. "regexp"
  8. "runtime"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/awserr"
  14. "github.com/aws/aws-sdk-go/aws/request"
  15. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  16. "github.com/docker/docker/daemon/logger"
  17. "github.com/docker/docker/daemon/logger/loggerutils"
  18. "github.com/docker/docker/dockerversion"
  19. "github.com/stretchr/testify/assert"
  20. )
  21. const (
  22. groupName = "groupName"
  23. streamName = "streamName"
  24. sequenceToken = "sequenceToken"
  25. nextSequenceToken = "nextSequenceToken"
  26. logline = "this is a log line\r"
  27. multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
  28. )
  29. // Generates i multi-line events each with j lines
  30. func (l *logStream) logGenerator(lineCount int, multilineCount int) {
  31. for i := 0; i < multilineCount; i++ {
  32. l.Log(&logger.Message{
  33. Line: []byte(multilineLogline),
  34. Timestamp: time.Time{},
  35. })
  36. for j := 0; j < lineCount; j++ {
  37. l.Log(&logger.Message{
  38. Line: []byte(logline),
  39. Timestamp: time.Time{},
  40. })
  41. }
  42. }
  43. }
  44. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  45. info := logger.Info{
  46. Config: map[string]string{
  47. regionKey: "us-east-1",
  48. },
  49. }
  50. client, err := newAWSLogsClient(info)
  51. if err != nil {
  52. t.Fatal(err)
  53. }
  54. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  55. if !ok {
  56. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  57. }
  58. buildHandlerList := realClient.Handlers.Build
  59. request := &request.Request{
  60. HTTPRequest: &http.Request{
  61. Header: http.Header{},
  62. },
  63. }
  64. buildHandlerList.Run(request)
  65. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  66. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  67. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  68. if userAgent != expectedUserAgentString {
  69. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  70. expectedUserAgentString, userAgent)
  71. }
  72. }
  73. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  74. info := logger.Info{
  75. Config: map[string]string{},
  76. }
  77. mockMetadata := newMockMetadataClient()
  78. newRegionFinder = func() regionFinder {
  79. return mockMetadata
  80. }
  81. mockMetadata.regionResult <- &regionResult{
  82. successResult: "us-east-1",
  83. }
  84. _, err := newAWSLogsClient(info)
  85. if err != nil {
  86. t.Fatal(err)
  87. }
  88. }
  89. func TestCreateSuccess(t *testing.T) {
  90. mockClient := newMockClient()
  91. stream := &logStream{
  92. client: mockClient,
  93. logGroupName: groupName,
  94. logStreamName: streamName,
  95. }
  96. mockClient.createLogStreamResult <- &createLogStreamResult{}
  97. err := stream.create()
  98. if err != nil {
  99. t.Errorf("Received unexpected err: %v\n", err)
  100. }
  101. argument := <-mockClient.createLogStreamArgument
  102. if argument.LogGroupName == nil {
  103. t.Fatal("Expected non-nil LogGroupName")
  104. }
  105. if *argument.LogGroupName != groupName {
  106. t.Errorf("Expected LogGroupName to be %s", groupName)
  107. }
  108. if argument.LogStreamName == nil {
  109. t.Fatal("Expected non-nil LogStreamName")
  110. }
  111. if *argument.LogStreamName != streamName {
  112. t.Errorf("Expected LogStreamName to be %s", streamName)
  113. }
  114. }
  115. func TestCreateLogGroupSuccess(t *testing.T) {
  116. mockClient := newMockClient()
  117. stream := &logStream{
  118. client: mockClient,
  119. logGroupName: groupName,
  120. logStreamName: streamName,
  121. logCreateGroup: true,
  122. }
  123. mockClient.createLogGroupResult <- &createLogGroupResult{}
  124. mockClient.createLogStreamResult <- &createLogStreamResult{}
  125. err := stream.create()
  126. if err != nil {
  127. t.Errorf("Received unexpected err: %v\n", err)
  128. }
  129. argument := <-mockClient.createLogStreamArgument
  130. if argument.LogGroupName == nil {
  131. t.Fatal("Expected non-nil LogGroupName")
  132. }
  133. if *argument.LogGroupName != groupName {
  134. t.Errorf("Expected LogGroupName to be %s", groupName)
  135. }
  136. if argument.LogStreamName == nil {
  137. t.Fatal("Expected non-nil LogStreamName")
  138. }
  139. if *argument.LogStreamName != streamName {
  140. t.Errorf("Expected LogStreamName to be %s", streamName)
  141. }
  142. }
  143. func TestCreateError(t *testing.T) {
  144. mockClient := newMockClient()
  145. stream := &logStream{
  146. client: mockClient,
  147. }
  148. mockClient.createLogStreamResult <- &createLogStreamResult{
  149. errorResult: errors.New("Error!"),
  150. }
  151. err := stream.create()
  152. if err == nil {
  153. t.Fatal("Expected non-nil err")
  154. }
  155. }
  156. func TestCreateAlreadyExists(t *testing.T) {
  157. mockClient := newMockClient()
  158. stream := &logStream{
  159. client: mockClient,
  160. }
  161. mockClient.createLogStreamResult <- &createLogStreamResult{
  162. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  163. }
  164. err := stream.create()
  165. if err != nil {
  166. t.Fatal("Expected nil err")
  167. }
  168. }
  169. func TestPublishBatchSuccess(t *testing.T) {
  170. mockClient := newMockClient()
  171. stream := &logStream{
  172. client: mockClient,
  173. logGroupName: groupName,
  174. logStreamName: streamName,
  175. sequenceToken: aws.String(sequenceToken),
  176. }
  177. mockClient.putLogEventsResult <- &putLogEventsResult{
  178. successResult: &cloudwatchlogs.PutLogEventsOutput{
  179. NextSequenceToken: aws.String(nextSequenceToken),
  180. },
  181. }
  182. events := []wrappedEvent{
  183. {
  184. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  185. Message: aws.String(logline),
  186. },
  187. },
  188. }
  189. stream.publishBatch(events)
  190. if stream.sequenceToken == nil {
  191. t.Fatal("Expected non-nil sequenceToken")
  192. }
  193. if *stream.sequenceToken != nextSequenceToken {
  194. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  195. }
  196. argument := <-mockClient.putLogEventsArgument
  197. if argument == nil {
  198. t.Fatal("Expected non-nil PutLogEventsInput")
  199. }
  200. if argument.SequenceToken == nil {
  201. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  202. }
  203. if *argument.SequenceToken != sequenceToken {
  204. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  205. }
  206. if len(argument.LogEvents) != 1 {
  207. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  208. }
  209. if argument.LogEvents[0] != events[0].inputLogEvent {
  210. t.Error("Expected event to equal input")
  211. }
  212. }
  213. func TestPublishBatchError(t *testing.T) {
  214. mockClient := newMockClient()
  215. stream := &logStream{
  216. client: mockClient,
  217. logGroupName: groupName,
  218. logStreamName: streamName,
  219. sequenceToken: aws.String(sequenceToken),
  220. }
  221. mockClient.putLogEventsResult <- &putLogEventsResult{
  222. errorResult: errors.New("Error!"),
  223. }
  224. events := []wrappedEvent{
  225. {
  226. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  227. Message: aws.String(logline),
  228. },
  229. },
  230. }
  231. stream.publishBatch(events)
  232. if stream.sequenceToken == nil {
  233. t.Fatal("Expected non-nil sequenceToken")
  234. }
  235. if *stream.sequenceToken != sequenceToken {
  236. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  237. }
  238. }
  239. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  240. mockClient := newMockClientBuffered(2)
  241. stream := &logStream{
  242. client: mockClient,
  243. logGroupName: groupName,
  244. logStreamName: streamName,
  245. sequenceToken: aws.String(sequenceToken),
  246. }
  247. mockClient.putLogEventsResult <- &putLogEventsResult{
  248. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  249. }
  250. mockClient.putLogEventsResult <- &putLogEventsResult{
  251. successResult: &cloudwatchlogs.PutLogEventsOutput{
  252. NextSequenceToken: aws.String(nextSequenceToken),
  253. },
  254. }
  255. events := []wrappedEvent{
  256. {
  257. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  258. Message: aws.String(logline),
  259. },
  260. },
  261. }
  262. stream.publishBatch(events)
  263. if stream.sequenceToken == nil {
  264. t.Fatal("Expected non-nil sequenceToken")
  265. }
  266. if *stream.sequenceToken != nextSequenceToken {
  267. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  268. }
  269. argument := <-mockClient.putLogEventsArgument
  270. if argument == nil {
  271. t.Fatal("Expected non-nil PutLogEventsInput")
  272. }
  273. if argument.SequenceToken == nil {
  274. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  275. }
  276. if *argument.SequenceToken != sequenceToken {
  277. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  278. }
  279. if len(argument.LogEvents) != 1 {
  280. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  281. }
  282. if argument.LogEvents[0] != events[0].inputLogEvent {
  283. t.Error("Expected event to equal input")
  284. }
  285. argument = <-mockClient.putLogEventsArgument
  286. if argument == nil {
  287. t.Fatal("Expected non-nil PutLogEventsInput")
  288. }
  289. if argument.SequenceToken == nil {
  290. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  291. }
  292. if *argument.SequenceToken != "token" {
  293. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  294. }
  295. if len(argument.LogEvents) != 1 {
  296. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  297. }
  298. if argument.LogEvents[0] != events[0].inputLogEvent {
  299. t.Error("Expected event to equal input")
  300. }
  301. }
  302. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  303. mockClient := newMockClient()
  304. stream := &logStream{
  305. client: mockClient,
  306. logGroupName: groupName,
  307. logStreamName: streamName,
  308. sequenceToken: aws.String(sequenceToken),
  309. }
  310. mockClient.putLogEventsResult <- &putLogEventsResult{
  311. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  312. }
  313. events := []wrappedEvent{
  314. {
  315. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  316. Message: aws.String(logline),
  317. },
  318. },
  319. }
  320. stream.publishBatch(events)
  321. if stream.sequenceToken == nil {
  322. t.Fatal("Expected non-nil sequenceToken")
  323. }
  324. if *stream.sequenceToken != "token" {
  325. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  326. }
  327. argument := <-mockClient.putLogEventsArgument
  328. if argument == nil {
  329. t.Fatal("Expected non-nil PutLogEventsInput")
  330. }
  331. if argument.SequenceToken == nil {
  332. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  333. }
  334. if *argument.SequenceToken != sequenceToken {
  335. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  336. }
  337. if len(argument.LogEvents) != 1 {
  338. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  339. }
  340. if argument.LogEvents[0] != events[0].inputLogEvent {
  341. t.Error("Expected event to equal input")
  342. }
  343. }
  344. func TestCollectBatchSimple(t *testing.T) {
  345. mockClient := newMockClient()
  346. stream := &logStream{
  347. client: mockClient,
  348. logGroupName: groupName,
  349. logStreamName: streamName,
  350. sequenceToken: aws.String(sequenceToken),
  351. messages: make(chan *logger.Message),
  352. }
  353. mockClient.putLogEventsResult <- &putLogEventsResult{
  354. successResult: &cloudwatchlogs.PutLogEventsOutput{
  355. NextSequenceToken: aws.String(nextSequenceToken),
  356. },
  357. }
  358. ticks := make(chan time.Time)
  359. newTicker = func(_ time.Duration) *time.Ticker {
  360. return &time.Ticker{
  361. C: ticks,
  362. }
  363. }
  364. go stream.collectBatch()
  365. stream.Log(&logger.Message{
  366. Line: []byte(logline),
  367. Timestamp: time.Time{},
  368. })
  369. ticks <- time.Time{}
  370. stream.Close()
  371. argument := <-mockClient.putLogEventsArgument
  372. if argument == nil {
  373. t.Fatal("Expected non-nil PutLogEventsInput")
  374. }
  375. if len(argument.LogEvents) != 1 {
  376. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  377. }
  378. if *argument.LogEvents[0].Message != logline {
  379. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  380. }
  381. }
  382. func TestCollectBatchTicker(t *testing.T) {
  383. mockClient := newMockClient()
  384. stream := &logStream{
  385. client: mockClient,
  386. logGroupName: groupName,
  387. logStreamName: streamName,
  388. sequenceToken: aws.String(sequenceToken),
  389. messages: make(chan *logger.Message),
  390. }
  391. mockClient.putLogEventsResult <- &putLogEventsResult{
  392. successResult: &cloudwatchlogs.PutLogEventsOutput{
  393. NextSequenceToken: aws.String(nextSequenceToken),
  394. },
  395. }
  396. ticks := make(chan time.Time)
  397. newTicker = func(_ time.Duration) *time.Ticker {
  398. return &time.Ticker{
  399. C: ticks,
  400. }
  401. }
  402. go stream.collectBatch()
  403. stream.Log(&logger.Message{
  404. Line: []byte(logline + " 1"),
  405. Timestamp: time.Time{},
  406. })
  407. stream.Log(&logger.Message{
  408. Line: []byte(logline + " 2"),
  409. Timestamp: time.Time{},
  410. })
  411. ticks <- time.Time{}
  412. // Verify first batch
  413. argument := <-mockClient.putLogEventsArgument
  414. if argument == nil {
  415. t.Fatal("Expected non-nil PutLogEventsInput")
  416. }
  417. if len(argument.LogEvents) != 2 {
  418. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  419. }
  420. if *argument.LogEvents[0].Message != logline+" 1" {
  421. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  422. }
  423. if *argument.LogEvents[1].Message != logline+" 2" {
  424. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  425. }
  426. stream.Log(&logger.Message{
  427. Line: []byte(logline + " 3"),
  428. Timestamp: time.Time{},
  429. })
  430. ticks <- time.Time{}
  431. argument = <-mockClient.putLogEventsArgument
  432. if argument == nil {
  433. t.Fatal("Expected non-nil PutLogEventsInput")
  434. }
  435. if len(argument.LogEvents) != 1 {
  436. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  437. }
  438. if *argument.LogEvents[0].Message != logline+" 3" {
  439. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  440. }
  441. stream.Close()
  442. }
  443. func TestCollectBatchMultilinePattern(t *testing.T) {
  444. mockClient := newMockClient()
  445. multilinePattern := regexp.MustCompile("xxxx")
  446. stream := &logStream{
  447. client: mockClient,
  448. logGroupName: groupName,
  449. logStreamName: streamName,
  450. multilinePattern: multilinePattern,
  451. sequenceToken: aws.String(sequenceToken),
  452. messages: make(chan *logger.Message),
  453. }
  454. mockClient.putLogEventsResult <- &putLogEventsResult{
  455. successResult: &cloudwatchlogs.PutLogEventsOutput{
  456. NextSequenceToken: aws.String(nextSequenceToken),
  457. },
  458. }
  459. ticks := make(chan time.Time)
  460. newTicker = func(_ time.Duration) *time.Ticker {
  461. return &time.Ticker{
  462. C: ticks,
  463. }
  464. }
  465. go stream.collectBatch()
  466. stream.Log(&logger.Message{
  467. Line: []byte(logline),
  468. Timestamp: time.Now(),
  469. })
  470. stream.Log(&logger.Message{
  471. Line: []byte(logline),
  472. Timestamp: time.Now(),
  473. })
  474. stream.Log(&logger.Message{
  475. Line: []byte("xxxx " + logline),
  476. Timestamp: time.Now(),
  477. })
  478. ticks <- time.Now()
  479. // Verify single multiline event
  480. argument := <-mockClient.putLogEventsArgument
  481. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  482. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  483. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  484. stream.Close()
  485. // Verify single event
  486. argument = <-mockClient.putLogEventsArgument
  487. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  488. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  489. assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  490. }
  491. func BenchmarkCollectBatch(b *testing.B) {
  492. for i := 0; i < b.N; i++ {
  493. mockClient := newMockClient()
  494. stream := &logStream{
  495. client: mockClient,
  496. logGroupName: groupName,
  497. logStreamName: streamName,
  498. sequenceToken: aws.String(sequenceToken),
  499. messages: make(chan *logger.Message),
  500. }
  501. mockClient.putLogEventsResult <- &putLogEventsResult{
  502. successResult: &cloudwatchlogs.PutLogEventsOutput{
  503. NextSequenceToken: aws.String(nextSequenceToken),
  504. },
  505. }
  506. ticks := make(chan time.Time)
  507. newTicker = func(_ time.Duration) *time.Ticker {
  508. return &time.Ticker{
  509. C: ticks,
  510. }
  511. }
  512. go stream.collectBatch()
  513. stream.logGenerator(10, 100)
  514. ticks <- time.Time{}
  515. stream.Close()
  516. }
  517. }
  518. func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
  519. for i := 0; i < b.N; i++ {
  520. mockClient := newMockClient()
  521. multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
  522. stream := &logStream{
  523. client: mockClient,
  524. logGroupName: groupName,
  525. logStreamName: streamName,
  526. multilinePattern: multilinePattern,
  527. sequenceToken: aws.String(sequenceToken),
  528. messages: make(chan *logger.Message),
  529. }
  530. mockClient.putLogEventsResult <- &putLogEventsResult{
  531. successResult: &cloudwatchlogs.PutLogEventsOutput{
  532. NextSequenceToken: aws.String(nextSequenceToken),
  533. },
  534. }
  535. ticks := make(chan time.Time)
  536. newTicker = func(_ time.Duration) *time.Ticker {
  537. return &time.Ticker{
  538. C: ticks,
  539. }
  540. }
  541. go stream.collectBatch()
  542. stream.logGenerator(10, 100)
  543. ticks <- time.Time{}
  544. stream.Close()
  545. }
  546. }
  547. func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
  548. mockClient := newMockClient()
  549. multilinePattern := regexp.MustCompile("xxxx")
  550. stream := &logStream{
  551. client: mockClient,
  552. logGroupName: groupName,
  553. logStreamName: streamName,
  554. multilinePattern: multilinePattern,
  555. sequenceToken: aws.String(sequenceToken),
  556. messages: make(chan *logger.Message),
  557. }
  558. mockClient.putLogEventsResult <- &putLogEventsResult{
  559. successResult: &cloudwatchlogs.PutLogEventsOutput{
  560. NextSequenceToken: aws.String(nextSequenceToken),
  561. },
  562. }
  563. ticks := make(chan time.Time)
  564. newTicker = func(_ time.Duration) *time.Ticker {
  565. return &time.Ticker{
  566. C: ticks,
  567. }
  568. }
  569. go stream.collectBatch()
  570. stream.Log(&logger.Message{
  571. Line: []byte(logline),
  572. Timestamp: time.Now(),
  573. })
  574. // Log an event 1 second later
  575. stream.Log(&logger.Message{
  576. Line: []byte(logline),
  577. Timestamp: time.Now().Add(time.Second),
  578. })
  579. // Fire ticker batchPublishFrequency seconds later
  580. ticks <- time.Now().Add(batchPublishFrequency * time.Second)
  581. // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
  582. argument := <-mockClient.putLogEventsArgument
  583. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  584. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  585. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  586. stream.Close()
  587. }
  588. func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
  589. mockClient := newMockClient()
  590. multilinePattern := regexp.MustCompile("xxxx")
  591. stream := &logStream{
  592. client: mockClient,
  593. logGroupName: groupName,
  594. logStreamName: streamName,
  595. multilinePattern: multilinePattern,
  596. sequenceToken: aws.String(sequenceToken),
  597. messages: make(chan *logger.Message),
  598. }
  599. mockClient.putLogEventsResult <- &putLogEventsResult{
  600. successResult: &cloudwatchlogs.PutLogEventsOutput{
  601. NextSequenceToken: aws.String(nextSequenceToken),
  602. },
  603. }
  604. ticks := make(chan time.Time)
  605. newTicker = func(_ time.Duration) *time.Ticker {
  606. return &time.Ticker{
  607. C: ticks,
  608. }
  609. }
  610. go stream.collectBatch()
  611. stream.Log(&logger.Message{
  612. Line: []byte(logline),
  613. Timestamp: time.Now(),
  614. })
  615. // Log an event 1 second later
  616. stream.Log(&logger.Message{
  617. Line: []byte(logline),
  618. Timestamp: time.Now().Add(time.Second),
  619. })
  620. // Fire ticker in past to simulate negative event buffer age
  621. ticks <- time.Now().Add(-time.Second)
  622. // Verify single multiline event is flushed with a negative event buffer age
  623. argument := <-mockClient.putLogEventsArgument
  624. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  625. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  626. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  627. stream.Close()
  628. }
  629. func TestCollectBatchClose(t *testing.T) {
  630. mockClient := newMockClient()
  631. stream := &logStream{
  632. client: mockClient,
  633. logGroupName: groupName,
  634. logStreamName: streamName,
  635. sequenceToken: aws.String(sequenceToken),
  636. messages: make(chan *logger.Message),
  637. }
  638. mockClient.putLogEventsResult <- &putLogEventsResult{
  639. successResult: &cloudwatchlogs.PutLogEventsOutput{
  640. NextSequenceToken: aws.String(nextSequenceToken),
  641. },
  642. }
  643. var ticks = make(chan time.Time)
  644. newTicker = func(_ time.Duration) *time.Ticker {
  645. return &time.Ticker{
  646. C: ticks,
  647. }
  648. }
  649. go stream.collectBatch()
  650. stream.Log(&logger.Message{
  651. Line: []byte(logline),
  652. Timestamp: time.Time{},
  653. })
  654. // no ticks
  655. stream.Close()
  656. argument := <-mockClient.putLogEventsArgument
  657. if argument == nil {
  658. t.Fatal("Expected non-nil PutLogEventsInput")
  659. }
  660. if len(argument.LogEvents) != 1 {
  661. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  662. }
  663. if *argument.LogEvents[0].Message != logline {
  664. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  665. }
  666. }
  667. func TestCollectBatchLineSplit(t *testing.T) {
  668. mockClient := newMockClient()
  669. stream := &logStream{
  670. client: mockClient,
  671. logGroupName: groupName,
  672. logStreamName: streamName,
  673. sequenceToken: aws.String(sequenceToken),
  674. messages: make(chan *logger.Message),
  675. }
  676. mockClient.putLogEventsResult <- &putLogEventsResult{
  677. successResult: &cloudwatchlogs.PutLogEventsOutput{
  678. NextSequenceToken: aws.String(nextSequenceToken),
  679. },
  680. }
  681. var ticks = make(chan time.Time)
  682. newTicker = func(_ time.Duration) *time.Ticker {
  683. return &time.Ticker{
  684. C: ticks,
  685. }
  686. }
  687. go stream.collectBatch()
  688. longline := strings.Repeat("A", maximumBytesPerEvent)
  689. stream.Log(&logger.Message{
  690. Line: []byte(longline + "B"),
  691. Timestamp: time.Time{},
  692. })
  693. // no ticks
  694. stream.Close()
  695. argument := <-mockClient.putLogEventsArgument
  696. if argument == nil {
  697. t.Fatal("Expected non-nil PutLogEventsInput")
  698. }
  699. if len(argument.LogEvents) != 2 {
  700. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  701. }
  702. if *argument.LogEvents[0].Message != longline {
  703. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  704. }
  705. if *argument.LogEvents[1].Message != "B" {
  706. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  707. }
  708. }
  709. func TestCollectBatchMaxEvents(t *testing.T) {
  710. mockClient := newMockClientBuffered(1)
  711. stream := &logStream{
  712. client: mockClient,
  713. logGroupName: groupName,
  714. logStreamName: streamName,
  715. sequenceToken: aws.String(sequenceToken),
  716. messages: make(chan *logger.Message),
  717. }
  718. mockClient.putLogEventsResult <- &putLogEventsResult{
  719. successResult: &cloudwatchlogs.PutLogEventsOutput{
  720. NextSequenceToken: aws.String(nextSequenceToken),
  721. },
  722. }
  723. var ticks = make(chan time.Time)
  724. newTicker = func(_ time.Duration) *time.Ticker {
  725. return &time.Ticker{
  726. C: ticks,
  727. }
  728. }
  729. go stream.collectBatch()
  730. line := "A"
  731. for i := 0; i <= maximumLogEventsPerPut; i++ {
  732. stream.Log(&logger.Message{
  733. Line: []byte(line),
  734. Timestamp: time.Time{},
  735. })
  736. }
  737. // no ticks
  738. stream.Close()
  739. argument := <-mockClient.putLogEventsArgument
  740. if argument == nil {
  741. t.Fatal("Expected non-nil PutLogEventsInput")
  742. }
  743. if len(argument.LogEvents) != maximumLogEventsPerPut {
  744. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  745. }
  746. argument = <-mockClient.putLogEventsArgument
  747. if argument == nil {
  748. t.Fatal("Expected non-nil PutLogEventsInput")
  749. }
  750. if len(argument.LogEvents) != 1 {
  751. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  752. }
  753. }
  754. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  755. mockClient := newMockClientBuffered(1)
  756. stream := &logStream{
  757. client: mockClient,
  758. logGroupName: groupName,
  759. logStreamName: streamName,
  760. sequenceToken: aws.String(sequenceToken),
  761. messages: make(chan *logger.Message),
  762. }
  763. mockClient.putLogEventsResult <- &putLogEventsResult{
  764. successResult: &cloudwatchlogs.PutLogEventsOutput{
  765. NextSequenceToken: aws.String(nextSequenceToken),
  766. },
  767. }
  768. var ticks = make(chan time.Time)
  769. newTicker = func(_ time.Duration) *time.Ticker {
  770. return &time.Ticker{
  771. C: ticks,
  772. }
  773. }
  774. go stream.collectBatch()
  775. longline := strings.Repeat("A", maximumBytesPerPut)
  776. stream.Log(&logger.Message{
  777. Line: []byte(longline + "B"),
  778. Timestamp: time.Time{},
  779. })
  780. // no ticks
  781. stream.Close()
  782. argument := <-mockClient.putLogEventsArgument
  783. if argument == nil {
  784. t.Fatal("Expected non-nil PutLogEventsInput")
  785. }
  786. bytes := 0
  787. for _, event := range argument.LogEvents {
  788. bytes += len(*event.Message)
  789. }
  790. if bytes > maximumBytesPerPut {
  791. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
  792. }
  793. argument = <-mockClient.putLogEventsArgument
  794. if len(argument.LogEvents) != 1 {
  795. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  796. }
  797. message := *argument.LogEvents[0].Message
  798. if message[len(message)-1:] != "B" {
  799. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  800. }
  801. }
  802. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  803. mockClient := newMockClient()
  804. stream := &logStream{
  805. client: mockClient,
  806. logGroupName: groupName,
  807. logStreamName: streamName,
  808. sequenceToken: aws.String(sequenceToken),
  809. messages: make(chan *logger.Message),
  810. }
  811. mockClient.putLogEventsResult <- &putLogEventsResult{
  812. successResult: &cloudwatchlogs.PutLogEventsOutput{
  813. NextSequenceToken: aws.String(nextSequenceToken),
  814. },
  815. }
  816. ticks := make(chan time.Time)
  817. newTicker = func(_ time.Duration) *time.Ticker {
  818. return &time.Ticker{
  819. C: ticks,
  820. }
  821. }
  822. go stream.collectBatch()
  823. times := maximumLogEventsPerPut
  824. expectedEvents := []*cloudwatchlogs.InputLogEvent{}
  825. timestamp := time.Now()
  826. for i := 0; i < times; i++ {
  827. line := fmt.Sprintf("%d", i)
  828. if i%2 == 0 {
  829. timestamp.Add(1 * time.Nanosecond)
  830. }
  831. stream.Log(&logger.Message{
  832. Line: []byte(line),
  833. Timestamp: timestamp,
  834. })
  835. expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
  836. Message: aws.String(line),
  837. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  838. })
  839. }
  840. ticks <- time.Time{}
  841. stream.Close()
  842. argument := <-mockClient.putLogEventsArgument
  843. if argument == nil {
  844. t.Fatal("Expected non-nil PutLogEventsInput")
  845. }
  846. if len(argument.LogEvents) != times {
  847. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
  848. }
  849. for i := 0; i < times; i++ {
  850. if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
  851. t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
  852. }
  853. }
  854. }
  855. func TestParseLogOptionsMultilinePattern(t *testing.T) {
  856. info := logger.Info{
  857. Config: map[string]string{
  858. multilinePatternKey: "^xxxx",
  859. },
  860. }
  861. multilinePattern, err := parseMultilineOptions(info)
  862. assert.Nil(t, err, "Received unexpected error")
  863. assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
  864. }
  865. func TestParseLogOptionsDatetimeFormat(t *testing.T) {
  866. datetimeFormatTests := []struct {
  867. format string
  868. match string
  869. }{
  870. {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
  871. {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
  872. {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
  873. {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
  874. {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
  875. {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
  876. {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
  877. }
  878. for _, dt := range datetimeFormatTests {
  879. t.Run(dt.match, func(t *testing.T) {
  880. info := logger.Info{
  881. Config: map[string]string{
  882. datetimeFormatKey: dt.format,
  883. },
  884. }
  885. multilinePattern, err := parseMultilineOptions(info)
  886. assert.Nil(t, err, "Received unexpected error")
  887. assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
  888. })
  889. }
  890. }
  891. func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
  892. cfg := map[string]string{
  893. multilinePatternKey: "^xxxx",
  894. datetimeFormatKey: "%Y-%m-%d",
  895. logGroupKey: groupName,
  896. }
  897. conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
  898. err := ValidateLogOpt(cfg)
  899. assert.NotNil(t, err, "Expected an error")
  900. assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error")
  901. }
  902. func TestCreateTagSuccess(t *testing.T) {
  903. mockClient := newMockClient()
  904. info := logger.Info{
  905. ContainerName: "/test-container",
  906. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  907. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  908. }
  909. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  910. if e != nil {
  911. t.Errorf("Error generating tag: %q", e)
  912. }
  913. stream := &logStream{
  914. client: mockClient,
  915. logGroupName: groupName,
  916. logStreamName: logStreamName,
  917. }
  918. mockClient.createLogStreamResult <- &createLogStreamResult{}
  919. err := stream.create()
  920. if err != nil {
  921. t.Errorf("Received unexpected err: %v\n", err)
  922. }
  923. argument := <-mockClient.createLogStreamArgument
  924. if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
  925. t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
  926. }
  927. }
  928. func BenchmarkUnwrapEvents(b *testing.B) {
  929. events := make([]wrappedEvent, maximumLogEventsPerPut)
  930. for i := 0; i < maximumLogEventsPerPut; i++ {
  931. mes := strings.Repeat("0", maximumBytesPerEvent)
  932. events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
  933. Message: &mes,
  934. }
  935. }
  936. as := assert.New(b)
  937. b.ResetTimer()
  938. for i := 0; i < b.N; i++ {
  939. res := unwrapEvents(events)
  940. as.Len(res, maximumLogEventsPerPut)
  941. }
  942. }