cloudwatchlogs_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package awslogs
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "runtime"
  7. "strings"
  8. "testing"
  9. "time"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/awserr"
  12. "github.com/aws/aws-sdk-go/aws/request"
  13. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/dockerversion"
  16. )
  17. const (
  18. groupName = "groupName"
  19. streamName = "streamName"
  20. sequenceToken = "sequenceToken"
  21. nextSequenceToken = "nextSequenceToken"
  22. logline = "this is a log line"
  23. )
  24. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  25. ctx := logger.Context{
  26. Config: map[string]string{
  27. regionKey: "us-east-1",
  28. },
  29. }
  30. client, err := newAWSLogsClient(ctx)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  35. if !ok {
  36. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  37. }
  38. buildHandlerList := realClient.Handlers.Build
  39. request := &request.Request{
  40. HTTPRequest: &http.Request{
  41. Header: http.Header{},
  42. },
  43. }
  44. buildHandlerList.Run(request)
  45. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  46. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  47. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  48. if userAgent != expectedUserAgentString {
  49. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  50. expectedUserAgentString, userAgent)
  51. }
  52. }
  53. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  54. ctx := logger.Context{
  55. Config: map[string]string{},
  56. }
  57. mockMetadata := newMockMetadataClient()
  58. newRegionFinder = func() regionFinder {
  59. return mockMetadata
  60. }
  61. mockMetadata.regionResult <- &regionResult{
  62. successResult: "us-east-1",
  63. }
  64. _, err := newAWSLogsClient(ctx)
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. }
  69. func TestCreateSuccess(t *testing.T) {
  70. mockClient := newMockClient()
  71. stream := &logStream{
  72. client: mockClient,
  73. logGroupName: groupName,
  74. logStreamName: streamName,
  75. }
  76. mockClient.createLogStreamResult <- &createLogStreamResult{}
  77. err := stream.create()
  78. if err != nil {
  79. t.Errorf("Received unexpected err: %v\n", err)
  80. }
  81. argument := <-mockClient.createLogStreamArgument
  82. if argument.LogGroupName == nil {
  83. t.Fatal("Expected non-nil LogGroupName")
  84. }
  85. if *argument.LogGroupName != groupName {
  86. t.Errorf("Expected LogGroupName to be %s", groupName)
  87. }
  88. if argument.LogStreamName == nil {
  89. t.Fatal("Expected non-nil LogGroupName")
  90. }
  91. if *argument.LogStreamName != streamName {
  92. t.Errorf("Expected LogStreamName to be %s", streamName)
  93. }
  94. }
  95. func TestCreateError(t *testing.T) {
  96. mockClient := newMockClient()
  97. stream := &logStream{
  98. client: mockClient,
  99. }
  100. mockClient.createLogStreamResult <- &createLogStreamResult{
  101. errorResult: errors.New("Error!"),
  102. }
  103. err := stream.create()
  104. if err == nil {
  105. t.Fatal("Expected non-nil err")
  106. }
  107. }
  108. func TestCreateAlreadyExists(t *testing.T) {
  109. mockClient := newMockClient()
  110. stream := &logStream{
  111. client: mockClient,
  112. }
  113. mockClient.createLogStreamResult <- &createLogStreamResult{
  114. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  115. }
  116. err := stream.create()
  117. if err != nil {
  118. t.Fatal("Expected nil err")
  119. }
  120. }
  121. func TestPublishBatchSuccess(t *testing.T) {
  122. mockClient := newMockClient()
  123. stream := &logStream{
  124. client: mockClient,
  125. logGroupName: groupName,
  126. logStreamName: streamName,
  127. sequenceToken: aws.String(sequenceToken),
  128. }
  129. mockClient.putLogEventsResult <- &putLogEventsResult{
  130. successResult: &cloudwatchlogs.PutLogEventsOutput{
  131. NextSequenceToken: aws.String(nextSequenceToken),
  132. },
  133. }
  134. events := []*cloudwatchlogs.InputLogEvent{
  135. {
  136. Message: aws.String(logline),
  137. },
  138. }
  139. stream.publishBatch(events)
  140. if stream.sequenceToken == nil {
  141. t.Fatal("Expected non-nil sequenceToken")
  142. }
  143. if *stream.sequenceToken != nextSequenceToken {
  144. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  145. }
  146. argument := <-mockClient.putLogEventsArgument
  147. if argument == nil {
  148. t.Fatal("Expected non-nil PutLogEventsInput")
  149. }
  150. if argument.SequenceToken == nil {
  151. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  152. }
  153. if *argument.SequenceToken != sequenceToken {
  154. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  155. }
  156. if len(argument.LogEvents) != 1 {
  157. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  158. }
  159. if argument.LogEvents[0] != events[0] {
  160. t.Error("Expected event to equal input")
  161. }
  162. }
  163. func TestPublishBatchError(t *testing.T) {
  164. mockClient := newMockClient()
  165. stream := &logStream{
  166. client: mockClient,
  167. logGroupName: groupName,
  168. logStreamName: streamName,
  169. sequenceToken: aws.String(sequenceToken),
  170. }
  171. mockClient.putLogEventsResult <- &putLogEventsResult{
  172. errorResult: errors.New("Error!"),
  173. }
  174. events := []*cloudwatchlogs.InputLogEvent{
  175. {
  176. Message: aws.String(logline),
  177. },
  178. }
  179. stream.publishBatch(events)
  180. if stream.sequenceToken == nil {
  181. t.Fatal("Expected non-nil sequenceToken")
  182. }
  183. if *stream.sequenceToken != sequenceToken {
  184. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  185. }
  186. }
  187. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  188. mockClient := newMockClientBuffered(2)
  189. stream := &logStream{
  190. client: mockClient,
  191. logGroupName: groupName,
  192. logStreamName: streamName,
  193. sequenceToken: aws.String(sequenceToken),
  194. }
  195. mockClient.putLogEventsResult <- &putLogEventsResult{
  196. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  197. }
  198. mockClient.putLogEventsResult <- &putLogEventsResult{
  199. successResult: &cloudwatchlogs.PutLogEventsOutput{
  200. NextSequenceToken: aws.String(nextSequenceToken),
  201. },
  202. }
  203. events := []*cloudwatchlogs.InputLogEvent{
  204. {
  205. Message: aws.String(logline),
  206. },
  207. }
  208. stream.publishBatch(events)
  209. if stream.sequenceToken == nil {
  210. t.Fatal("Expected non-nil sequenceToken")
  211. }
  212. if *stream.sequenceToken != nextSequenceToken {
  213. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  214. }
  215. argument := <-mockClient.putLogEventsArgument
  216. if argument == nil {
  217. t.Fatal("Expected non-nil PutLogEventsInput")
  218. }
  219. if argument.SequenceToken == nil {
  220. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  221. }
  222. if *argument.SequenceToken != sequenceToken {
  223. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  224. }
  225. if len(argument.LogEvents) != 1 {
  226. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  227. }
  228. if argument.LogEvents[0] != events[0] {
  229. t.Error("Expected event to equal input")
  230. }
  231. argument = <-mockClient.putLogEventsArgument
  232. if argument == nil {
  233. t.Fatal("Expected non-nil PutLogEventsInput")
  234. }
  235. if argument.SequenceToken == nil {
  236. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  237. }
  238. if *argument.SequenceToken != "token" {
  239. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  240. }
  241. if len(argument.LogEvents) != 1 {
  242. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  243. }
  244. if argument.LogEvents[0] != events[0] {
  245. t.Error("Expected event to equal input")
  246. }
  247. }
  248. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  249. mockClient := newMockClient()
  250. stream := &logStream{
  251. client: mockClient,
  252. logGroupName: groupName,
  253. logStreamName: streamName,
  254. sequenceToken: aws.String(sequenceToken),
  255. }
  256. mockClient.putLogEventsResult <- &putLogEventsResult{
  257. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  258. }
  259. events := []*cloudwatchlogs.InputLogEvent{
  260. {
  261. Message: aws.String(logline),
  262. },
  263. }
  264. stream.publishBatch(events)
  265. if stream.sequenceToken == nil {
  266. t.Fatal("Expected non-nil sequenceToken")
  267. }
  268. if *stream.sequenceToken != "token" {
  269. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  270. }
  271. argument := <-mockClient.putLogEventsArgument
  272. if argument == nil {
  273. t.Fatal("Expected non-nil PutLogEventsInput")
  274. }
  275. if argument.SequenceToken == nil {
  276. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  277. }
  278. if *argument.SequenceToken != sequenceToken {
  279. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  280. }
  281. if len(argument.LogEvents) != 1 {
  282. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  283. }
  284. if argument.LogEvents[0] != events[0] {
  285. t.Error("Expected event to equal input")
  286. }
  287. }
  288. func TestCollectBatchSimple(t *testing.T) {
  289. mockClient := newMockClient()
  290. stream := &logStream{
  291. client: mockClient,
  292. logGroupName: groupName,
  293. logStreamName: streamName,
  294. sequenceToken: aws.String(sequenceToken),
  295. messages: make(chan *logger.Message),
  296. }
  297. mockClient.putLogEventsResult <- &putLogEventsResult{
  298. successResult: &cloudwatchlogs.PutLogEventsOutput{
  299. NextSequenceToken: aws.String(nextSequenceToken),
  300. },
  301. }
  302. ticks := make(chan time.Time)
  303. newTicker = func(_ time.Duration) *time.Ticker {
  304. return &time.Ticker{
  305. C: ticks,
  306. }
  307. }
  308. go stream.collectBatch()
  309. stream.Log(&logger.Message{
  310. Line: []byte(logline),
  311. Timestamp: time.Time{},
  312. })
  313. ticks <- time.Time{}
  314. stream.Close()
  315. argument := <-mockClient.putLogEventsArgument
  316. if argument == nil {
  317. t.Fatal("Expected non-nil PutLogEventsInput")
  318. }
  319. if len(argument.LogEvents) != 1 {
  320. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  321. }
  322. if *argument.LogEvents[0].Message != logline {
  323. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  324. }
  325. }
  326. func TestCollectBatchTicker(t *testing.T) {
  327. mockClient := newMockClient()
  328. stream := &logStream{
  329. client: mockClient,
  330. logGroupName: groupName,
  331. logStreamName: streamName,
  332. sequenceToken: aws.String(sequenceToken),
  333. messages: make(chan *logger.Message),
  334. }
  335. mockClient.putLogEventsResult <- &putLogEventsResult{
  336. successResult: &cloudwatchlogs.PutLogEventsOutput{
  337. NextSequenceToken: aws.String(nextSequenceToken),
  338. },
  339. }
  340. ticks := make(chan time.Time)
  341. newTicker = func(_ time.Duration) *time.Ticker {
  342. return &time.Ticker{
  343. C: ticks,
  344. }
  345. }
  346. go stream.collectBatch()
  347. stream.Log(&logger.Message{
  348. Line: []byte(logline + " 1"),
  349. Timestamp: time.Time{},
  350. })
  351. stream.Log(&logger.Message{
  352. Line: []byte(logline + " 2"),
  353. Timestamp: time.Time{},
  354. })
  355. ticks <- time.Time{}
  356. // Verify first batch
  357. argument := <-mockClient.putLogEventsArgument
  358. if argument == nil {
  359. t.Fatal("Expected non-nil PutLogEventsInput")
  360. }
  361. if len(argument.LogEvents) != 2 {
  362. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  363. }
  364. if *argument.LogEvents[0].Message != logline+" 1" {
  365. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  366. }
  367. if *argument.LogEvents[1].Message != logline+" 2" {
  368. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  369. }
  370. stream.Log(&logger.Message{
  371. Line: []byte(logline + " 3"),
  372. Timestamp: time.Time{},
  373. })
  374. ticks <- time.Time{}
  375. argument = <-mockClient.putLogEventsArgument
  376. if argument == nil {
  377. t.Fatal("Expected non-nil PutLogEventsInput")
  378. }
  379. if len(argument.LogEvents) != 1 {
  380. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  381. }
  382. if *argument.LogEvents[0].Message != logline+" 3" {
  383. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  384. }
  385. stream.Close()
  386. }
  387. func TestCollectBatchClose(t *testing.T) {
  388. mockClient := newMockClient()
  389. stream := &logStream{
  390. client: mockClient,
  391. logGroupName: groupName,
  392. logStreamName: streamName,
  393. sequenceToken: aws.String(sequenceToken),
  394. messages: make(chan *logger.Message),
  395. }
  396. mockClient.putLogEventsResult <- &putLogEventsResult{
  397. successResult: &cloudwatchlogs.PutLogEventsOutput{
  398. NextSequenceToken: aws.String(nextSequenceToken),
  399. },
  400. }
  401. var ticks = make(chan time.Time)
  402. newTicker = func(_ time.Duration) *time.Ticker {
  403. return &time.Ticker{
  404. C: ticks,
  405. }
  406. }
  407. go stream.collectBatch()
  408. stream.Log(&logger.Message{
  409. Line: []byte(logline),
  410. Timestamp: time.Time{},
  411. })
  412. // no ticks
  413. stream.Close()
  414. argument := <-mockClient.putLogEventsArgument
  415. if argument == nil {
  416. t.Fatal("Expected non-nil PutLogEventsInput")
  417. }
  418. if len(argument.LogEvents) != 1 {
  419. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  420. }
  421. if *argument.LogEvents[0].Message != logline {
  422. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  423. }
  424. }
  425. func TestCollectBatchLineSplit(t *testing.T) {
  426. mockClient := newMockClient()
  427. stream := &logStream{
  428. client: mockClient,
  429. logGroupName: groupName,
  430. logStreamName: streamName,
  431. sequenceToken: aws.String(sequenceToken),
  432. messages: make(chan *logger.Message),
  433. }
  434. mockClient.putLogEventsResult <- &putLogEventsResult{
  435. successResult: &cloudwatchlogs.PutLogEventsOutput{
  436. NextSequenceToken: aws.String(nextSequenceToken),
  437. },
  438. }
  439. var ticks = make(chan time.Time)
  440. newTicker = func(_ time.Duration) *time.Ticker {
  441. return &time.Ticker{
  442. C: ticks,
  443. }
  444. }
  445. go stream.collectBatch()
  446. longline := strings.Repeat("A", maximumBytesPerEvent)
  447. stream.Log(&logger.Message{
  448. Line: []byte(longline + "B"),
  449. Timestamp: time.Time{},
  450. })
  451. // no ticks
  452. stream.Close()
  453. argument := <-mockClient.putLogEventsArgument
  454. if argument == nil {
  455. t.Fatal("Expected non-nil PutLogEventsInput")
  456. }
  457. if len(argument.LogEvents) != 2 {
  458. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  459. }
  460. if *argument.LogEvents[0].Message != longline {
  461. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  462. }
  463. if *argument.LogEvents[1].Message != "B" {
  464. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  465. }
  466. }
  467. func TestCollectBatchMaxEvents(t *testing.T) {
  468. mockClient := newMockClientBuffered(1)
  469. stream := &logStream{
  470. client: mockClient,
  471. logGroupName: groupName,
  472. logStreamName: streamName,
  473. sequenceToken: aws.String(sequenceToken),
  474. messages: make(chan *logger.Message),
  475. }
  476. mockClient.putLogEventsResult <- &putLogEventsResult{
  477. successResult: &cloudwatchlogs.PutLogEventsOutput{
  478. NextSequenceToken: aws.String(nextSequenceToken),
  479. },
  480. }
  481. var ticks = make(chan time.Time)
  482. newTicker = func(_ time.Duration) *time.Ticker {
  483. return &time.Ticker{
  484. C: ticks,
  485. }
  486. }
  487. go stream.collectBatch()
  488. line := "A"
  489. for i := 0; i <= maximumLogEventsPerPut; i++ {
  490. stream.Log(&logger.Message{
  491. Line: []byte(line),
  492. Timestamp: time.Time{},
  493. })
  494. }
  495. // no ticks
  496. stream.Close()
  497. argument := <-mockClient.putLogEventsArgument
  498. if argument == nil {
  499. t.Fatal("Expected non-nil PutLogEventsInput")
  500. }
  501. if len(argument.LogEvents) != maximumLogEventsPerPut {
  502. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  503. }
  504. argument = <-mockClient.putLogEventsArgument
  505. if argument == nil {
  506. t.Fatal("Expected non-nil PutLogEventsInput")
  507. }
  508. if len(argument.LogEvents) != 1 {
  509. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  510. }
  511. }
  512. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  513. mockClient := newMockClientBuffered(1)
  514. stream := &logStream{
  515. client: mockClient,
  516. logGroupName: groupName,
  517. logStreamName: streamName,
  518. sequenceToken: aws.String(sequenceToken),
  519. messages: make(chan *logger.Message),
  520. }
  521. mockClient.putLogEventsResult <- &putLogEventsResult{
  522. successResult: &cloudwatchlogs.PutLogEventsOutput{
  523. NextSequenceToken: aws.String(nextSequenceToken),
  524. },
  525. }
  526. var ticks = make(chan time.Time)
  527. newTicker = func(_ time.Duration) *time.Ticker {
  528. return &time.Ticker{
  529. C: ticks,
  530. }
  531. }
  532. go stream.collectBatch()
  533. longline := strings.Repeat("A", maximumBytesPerPut)
  534. stream.Log(&logger.Message{
  535. Line: []byte(longline + "B"),
  536. Timestamp: time.Time{},
  537. })
  538. // no ticks
  539. stream.Close()
  540. argument := <-mockClient.putLogEventsArgument
  541. if argument == nil {
  542. t.Fatal("Expected non-nil PutLogEventsInput")
  543. }
  544. bytes := 0
  545. for _, event := range argument.LogEvents {
  546. bytes += len(*event.Message)
  547. }
  548. if bytes > maximumBytesPerPut {
  549. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
  550. }
  551. argument = <-mockClient.putLogEventsArgument
  552. if len(argument.LogEvents) != 1 {
  553. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  554. }
  555. message := *argument.LogEvents[0].Message
  556. if message[len(message)-1:] != "B" {
  557. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  558. }
  559. }