cloudwatchlogs_test.go 35 KB


  1. package awslogs
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/http/httptest"
  8. "os"
  9. "reflect"
  10. "regexp"
  11. "runtime"
  12. "strings"
  13. "testing"
  14. "time"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "github.com/aws/aws-sdk-go/aws/request"
  18. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  19. "github.com/docker/docker/daemon/logger"
  20. "github.com/docker/docker/daemon/logger/loggerutils"
  21. "github.com/docker/docker/dockerversion"
  22. "github.com/stretchr/testify/assert"
  23. )
// Fixture values shared by the tests in this file.
const (
	// groupName and streamName are the log group and log stream names given
	// to the logStream under test.
	groupName  = "groupName"
	streamName = "streamName"
	// sequenceToken is the token a stream starts with; nextSequenceToken is
	// the token the mocked PutLogEvents output hands back.
	sequenceToken     = "sequenceToken"
	nextSequenceToken = "nextSequenceToken"
	// logline is the payload of an ordinary log message.
	logline = "this is a log line\r"
	// multilineLogline is a log message that begins with a date and time; the
	// log generator uses it as the first line of every generated event.
	multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
)
  32. // Generates i multi-line events each with j lines
  33. func (l *logStream) logGenerator(lineCount int, multilineCount int) {
  34. for i := 0; i < multilineCount; i++ {
  35. l.Log(&logger.Message{
  36. Line: []byte(multilineLogline),
  37. Timestamp: time.Time{},
  38. })
  39. for j := 0; j < lineCount; j++ {
  40. l.Log(&logger.Message{
  41. Line: []byte(logline),
  42. Timestamp: time.Time{},
  43. })
  44. }
  45. }
  46. }
  47. func testEventBatch(events []wrappedEvent) *eventBatch {
  48. batch := newEventBatch()
  49. for _, event := range events {
  50. eventlen := len([]byte(*event.inputLogEvent.Message))
  51. batch.add(event, eventlen)
  52. }
  53. return batch
  54. }
  55. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  56. info := logger.Info{
  57. Config: map[string]string{
  58. regionKey: "us-east-1",
  59. },
  60. }
  61. client, err := newAWSLogsClient(info)
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  66. if !ok {
  67. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  68. }
  69. buildHandlerList := realClient.Handlers.Build
  70. request := &request.Request{
  71. HTTPRequest: &http.Request{
  72. Header: http.Header{},
  73. },
  74. }
  75. buildHandlerList.Run(request)
  76. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  77. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  78. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  79. if userAgent != expectedUserAgentString {
  80. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  81. expectedUserAgentString, userAgent)
  82. }
  83. }
  84. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  85. info := logger.Info{
  86. Config: map[string]string{},
  87. }
  88. mockMetadata := newMockMetadataClient()
  89. newRegionFinder = func() regionFinder {
  90. return mockMetadata
  91. }
  92. mockMetadata.regionResult <- &regionResult{
  93. successResult: "us-east-1",
  94. }
  95. _, err := newAWSLogsClient(info)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. }
  100. func TestCreateSuccess(t *testing.T) {
  101. mockClient := newMockClient()
  102. stream := &logStream{
  103. client: mockClient,
  104. logGroupName: groupName,
  105. logStreamName: streamName,
  106. }
  107. mockClient.createLogStreamResult <- &createLogStreamResult{}
  108. err := stream.create()
  109. if err != nil {
  110. t.Errorf("Received unexpected err: %v\n", err)
  111. }
  112. argument := <-mockClient.createLogStreamArgument
  113. if argument.LogGroupName == nil {
  114. t.Fatal("Expected non-nil LogGroupName")
  115. }
  116. if *argument.LogGroupName != groupName {
  117. t.Errorf("Expected LogGroupName to be %s", groupName)
  118. }
  119. if argument.LogStreamName == nil {
  120. t.Fatal("Expected non-nil LogStreamName")
  121. }
  122. if *argument.LogStreamName != streamName {
  123. t.Errorf("Expected LogStreamName to be %s", streamName)
  124. }
  125. }
  126. func TestCreateLogGroupSuccess(t *testing.T) {
  127. mockClient := newMockClient()
  128. stream := &logStream{
  129. client: mockClient,
  130. logGroupName: groupName,
  131. logStreamName: streamName,
  132. logCreateGroup: true,
  133. }
  134. mockClient.createLogGroupResult <- &createLogGroupResult{}
  135. mockClient.createLogStreamResult <- &createLogStreamResult{}
  136. err := stream.create()
  137. if err != nil {
  138. t.Errorf("Received unexpected err: %v\n", err)
  139. }
  140. argument := <-mockClient.createLogStreamArgument
  141. if argument.LogGroupName == nil {
  142. t.Fatal("Expected non-nil LogGroupName")
  143. }
  144. if *argument.LogGroupName != groupName {
  145. t.Errorf("Expected LogGroupName to be %s", groupName)
  146. }
  147. if argument.LogStreamName == nil {
  148. t.Fatal("Expected non-nil LogStreamName")
  149. }
  150. if *argument.LogStreamName != streamName {
  151. t.Errorf("Expected LogStreamName to be %s", streamName)
  152. }
  153. }
  154. func TestCreateError(t *testing.T) {
  155. mockClient := newMockClient()
  156. stream := &logStream{
  157. client: mockClient,
  158. }
  159. mockClient.createLogStreamResult <- &createLogStreamResult{
  160. errorResult: errors.New("Error"),
  161. }
  162. err := stream.create()
  163. if err == nil {
  164. t.Fatal("Expected non-nil err")
  165. }
  166. }
  167. func TestCreateAlreadyExists(t *testing.T) {
  168. mockClient := newMockClient()
  169. stream := &logStream{
  170. client: mockClient,
  171. }
  172. mockClient.createLogStreamResult <- &createLogStreamResult{
  173. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  174. }
  175. err := stream.create()
  176. if err != nil {
  177. t.Fatal("Expected nil err")
  178. }
  179. }
  180. func TestPublishBatchSuccess(t *testing.T) {
  181. mockClient := newMockClient()
  182. stream := &logStream{
  183. client: mockClient,
  184. logGroupName: groupName,
  185. logStreamName: streamName,
  186. sequenceToken: aws.String(sequenceToken),
  187. }
  188. mockClient.putLogEventsResult <- &putLogEventsResult{
  189. successResult: &cloudwatchlogs.PutLogEventsOutput{
  190. NextSequenceToken: aws.String(nextSequenceToken),
  191. },
  192. }
  193. events := []wrappedEvent{
  194. {
  195. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  196. Message: aws.String(logline),
  197. },
  198. },
  199. }
  200. stream.publishBatch(testEventBatch(events))
  201. if stream.sequenceToken == nil {
  202. t.Fatal("Expected non-nil sequenceToken")
  203. }
  204. if *stream.sequenceToken != nextSequenceToken {
  205. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  206. }
  207. argument := <-mockClient.putLogEventsArgument
  208. if argument == nil {
  209. t.Fatal("Expected non-nil PutLogEventsInput")
  210. }
  211. if argument.SequenceToken == nil {
  212. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  213. }
  214. if *argument.SequenceToken != sequenceToken {
  215. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  216. }
  217. if len(argument.LogEvents) != 1 {
  218. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  219. }
  220. if argument.LogEvents[0] != events[0].inputLogEvent {
  221. t.Error("Expected event to equal input")
  222. }
  223. }
  224. func TestPublishBatchError(t *testing.T) {
  225. mockClient := newMockClient()
  226. stream := &logStream{
  227. client: mockClient,
  228. logGroupName: groupName,
  229. logStreamName: streamName,
  230. sequenceToken: aws.String(sequenceToken),
  231. }
  232. mockClient.putLogEventsResult <- &putLogEventsResult{
  233. errorResult: errors.New("Error"),
  234. }
  235. events := []wrappedEvent{
  236. {
  237. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  238. Message: aws.String(logline),
  239. },
  240. },
  241. }
  242. stream.publishBatch(testEventBatch(events))
  243. if stream.sequenceToken == nil {
  244. t.Fatal("Expected non-nil sequenceToken")
  245. }
  246. if *stream.sequenceToken != sequenceToken {
  247. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  248. }
  249. }
  250. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  251. mockClient := newMockClientBuffered(2)
  252. stream := &logStream{
  253. client: mockClient,
  254. logGroupName: groupName,
  255. logStreamName: streamName,
  256. sequenceToken: aws.String(sequenceToken),
  257. }
  258. mockClient.putLogEventsResult <- &putLogEventsResult{
  259. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  260. }
  261. mockClient.putLogEventsResult <- &putLogEventsResult{
  262. successResult: &cloudwatchlogs.PutLogEventsOutput{
  263. NextSequenceToken: aws.String(nextSequenceToken),
  264. },
  265. }
  266. events := []wrappedEvent{
  267. {
  268. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  269. Message: aws.String(logline),
  270. },
  271. },
  272. }
  273. stream.publishBatch(testEventBatch(events))
  274. if stream.sequenceToken == nil {
  275. t.Fatal("Expected non-nil sequenceToken")
  276. }
  277. if *stream.sequenceToken != nextSequenceToken {
  278. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  279. }
  280. argument := <-mockClient.putLogEventsArgument
  281. if argument == nil {
  282. t.Fatal("Expected non-nil PutLogEventsInput")
  283. }
  284. if argument.SequenceToken == nil {
  285. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  286. }
  287. if *argument.SequenceToken != sequenceToken {
  288. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  289. }
  290. if len(argument.LogEvents) != 1 {
  291. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  292. }
  293. if argument.LogEvents[0] != events[0].inputLogEvent {
  294. t.Error("Expected event to equal input")
  295. }
  296. argument = <-mockClient.putLogEventsArgument
  297. if argument == nil {
  298. t.Fatal("Expected non-nil PutLogEventsInput")
  299. }
  300. if argument.SequenceToken == nil {
  301. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  302. }
  303. if *argument.SequenceToken != "token" {
  304. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  305. }
  306. if len(argument.LogEvents) != 1 {
  307. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  308. }
  309. if argument.LogEvents[0] != events[0].inputLogEvent {
  310. t.Error("Expected event to equal input")
  311. }
  312. }
  313. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  314. mockClient := newMockClient()
  315. stream := &logStream{
  316. client: mockClient,
  317. logGroupName: groupName,
  318. logStreamName: streamName,
  319. sequenceToken: aws.String(sequenceToken),
  320. }
  321. mockClient.putLogEventsResult <- &putLogEventsResult{
  322. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  323. }
  324. events := []wrappedEvent{
  325. {
  326. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  327. Message: aws.String(logline),
  328. },
  329. },
  330. }
  331. stream.publishBatch(testEventBatch(events))
  332. if stream.sequenceToken == nil {
  333. t.Fatal("Expected non-nil sequenceToken")
  334. }
  335. if *stream.sequenceToken != "token" {
  336. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  337. }
  338. argument := <-mockClient.putLogEventsArgument
  339. if argument == nil {
  340. t.Fatal("Expected non-nil PutLogEventsInput")
  341. }
  342. if argument.SequenceToken == nil {
  343. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  344. }
  345. if *argument.SequenceToken != sequenceToken {
  346. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  347. }
  348. if len(argument.LogEvents) != 1 {
  349. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  350. }
  351. if argument.LogEvents[0] != events[0].inputLogEvent {
  352. t.Error("Expected event to equal input")
  353. }
  354. }
  355. func TestCollectBatchSimple(t *testing.T) {
  356. mockClient := newMockClient()
  357. stream := &logStream{
  358. client: mockClient,
  359. logGroupName: groupName,
  360. logStreamName: streamName,
  361. sequenceToken: aws.String(sequenceToken),
  362. messages: make(chan *logger.Message),
  363. }
  364. mockClient.putLogEventsResult <- &putLogEventsResult{
  365. successResult: &cloudwatchlogs.PutLogEventsOutput{
  366. NextSequenceToken: aws.String(nextSequenceToken),
  367. },
  368. }
  369. ticks := make(chan time.Time)
  370. newTicker = func(_ time.Duration) *time.Ticker {
  371. return &time.Ticker{
  372. C: ticks,
  373. }
  374. }
  375. go stream.collectBatch()
  376. stream.Log(&logger.Message{
  377. Line: []byte(logline),
  378. Timestamp: time.Time{},
  379. })
  380. ticks <- time.Time{}
  381. stream.Close()
  382. argument := <-mockClient.putLogEventsArgument
  383. if argument == nil {
  384. t.Fatal("Expected non-nil PutLogEventsInput")
  385. }
  386. if len(argument.LogEvents) != 1 {
  387. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  388. }
  389. if *argument.LogEvents[0].Message != logline {
  390. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  391. }
  392. }
  393. func TestCollectBatchTicker(t *testing.T) {
  394. mockClient := newMockClient()
  395. stream := &logStream{
  396. client: mockClient,
  397. logGroupName: groupName,
  398. logStreamName: streamName,
  399. sequenceToken: aws.String(sequenceToken),
  400. messages: make(chan *logger.Message),
  401. }
  402. mockClient.putLogEventsResult <- &putLogEventsResult{
  403. successResult: &cloudwatchlogs.PutLogEventsOutput{
  404. NextSequenceToken: aws.String(nextSequenceToken),
  405. },
  406. }
  407. ticks := make(chan time.Time)
  408. newTicker = func(_ time.Duration) *time.Ticker {
  409. return &time.Ticker{
  410. C: ticks,
  411. }
  412. }
  413. go stream.collectBatch()
  414. stream.Log(&logger.Message{
  415. Line: []byte(logline + " 1"),
  416. Timestamp: time.Time{},
  417. })
  418. stream.Log(&logger.Message{
  419. Line: []byte(logline + " 2"),
  420. Timestamp: time.Time{},
  421. })
  422. ticks <- time.Time{}
  423. // Verify first batch
  424. argument := <-mockClient.putLogEventsArgument
  425. if argument == nil {
  426. t.Fatal("Expected non-nil PutLogEventsInput")
  427. }
  428. if len(argument.LogEvents) != 2 {
  429. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  430. }
  431. if *argument.LogEvents[0].Message != logline+" 1" {
  432. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  433. }
  434. if *argument.LogEvents[1].Message != logline+" 2" {
  435. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  436. }
  437. stream.Log(&logger.Message{
  438. Line: []byte(logline + " 3"),
  439. Timestamp: time.Time{},
  440. })
  441. ticks <- time.Time{}
  442. argument = <-mockClient.putLogEventsArgument
  443. if argument == nil {
  444. t.Fatal("Expected non-nil PutLogEventsInput")
  445. }
  446. if len(argument.LogEvents) != 1 {
  447. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  448. }
  449. if *argument.LogEvents[0].Message != logline+" 3" {
  450. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  451. }
  452. stream.Close()
  453. }
  454. func TestCollectBatchMultilinePattern(t *testing.T) {
  455. mockClient := newMockClient()
  456. multilinePattern := regexp.MustCompile("xxxx")
  457. stream := &logStream{
  458. client: mockClient,
  459. logGroupName: groupName,
  460. logStreamName: streamName,
  461. multilinePattern: multilinePattern,
  462. sequenceToken: aws.String(sequenceToken),
  463. messages: make(chan *logger.Message),
  464. }
  465. mockClient.putLogEventsResult <- &putLogEventsResult{
  466. successResult: &cloudwatchlogs.PutLogEventsOutput{
  467. NextSequenceToken: aws.String(nextSequenceToken),
  468. },
  469. }
  470. ticks := make(chan time.Time)
  471. newTicker = func(_ time.Duration) *time.Ticker {
  472. return &time.Ticker{
  473. C: ticks,
  474. }
  475. }
  476. go stream.collectBatch()
  477. stream.Log(&logger.Message{
  478. Line: []byte(logline),
  479. Timestamp: time.Now(),
  480. })
  481. stream.Log(&logger.Message{
  482. Line: []byte(logline),
  483. Timestamp: time.Now(),
  484. })
  485. stream.Log(&logger.Message{
  486. Line: []byte("xxxx " + logline),
  487. Timestamp: time.Now(),
  488. })
  489. ticks <- time.Now()
  490. // Verify single multiline event
  491. argument := <-mockClient.putLogEventsArgument
  492. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  493. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  494. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  495. stream.Close()
  496. // Verify single event
  497. argument = <-mockClient.putLogEventsArgument
  498. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  499. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  500. assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  501. }
  502. func BenchmarkCollectBatch(b *testing.B) {
  503. for i := 0; i < b.N; i++ {
  504. mockClient := newMockClient()
  505. stream := &logStream{
  506. client: mockClient,
  507. logGroupName: groupName,
  508. logStreamName: streamName,
  509. sequenceToken: aws.String(sequenceToken),
  510. messages: make(chan *logger.Message),
  511. }
  512. mockClient.putLogEventsResult <- &putLogEventsResult{
  513. successResult: &cloudwatchlogs.PutLogEventsOutput{
  514. NextSequenceToken: aws.String(nextSequenceToken),
  515. },
  516. }
  517. ticks := make(chan time.Time)
  518. newTicker = func(_ time.Duration) *time.Ticker {
  519. return &time.Ticker{
  520. C: ticks,
  521. }
  522. }
  523. go stream.collectBatch()
  524. stream.logGenerator(10, 100)
  525. ticks <- time.Time{}
  526. stream.Close()
  527. }
  528. }
  529. func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
  530. for i := 0; i < b.N; i++ {
  531. mockClient := newMockClient()
  532. multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
  533. stream := &logStream{
  534. client: mockClient,
  535. logGroupName: groupName,
  536. logStreamName: streamName,
  537. multilinePattern: multilinePattern,
  538. sequenceToken: aws.String(sequenceToken),
  539. messages: make(chan *logger.Message),
  540. }
  541. mockClient.putLogEventsResult <- &putLogEventsResult{
  542. successResult: &cloudwatchlogs.PutLogEventsOutput{
  543. NextSequenceToken: aws.String(nextSequenceToken),
  544. },
  545. }
  546. ticks := make(chan time.Time)
  547. newTicker = func(_ time.Duration) *time.Ticker {
  548. return &time.Ticker{
  549. C: ticks,
  550. }
  551. }
  552. go stream.collectBatch()
  553. stream.logGenerator(10, 100)
  554. ticks <- time.Time{}
  555. stream.Close()
  556. }
  557. }
  558. func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
  559. mockClient := newMockClient()
  560. multilinePattern := regexp.MustCompile("xxxx")
  561. stream := &logStream{
  562. client: mockClient,
  563. logGroupName: groupName,
  564. logStreamName: streamName,
  565. multilinePattern: multilinePattern,
  566. sequenceToken: aws.String(sequenceToken),
  567. messages: make(chan *logger.Message),
  568. }
  569. mockClient.putLogEventsResult <- &putLogEventsResult{
  570. successResult: &cloudwatchlogs.PutLogEventsOutput{
  571. NextSequenceToken: aws.String(nextSequenceToken),
  572. },
  573. }
  574. ticks := make(chan time.Time)
  575. newTicker = func(_ time.Duration) *time.Ticker {
  576. return &time.Ticker{
  577. C: ticks,
  578. }
  579. }
  580. go stream.collectBatch()
  581. stream.Log(&logger.Message{
  582. Line: []byte(logline),
  583. Timestamp: time.Now(),
  584. })
  585. // Log an event 1 second later
  586. stream.Log(&logger.Message{
  587. Line: []byte(logline),
  588. Timestamp: time.Now().Add(time.Second),
  589. })
  590. // Fire ticker batchPublishFrequency seconds later
  591. ticks <- time.Now().Add(batchPublishFrequency + time.Second)
  592. // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
  593. argument := <-mockClient.putLogEventsArgument
  594. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  595. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  596. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  597. // Log an event 1 second later
  598. stream.Log(&logger.Message{
  599. Line: []byte(logline),
  600. Timestamp: time.Now().Add(time.Second),
  601. })
  602. // Fire ticker another batchPublishFrequency seconds later
  603. ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
  604. // Verify the event buffer is truly flushed - we should only receive a single event
  605. argument = <-mockClient.putLogEventsArgument
  606. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  607. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  608. assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  609. stream.Close()
  610. }
  611. func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
  612. mockClient := newMockClient()
  613. multilinePattern := regexp.MustCompile("xxxx")
  614. stream := &logStream{
  615. client: mockClient,
  616. logGroupName: groupName,
  617. logStreamName: streamName,
  618. multilinePattern: multilinePattern,
  619. sequenceToken: aws.String(sequenceToken),
  620. messages: make(chan *logger.Message),
  621. }
  622. mockClient.putLogEventsResult <- &putLogEventsResult{
  623. successResult: &cloudwatchlogs.PutLogEventsOutput{
  624. NextSequenceToken: aws.String(nextSequenceToken),
  625. },
  626. }
  627. ticks := make(chan time.Time)
  628. newTicker = func(_ time.Duration) *time.Ticker {
  629. return &time.Ticker{
  630. C: ticks,
  631. }
  632. }
  633. go stream.collectBatch()
  634. stream.Log(&logger.Message{
  635. Line: []byte(logline),
  636. Timestamp: time.Now(),
  637. })
  638. // Log an event 1 second later
  639. stream.Log(&logger.Message{
  640. Line: []byte(logline),
  641. Timestamp: time.Now().Add(time.Second),
  642. })
  643. // Fire ticker in past to simulate negative event buffer age
  644. ticks <- time.Now().Add(-time.Second)
  645. // Verify single multiline event is flushed with a negative event buffer age
  646. argument := <-mockClient.putLogEventsArgument
  647. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  648. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  649. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  650. stream.Close()
  651. }
  652. func TestCollectBatchClose(t *testing.T) {
  653. mockClient := newMockClient()
  654. stream := &logStream{
  655. client: mockClient,
  656. logGroupName: groupName,
  657. logStreamName: streamName,
  658. sequenceToken: aws.String(sequenceToken),
  659. messages: make(chan *logger.Message),
  660. }
  661. mockClient.putLogEventsResult <- &putLogEventsResult{
  662. successResult: &cloudwatchlogs.PutLogEventsOutput{
  663. NextSequenceToken: aws.String(nextSequenceToken),
  664. },
  665. }
  666. var ticks = make(chan time.Time)
  667. newTicker = func(_ time.Duration) *time.Ticker {
  668. return &time.Ticker{
  669. C: ticks,
  670. }
  671. }
  672. go stream.collectBatch()
  673. stream.Log(&logger.Message{
  674. Line: []byte(logline),
  675. Timestamp: time.Time{},
  676. })
  677. // no ticks
  678. stream.Close()
  679. argument := <-mockClient.putLogEventsArgument
  680. if argument == nil {
  681. t.Fatal("Expected non-nil PutLogEventsInput")
  682. }
  683. if len(argument.LogEvents) != 1 {
  684. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  685. }
  686. if *argument.LogEvents[0].Message != logline {
  687. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  688. }
  689. }
  690. func TestCollectBatchLineSplit(t *testing.T) {
  691. mockClient := newMockClient()
  692. stream := &logStream{
  693. client: mockClient,
  694. logGroupName: groupName,
  695. logStreamName: streamName,
  696. sequenceToken: aws.String(sequenceToken),
  697. messages: make(chan *logger.Message),
  698. }
  699. mockClient.putLogEventsResult <- &putLogEventsResult{
  700. successResult: &cloudwatchlogs.PutLogEventsOutput{
  701. NextSequenceToken: aws.String(nextSequenceToken),
  702. },
  703. }
  704. var ticks = make(chan time.Time)
  705. newTicker = func(_ time.Duration) *time.Ticker {
  706. return &time.Ticker{
  707. C: ticks,
  708. }
  709. }
  710. go stream.collectBatch()
  711. longline := strings.Repeat("A", maximumBytesPerEvent)
  712. stream.Log(&logger.Message{
  713. Line: []byte(longline + "B"),
  714. Timestamp: time.Time{},
  715. })
  716. // no ticks
  717. stream.Close()
  718. argument := <-mockClient.putLogEventsArgument
  719. if argument == nil {
  720. t.Fatal("Expected non-nil PutLogEventsInput")
  721. }
  722. if len(argument.LogEvents) != 2 {
  723. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  724. }
  725. if *argument.LogEvents[0].Message != longline {
  726. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  727. }
  728. if *argument.LogEvents[1].Message != "B" {
  729. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  730. }
  731. }
  732. func TestCollectBatchMaxEvents(t *testing.T) {
  733. mockClient := newMockClientBuffered(1)
  734. stream := &logStream{
  735. client: mockClient,
  736. logGroupName: groupName,
  737. logStreamName: streamName,
  738. sequenceToken: aws.String(sequenceToken),
  739. messages: make(chan *logger.Message),
  740. }
  741. mockClient.putLogEventsResult <- &putLogEventsResult{
  742. successResult: &cloudwatchlogs.PutLogEventsOutput{
  743. NextSequenceToken: aws.String(nextSequenceToken),
  744. },
  745. }
  746. var ticks = make(chan time.Time)
  747. newTicker = func(_ time.Duration) *time.Ticker {
  748. return &time.Ticker{
  749. C: ticks,
  750. }
  751. }
  752. go stream.collectBatch()
  753. line := "A"
  754. for i := 0; i <= maximumLogEventsPerPut; i++ {
  755. stream.Log(&logger.Message{
  756. Line: []byte(line),
  757. Timestamp: time.Time{},
  758. })
  759. }
  760. // no ticks
  761. stream.Close()
  762. argument := <-mockClient.putLogEventsArgument
  763. if argument == nil {
  764. t.Fatal("Expected non-nil PutLogEventsInput")
  765. }
  766. if len(argument.LogEvents) != maximumLogEventsPerPut {
  767. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  768. }
  769. argument = <-mockClient.putLogEventsArgument
  770. if argument == nil {
  771. t.Fatal("Expected non-nil PutLogEventsInput")
  772. }
  773. if len(argument.LogEvents) != 1 {
  774. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  775. }
  776. }
  777. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  778. expectedPuts := 2
  779. mockClient := newMockClientBuffered(expectedPuts)
  780. stream := &logStream{
  781. client: mockClient,
  782. logGroupName: groupName,
  783. logStreamName: streamName,
  784. sequenceToken: aws.String(sequenceToken),
  785. messages: make(chan *logger.Message),
  786. }
  787. for i := 0; i < expectedPuts; i++ {
  788. mockClient.putLogEventsResult <- &putLogEventsResult{
  789. successResult: &cloudwatchlogs.PutLogEventsOutput{
  790. NextSequenceToken: aws.String(nextSequenceToken),
  791. },
  792. }
  793. }
  794. var ticks = make(chan time.Time)
  795. newTicker = func(_ time.Duration) *time.Ticker {
  796. return &time.Ticker{
  797. C: ticks,
  798. }
  799. }
  800. go stream.collectBatch()
  801. numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
  802. // maxline is the maximum line that could be submitted after
  803. // accounting for its overhead.
  804. maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
  805. // This will be split and batched up to the `maximumBytesPerPut'
  806. // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
  807. // should also tolerate an offset within that range.
  808. stream.Log(&logger.Message{
  809. Line: []byte(maxline[:len(maxline)/2]),
  810. Timestamp: time.Time{},
  811. })
  812. stream.Log(&logger.Message{
  813. Line: []byte(maxline[len(maxline)/2:]),
  814. Timestamp: time.Time{},
  815. })
  816. stream.Log(&logger.Message{
  817. Line: []byte("B"),
  818. Timestamp: time.Time{},
  819. })
  820. // no ticks, guarantee batch by size (and chan close)
  821. stream.Close()
  822. argument := <-mockClient.putLogEventsArgument
  823. if argument == nil {
  824. t.Fatal("Expected non-nil PutLogEventsInput")
  825. }
  826. // Should total to the maximum allowed bytes.
  827. eventBytes := 0
  828. for _, event := range argument.LogEvents {
  829. eventBytes += len(*event.Message)
  830. }
  831. eventsOverhead := len(argument.LogEvents) * perEventBytes
  832. payloadTotal := eventBytes + eventsOverhead
  833. // lowestMaxBatch allows the payload to be offset if the messages
  834. // don't lend themselves to align with the maximum event size.
  835. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
  836. if payloadTotal > maximumBytesPerPut {
  837. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
  838. }
  839. if payloadTotal < lowestMaxBatch {
  840. t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
  841. }
  842. argument = <-mockClient.putLogEventsArgument
  843. if len(argument.LogEvents) != 1 {
  844. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  845. }
  846. message := *argument.LogEvents[len(argument.LogEvents)-1].Message
  847. if message[len(message)-1:] != "B" {
  848. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  849. }
  850. }
  851. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  852. mockClient := newMockClient()
  853. stream := &logStream{
  854. client: mockClient,
  855. logGroupName: groupName,
  856. logStreamName: streamName,
  857. sequenceToken: aws.String(sequenceToken),
  858. messages: make(chan *logger.Message),
  859. }
  860. mockClient.putLogEventsResult <- &putLogEventsResult{
  861. successResult: &cloudwatchlogs.PutLogEventsOutput{
  862. NextSequenceToken: aws.String(nextSequenceToken),
  863. },
  864. }
  865. ticks := make(chan time.Time)
  866. newTicker = func(_ time.Duration) *time.Ticker {
  867. return &time.Ticker{
  868. C: ticks,
  869. }
  870. }
  871. go stream.collectBatch()
  872. times := maximumLogEventsPerPut
  873. expectedEvents := []*cloudwatchlogs.InputLogEvent{}
  874. timestamp := time.Now()
  875. for i := 0; i < times; i++ {
  876. line := fmt.Sprintf("%d", i)
  877. if i%2 == 0 {
  878. timestamp.Add(1 * time.Nanosecond)
  879. }
  880. stream.Log(&logger.Message{
  881. Line: []byte(line),
  882. Timestamp: timestamp,
  883. })
  884. expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
  885. Message: aws.String(line),
  886. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  887. })
  888. }
  889. ticks <- time.Time{}
  890. stream.Close()
  891. argument := <-mockClient.putLogEventsArgument
  892. if argument == nil {
  893. t.Fatal("Expected non-nil PutLogEventsInput")
  894. }
  895. if len(argument.LogEvents) != times {
  896. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
  897. }
  898. for i := 0; i < times; i++ {
  899. if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
  900. t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
  901. }
  902. }
  903. }
  904. func TestParseLogOptionsMultilinePattern(t *testing.T) {
  905. info := logger.Info{
  906. Config: map[string]string{
  907. multilinePatternKey: "^xxxx",
  908. },
  909. }
  910. multilinePattern, err := parseMultilineOptions(info)
  911. assert.Nil(t, err, "Received unexpected error")
  912. assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
  913. }
  914. func TestParseLogOptionsDatetimeFormat(t *testing.T) {
  915. datetimeFormatTests := []struct {
  916. format string
  917. match string
  918. }{
  919. {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
  920. {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
  921. {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
  922. {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
  923. {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
  924. {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
  925. {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
  926. }
  927. for _, dt := range datetimeFormatTests {
  928. t.Run(dt.match, func(t *testing.T) {
  929. info := logger.Info{
  930. Config: map[string]string{
  931. datetimeFormatKey: dt.format,
  932. },
  933. }
  934. multilinePattern, err := parseMultilineOptions(info)
  935. assert.Nil(t, err, "Received unexpected error")
  936. assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
  937. })
  938. }
  939. }
  940. func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
  941. cfg := map[string]string{
  942. multilinePatternKey: "^xxxx",
  943. datetimeFormatKey: "%Y-%m-%d",
  944. logGroupKey: groupName,
  945. }
  946. conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
  947. err := ValidateLogOpt(cfg)
  948. assert.NotNil(t, err, "Expected an error")
  949. assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error")
  950. }
  951. func TestCreateTagSuccess(t *testing.T) {
  952. mockClient := newMockClient()
  953. info := logger.Info{
  954. ContainerName: "/test-container",
  955. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  956. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  957. }
  958. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  959. if e != nil {
  960. t.Errorf("Error generating tag: %q", e)
  961. }
  962. stream := &logStream{
  963. client: mockClient,
  964. logGroupName: groupName,
  965. logStreamName: logStreamName,
  966. }
  967. mockClient.createLogStreamResult <- &createLogStreamResult{}
  968. err := stream.create()
  969. if err != nil {
  970. t.Errorf("Received unexpected err: %v\n", err)
  971. }
  972. argument := <-mockClient.createLogStreamArgument
  973. if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
  974. t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
  975. }
  976. }
  977. func TestIsSizedLogger(t *testing.T) {
  978. awslogs := &logStream{}
  979. assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger")
  980. }
  981. func BenchmarkUnwrapEvents(b *testing.B) {
  982. events := make([]wrappedEvent, maximumLogEventsPerPut)
  983. for i := 0; i < maximumLogEventsPerPut; i++ {
  984. mes := strings.Repeat("0", maximumBytesPerEvent)
  985. events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
  986. Message: &mes,
  987. }
  988. }
  989. as := assert.New(b)
  990. b.ResetTimer()
  991. for i := 0; i < b.N; i++ {
  992. res := unwrapEvents(events)
  993. as.Len(res, maximumLogEventsPerPut)
  994. }
  995. }
  996. func TestNewAWSLogsClientCredentialEndpointDetect(t *testing.T) {
  997. // required for the cloudwatchlogs client
  998. os.Setenv("AWS_REGION", "us-west-2")
  999. defer os.Unsetenv("AWS_REGION")
  1000. credsResp := `{
  1001. "AccessKeyId" : "test-access-key-id",
  1002. "SecretAccessKey": "test-secret-access-key"
  1003. }`
  1004. expectedAccessKeyID := "test-access-key-id"
  1005. expectedSecretAccessKey := "test-secret-access-key"
  1006. testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1007. w.Header().Set("Content-Type", "application/json")
  1008. fmt.Fprintln(w, credsResp)
  1009. }))
  1010. defer testServer.Close()
  1011. // set the SDKEndpoint in the driver
  1012. newSDKEndpoint = testServer.URL
  1013. info := logger.Info{
  1014. Config: map[string]string{},
  1015. }
  1016. info.Config["awslogs-credentials-endpoint"] = "/creds"
  1017. c, err := newAWSLogsClient(info)
  1018. assert.NoError(t, err)
  1019. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1020. creds, err := client.Config.Credentials.Get()
  1021. assert.NoError(t, err)
  1022. assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
  1023. assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
  1024. }
  1025. func TestNewAWSLogsClientCredentialEnvironmentVariable(t *testing.T) {
  1026. // required for the cloudwatchlogs client
  1027. os.Setenv("AWS_REGION", "us-west-2")
  1028. defer os.Unsetenv("AWS_REGION")
  1029. expectedAccessKeyID := "test-access-key-id"
  1030. expectedSecretAccessKey := "test-secret-access-key"
  1031. os.Setenv("AWS_ACCESS_KEY_ID", expectedAccessKeyID)
  1032. defer os.Unsetenv("AWS_ACCESS_KEY_ID")
  1033. os.Setenv("AWS_SECRET_ACCESS_KEY", expectedSecretAccessKey)
  1034. defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
  1035. info := logger.Info{
  1036. Config: map[string]string{},
  1037. }
  1038. c, err := newAWSLogsClient(info)
  1039. assert.NoError(t, err)
  1040. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1041. creds, err := client.Config.Credentials.Get()
  1042. assert.NoError(t, err)
  1043. assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
  1044. assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
  1045. }
  1046. func TestNewAWSLogsClientCredentialSharedFile(t *testing.T) {
  1047. // required for the cloudwatchlogs client
  1048. os.Setenv("AWS_REGION", "us-west-2")
  1049. defer os.Unsetenv("AWS_REGION")
  1050. expectedAccessKeyID := "test-access-key-id"
  1051. expectedSecretAccessKey := "test-secret-access-key"
  1052. contentStr := `
  1053. [default]
  1054. aws_access_key_id = "test-access-key-id"
  1055. aws_secret_access_key = "test-secret-access-key"
  1056. `
  1057. content := []byte(contentStr)
  1058. tmpfile, err := ioutil.TempFile("", "example")
  1059. defer os.Remove(tmpfile.Name()) // clean up
  1060. assert.NoError(t, err)
  1061. _, err = tmpfile.Write(content)
  1062. assert.NoError(t, err)
  1063. err = tmpfile.Close()
  1064. assert.NoError(t, err)
  1065. os.Unsetenv("AWS_ACCESS_KEY_ID")
  1066. os.Unsetenv("AWS_SECRET_ACCESS_KEY")
  1067. os.Setenv("AWS_SHARED_CREDENTIALS_FILE", tmpfile.Name())
  1068. defer os.Unsetenv("AWS_SHARED_CREDENTIALS_FILE")
  1069. info := logger.Info{
  1070. Config: map[string]string{},
  1071. }
  1072. c, err := newAWSLogsClient(info)
  1073. assert.NoError(t, err)
  1074. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1075. creds, err := client.Config.Credentials.Get()
  1076. assert.NoError(t, err)
  1077. assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
  1078. assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
  1079. }