cloudwatchlogs_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
  1. package awslogs
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "reflect"
  7. "runtime"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/aws/awserr"
  13. "github.com/aws/aws-sdk-go/aws/request"
  14. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  15. "github.com/docker/docker/daemon/logger"
  16. "github.com/docker/docker/daemon/logger/loggerutils"
  17. "github.com/docker/docker/dockerversion"
  18. )
  19. const (
  20. groupName = "groupName"
  21. streamName = "streamName"
  22. sequenceToken = "sequenceToken"
  23. nextSequenceToken = "nextSequenceToken"
  24. logline = "this is a log line"
  25. )
  26. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  27. info := logger.Info{
  28. Config: map[string]string{
  29. regionKey: "us-east-1",
  30. },
  31. }
  32. client, err := newAWSLogsClient(info)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  37. if !ok {
  38. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  39. }
  40. buildHandlerList := realClient.Handlers.Build
  41. request := &request.Request{
  42. HTTPRequest: &http.Request{
  43. Header: http.Header{},
  44. },
  45. }
  46. buildHandlerList.Run(request)
  47. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  48. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  49. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  50. if userAgent != expectedUserAgentString {
  51. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  52. expectedUserAgentString, userAgent)
  53. }
  54. }
  55. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  56. info := logger.Info{
  57. Config: map[string]string{},
  58. }
  59. mockMetadata := newMockMetadataClient()
  60. newRegionFinder = func() regionFinder {
  61. return mockMetadata
  62. }
  63. mockMetadata.regionResult <- &regionResult{
  64. successResult: "us-east-1",
  65. }
  66. _, err := newAWSLogsClient(info)
  67. if err != nil {
  68. t.Fatal(err)
  69. }
  70. }
  71. func TestCreateSuccess(t *testing.T) {
  72. mockClient := newMockClient()
  73. stream := &logStream{
  74. client: mockClient,
  75. logGroupName: groupName,
  76. logStreamName: streamName,
  77. }
  78. mockClient.createLogStreamResult <- &createLogStreamResult{}
  79. err := stream.create()
  80. if err != nil {
  81. t.Errorf("Received unexpected err: %v\n", err)
  82. }
  83. argument := <-mockClient.createLogStreamArgument
  84. if argument.LogGroupName == nil {
  85. t.Fatal("Expected non-nil LogGroupName")
  86. }
  87. if *argument.LogGroupName != groupName {
  88. t.Errorf("Expected LogGroupName to be %s", groupName)
  89. }
  90. if argument.LogStreamName == nil {
  91. t.Fatal("Expected non-nil LogStreamName")
  92. }
  93. if *argument.LogStreamName != streamName {
  94. t.Errorf("Expected LogStreamName to be %s", streamName)
  95. }
  96. }
  97. func TestCreateLogGroupSuccess(t *testing.T) {
  98. mockClient := newMockClient()
  99. stream := &logStream{
  100. client: mockClient,
  101. logGroupName: groupName,
  102. logStreamName: streamName,
  103. logCreateGroup: true,
  104. }
  105. mockClient.createLogGroupResult <- &createLogGroupResult{}
  106. mockClient.createLogStreamResult <- &createLogStreamResult{}
  107. err := stream.create()
  108. if err != nil {
  109. t.Errorf("Received unexpected err: %v\n", err)
  110. }
  111. argument := <-mockClient.createLogStreamArgument
  112. if argument.LogGroupName == nil {
  113. t.Fatal("Expected non-nil LogGroupName")
  114. }
  115. if *argument.LogGroupName != groupName {
  116. t.Errorf("Expected LogGroupName to be %s", groupName)
  117. }
  118. if argument.LogStreamName == nil {
  119. t.Fatal("Expected non-nil LogStreamName")
  120. }
  121. if *argument.LogStreamName != streamName {
  122. t.Errorf("Expected LogStreamName to be %s", streamName)
  123. }
  124. }
  125. func TestCreateError(t *testing.T) {
  126. mockClient := newMockClient()
  127. stream := &logStream{
  128. client: mockClient,
  129. }
  130. mockClient.createLogStreamResult <- &createLogStreamResult{
  131. errorResult: errors.New("Error!"),
  132. }
  133. err := stream.create()
  134. if err == nil {
  135. t.Fatal("Expected non-nil err")
  136. }
  137. }
  138. func TestCreateAlreadyExists(t *testing.T) {
  139. mockClient := newMockClient()
  140. stream := &logStream{
  141. client: mockClient,
  142. }
  143. mockClient.createLogStreamResult <- &createLogStreamResult{
  144. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  145. }
  146. err := stream.create()
  147. if err != nil {
  148. t.Fatal("Expected nil err")
  149. }
  150. }
  151. func TestPublishBatchSuccess(t *testing.T) {
  152. mockClient := newMockClient()
  153. stream := &logStream{
  154. client: mockClient,
  155. logGroupName: groupName,
  156. logStreamName: streamName,
  157. sequenceToken: aws.String(sequenceToken),
  158. }
  159. mockClient.putLogEventsResult <- &putLogEventsResult{
  160. successResult: &cloudwatchlogs.PutLogEventsOutput{
  161. NextSequenceToken: aws.String(nextSequenceToken),
  162. },
  163. }
  164. events := []wrappedEvent{
  165. {
  166. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  167. Message: aws.String(logline),
  168. },
  169. },
  170. }
  171. stream.publishBatch(events)
  172. if stream.sequenceToken == nil {
  173. t.Fatal("Expected non-nil sequenceToken")
  174. }
  175. if *stream.sequenceToken != nextSequenceToken {
  176. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  177. }
  178. argument := <-mockClient.putLogEventsArgument
  179. if argument == nil {
  180. t.Fatal("Expected non-nil PutLogEventsInput")
  181. }
  182. if argument.SequenceToken == nil {
  183. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  184. }
  185. if *argument.SequenceToken != sequenceToken {
  186. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  187. }
  188. if len(argument.LogEvents) != 1 {
  189. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  190. }
  191. if argument.LogEvents[0] != events[0].inputLogEvent {
  192. t.Error("Expected event to equal input")
  193. }
  194. }
  195. func TestPublishBatchError(t *testing.T) {
  196. mockClient := newMockClient()
  197. stream := &logStream{
  198. client: mockClient,
  199. logGroupName: groupName,
  200. logStreamName: streamName,
  201. sequenceToken: aws.String(sequenceToken),
  202. }
  203. mockClient.putLogEventsResult <- &putLogEventsResult{
  204. errorResult: errors.New("Error!"),
  205. }
  206. events := []wrappedEvent{
  207. {
  208. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  209. Message: aws.String(logline),
  210. },
  211. },
  212. }
  213. stream.publishBatch(events)
  214. if stream.sequenceToken == nil {
  215. t.Fatal("Expected non-nil sequenceToken")
  216. }
  217. if *stream.sequenceToken != sequenceToken {
  218. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  219. }
  220. }
  221. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  222. mockClient := newMockClientBuffered(2)
  223. stream := &logStream{
  224. client: mockClient,
  225. logGroupName: groupName,
  226. logStreamName: streamName,
  227. sequenceToken: aws.String(sequenceToken),
  228. }
  229. mockClient.putLogEventsResult <- &putLogEventsResult{
  230. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  231. }
  232. mockClient.putLogEventsResult <- &putLogEventsResult{
  233. successResult: &cloudwatchlogs.PutLogEventsOutput{
  234. NextSequenceToken: aws.String(nextSequenceToken),
  235. },
  236. }
  237. events := []wrappedEvent{
  238. {
  239. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  240. Message: aws.String(logline),
  241. },
  242. },
  243. }
  244. stream.publishBatch(events)
  245. if stream.sequenceToken == nil {
  246. t.Fatal("Expected non-nil sequenceToken")
  247. }
  248. if *stream.sequenceToken != nextSequenceToken {
  249. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  250. }
  251. argument := <-mockClient.putLogEventsArgument
  252. if argument == nil {
  253. t.Fatal("Expected non-nil PutLogEventsInput")
  254. }
  255. if argument.SequenceToken == nil {
  256. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  257. }
  258. if *argument.SequenceToken != sequenceToken {
  259. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  260. }
  261. if len(argument.LogEvents) != 1 {
  262. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  263. }
  264. if argument.LogEvents[0] != events[0].inputLogEvent {
  265. t.Error("Expected event to equal input")
  266. }
  267. argument = <-mockClient.putLogEventsArgument
  268. if argument == nil {
  269. t.Fatal("Expected non-nil PutLogEventsInput")
  270. }
  271. if argument.SequenceToken == nil {
  272. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  273. }
  274. if *argument.SequenceToken != "token" {
  275. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  276. }
  277. if len(argument.LogEvents) != 1 {
  278. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  279. }
  280. if argument.LogEvents[0] != events[0].inputLogEvent {
  281. t.Error("Expected event to equal input")
  282. }
  283. }
  284. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  285. mockClient := newMockClient()
  286. stream := &logStream{
  287. client: mockClient,
  288. logGroupName: groupName,
  289. logStreamName: streamName,
  290. sequenceToken: aws.String(sequenceToken),
  291. }
  292. mockClient.putLogEventsResult <- &putLogEventsResult{
  293. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  294. }
  295. events := []wrappedEvent{
  296. {
  297. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  298. Message: aws.String(logline),
  299. },
  300. },
  301. }
  302. stream.publishBatch(events)
  303. if stream.sequenceToken == nil {
  304. t.Fatal("Expected non-nil sequenceToken")
  305. }
  306. if *stream.sequenceToken != "token" {
  307. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  308. }
  309. argument := <-mockClient.putLogEventsArgument
  310. if argument == nil {
  311. t.Fatal("Expected non-nil PutLogEventsInput")
  312. }
  313. if argument.SequenceToken == nil {
  314. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  315. }
  316. if *argument.SequenceToken != sequenceToken {
  317. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  318. }
  319. if len(argument.LogEvents) != 1 {
  320. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  321. }
  322. if argument.LogEvents[0] != events[0].inputLogEvent {
  323. t.Error("Expected event to equal input")
  324. }
  325. }
  326. func TestCollectBatchSimple(t *testing.T) {
  327. mockClient := newMockClient()
  328. stream := &logStream{
  329. client: mockClient,
  330. logGroupName: groupName,
  331. logStreamName: streamName,
  332. sequenceToken: aws.String(sequenceToken),
  333. messages: make(chan *logger.Message),
  334. }
  335. mockClient.putLogEventsResult <- &putLogEventsResult{
  336. successResult: &cloudwatchlogs.PutLogEventsOutput{
  337. NextSequenceToken: aws.String(nextSequenceToken),
  338. },
  339. }
  340. ticks := make(chan time.Time)
  341. newTicker = func(_ time.Duration) *time.Ticker {
  342. return &time.Ticker{
  343. C: ticks,
  344. }
  345. }
  346. go stream.collectBatch()
  347. stream.Log(&logger.Message{
  348. Line: []byte(logline),
  349. Timestamp: time.Time{},
  350. })
  351. ticks <- time.Time{}
  352. stream.Close()
  353. argument := <-mockClient.putLogEventsArgument
  354. if argument == nil {
  355. t.Fatal("Expected non-nil PutLogEventsInput")
  356. }
  357. if len(argument.LogEvents) != 1 {
  358. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  359. }
  360. if *argument.LogEvents[0].Message != logline {
  361. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  362. }
  363. }
  364. func TestCollectBatchTicker(t *testing.T) {
  365. mockClient := newMockClient()
  366. stream := &logStream{
  367. client: mockClient,
  368. logGroupName: groupName,
  369. logStreamName: streamName,
  370. sequenceToken: aws.String(sequenceToken),
  371. messages: make(chan *logger.Message),
  372. }
  373. mockClient.putLogEventsResult <- &putLogEventsResult{
  374. successResult: &cloudwatchlogs.PutLogEventsOutput{
  375. NextSequenceToken: aws.String(nextSequenceToken),
  376. },
  377. }
  378. ticks := make(chan time.Time)
  379. newTicker = func(_ time.Duration) *time.Ticker {
  380. return &time.Ticker{
  381. C: ticks,
  382. }
  383. }
  384. go stream.collectBatch()
  385. stream.Log(&logger.Message{
  386. Line: []byte(logline + " 1"),
  387. Timestamp: time.Time{},
  388. })
  389. stream.Log(&logger.Message{
  390. Line: []byte(logline + " 2"),
  391. Timestamp: time.Time{},
  392. })
  393. ticks <- time.Time{}
  394. // Verify first batch
  395. argument := <-mockClient.putLogEventsArgument
  396. if argument == nil {
  397. t.Fatal("Expected non-nil PutLogEventsInput")
  398. }
  399. if len(argument.LogEvents) != 2 {
  400. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  401. }
  402. if *argument.LogEvents[0].Message != logline+" 1" {
  403. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  404. }
  405. if *argument.LogEvents[1].Message != logline+" 2" {
  406. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  407. }
  408. stream.Log(&logger.Message{
  409. Line: []byte(logline + " 3"),
  410. Timestamp: time.Time{},
  411. })
  412. ticks <- time.Time{}
  413. argument = <-mockClient.putLogEventsArgument
  414. if argument == nil {
  415. t.Fatal("Expected non-nil PutLogEventsInput")
  416. }
  417. if len(argument.LogEvents) != 1 {
  418. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  419. }
  420. if *argument.LogEvents[0].Message != logline+" 3" {
  421. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  422. }
  423. stream.Close()
  424. }
  425. func TestCollectBatchClose(t *testing.T) {
  426. mockClient := newMockClient()
  427. stream := &logStream{
  428. client: mockClient,
  429. logGroupName: groupName,
  430. logStreamName: streamName,
  431. sequenceToken: aws.String(sequenceToken),
  432. messages: make(chan *logger.Message),
  433. }
  434. mockClient.putLogEventsResult <- &putLogEventsResult{
  435. successResult: &cloudwatchlogs.PutLogEventsOutput{
  436. NextSequenceToken: aws.String(nextSequenceToken),
  437. },
  438. }
  439. var ticks = make(chan time.Time)
  440. newTicker = func(_ time.Duration) *time.Ticker {
  441. return &time.Ticker{
  442. C: ticks,
  443. }
  444. }
  445. go stream.collectBatch()
  446. stream.Log(&logger.Message{
  447. Line: []byte(logline),
  448. Timestamp: time.Time{},
  449. })
  450. // no ticks
  451. stream.Close()
  452. argument := <-mockClient.putLogEventsArgument
  453. if argument == nil {
  454. t.Fatal("Expected non-nil PutLogEventsInput")
  455. }
  456. if len(argument.LogEvents) != 1 {
  457. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  458. }
  459. if *argument.LogEvents[0].Message != logline {
  460. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  461. }
  462. }
  463. func TestCollectBatchLineSplit(t *testing.T) {
  464. mockClient := newMockClient()
  465. stream := &logStream{
  466. client: mockClient,
  467. logGroupName: groupName,
  468. logStreamName: streamName,
  469. sequenceToken: aws.String(sequenceToken),
  470. messages: make(chan *logger.Message),
  471. }
  472. mockClient.putLogEventsResult <- &putLogEventsResult{
  473. successResult: &cloudwatchlogs.PutLogEventsOutput{
  474. NextSequenceToken: aws.String(nextSequenceToken),
  475. },
  476. }
  477. var ticks = make(chan time.Time)
  478. newTicker = func(_ time.Duration) *time.Ticker {
  479. return &time.Ticker{
  480. C: ticks,
  481. }
  482. }
  483. go stream.collectBatch()
  484. longline := strings.Repeat("A", maximumBytesPerEvent)
  485. stream.Log(&logger.Message{
  486. Line: []byte(longline + "B"),
  487. Timestamp: time.Time{},
  488. })
  489. // no ticks
  490. stream.Close()
  491. argument := <-mockClient.putLogEventsArgument
  492. if argument == nil {
  493. t.Fatal("Expected non-nil PutLogEventsInput")
  494. }
  495. if len(argument.LogEvents) != 2 {
  496. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  497. }
  498. if *argument.LogEvents[0].Message != longline {
  499. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  500. }
  501. if *argument.LogEvents[1].Message != "B" {
  502. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  503. }
  504. }
  505. func TestCollectBatchMaxEvents(t *testing.T) {
  506. mockClient := newMockClientBuffered(1)
  507. stream := &logStream{
  508. client: mockClient,
  509. logGroupName: groupName,
  510. logStreamName: streamName,
  511. sequenceToken: aws.String(sequenceToken),
  512. messages: make(chan *logger.Message),
  513. }
  514. mockClient.putLogEventsResult <- &putLogEventsResult{
  515. successResult: &cloudwatchlogs.PutLogEventsOutput{
  516. NextSequenceToken: aws.String(nextSequenceToken),
  517. },
  518. }
  519. var ticks = make(chan time.Time)
  520. newTicker = func(_ time.Duration) *time.Ticker {
  521. return &time.Ticker{
  522. C: ticks,
  523. }
  524. }
  525. go stream.collectBatch()
  526. line := "A"
  527. for i := 0; i <= maximumLogEventsPerPut; i++ {
  528. stream.Log(&logger.Message{
  529. Line: []byte(line),
  530. Timestamp: time.Time{},
  531. })
  532. }
  533. // no ticks
  534. stream.Close()
  535. argument := <-mockClient.putLogEventsArgument
  536. if argument == nil {
  537. t.Fatal("Expected non-nil PutLogEventsInput")
  538. }
  539. if len(argument.LogEvents) != maximumLogEventsPerPut {
  540. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  541. }
  542. argument = <-mockClient.putLogEventsArgument
  543. if argument == nil {
  544. t.Fatal("Expected non-nil PutLogEventsInput")
  545. }
  546. if len(argument.LogEvents) != 1 {
  547. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  548. }
  549. }
  550. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  551. mockClient := newMockClientBuffered(1)
  552. stream := &logStream{
  553. client: mockClient,
  554. logGroupName: groupName,
  555. logStreamName: streamName,
  556. sequenceToken: aws.String(sequenceToken),
  557. messages: make(chan *logger.Message),
  558. }
  559. mockClient.putLogEventsResult <- &putLogEventsResult{
  560. successResult: &cloudwatchlogs.PutLogEventsOutput{
  561. NextSequenceToken: aws.String(nextSequenceToken),
  562. },
  563. }
  564. var ticks = make(chan time.Time)
  565. newTicker = func(_ time.Duration) *time.Ticker {
  566. return &time.Ticker{
  567. C: ticks,
  568. }
  569. }
  570. go stream.collectBatch()
  571. longline := strings.Repeat("A", maximumBytesPerPut)
  572. stream.Log(&logger.Message{
  573. Line: []byte(longline + "B"),
  574. Timestamp: time.Time{},
  575. })
  576. // no ticks
  577. stream.Close()
  578. argument := <-mockClient.putLogEventsArgument
  579. if argument == nil {
  580. t.Fatal("Expected non-nil PutLogEventsInput")
  581. }
  582. bytes := 0
  583. for _, event := range argument.LogEvents {
  584. bytes += len(*event.Message)
  585. }
  586. if bytes > maximumBytesPerPut {
  587. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
  588. }
  589. argument = <-mockClient.putLogEventsArgument
  590. if len(argument.LogEvents) != 1 {
  591. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  592. }
  593. message := *argument.LogEvents[0].Message
  594. if message[len(message)-1:] != "B" {
  595. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  596. }
  597. }
  598. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  599. mockClient := newMockClient()
  600. stream := &logStream{
  601. client: mockClient,
  602. logGroupName: groupName,
  603. logStreamName: streamName,
  604. sequenceToken: aws.String(sequenceToken),
  605. messages: make(chan *logger.Message),
  606. }
  607. mockClient.putLogEventsResult <- &putLogEventsResult{
  608. successResult: &cloudwatchlogs.PutLogEventsOutput{
  609. NextSequenceToken: aws.String(nextSequenceToken),
  610. },
  611. }
  612. ticks := make(chan time.Time)
  613. newTicker = func(_ time.Duration) *time.Ticker {
  614. return &time.Ticker{
  615. C: ticks,
  616. }
  617. }
  618. go stream.collectBatch()
  619. times := maximumLogEventsPerPut
  620. expectedEvents := []*cloudwatchlogs.InputLogEvent{}
  621. timestamp := time.Now()
  622. for i := 0; i < times; i++ {
  623. line := fmt.Sprintf("%d", i)
  624. if i%2 == 0 {
  625. timestamp.Add(1 * time.Nanosecond)
  626. }
  627. stream.Log(&logger.Message{
  628. Line: []byte(line),
  629. Timestamp: timestamp,
  630. })
  631. expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
  632. Message: aws.String(line),
  633. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  634. })
  635. }
  636. ticks <- time.Time{}
  637. stream.Close()
  638. argument := <-mockClient.putLogEventsArgument
  639. if argument == nil {
  640. t.Fatal("Expected non-nil PutLogEventsInput")
  641. }
  642. if len(argument.LogEvents) != times {
  643. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
  644. }
  645. for i := 0; i < times; i++ {
  646. if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
  647. t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
  648. }
  649. }
  650. }
  651. func TestCreateTagSuccess(t *testing.T) {
  652. mockClient := newMockClient()
  653. info := logger.Info{
  654. ContainerName: "/test-container",
  655. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  656. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  657. }
  658. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  659. if e != nil {
  660. t.Errorf("Error generating tag: %q", e)
  661. }
  662. stream := &logStream{
  663. client: mockClient,
  664. logGroupName: groupName,
  665. logStreamName: logStreamName,
  666. }
  667. mockClient.createLogStreamResult <- &createLogStreamResult{}
  668. err := stream.create()
  669. if err != nil {
  670. t.Errorf("Received unexpected err: %v\n", err)
  671. }
  672. argument := <-mockClient.createLogStreamArgument
  673. if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
  674. t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
  675. }
  676. }