cloudwatchlogs_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. package awslogs
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "reflect"
  7. "regexp"
  8. "runtime"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/awserr"
  14. "github.com/aws/aws-sdk-go/aws/request"
  15. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  16. "github.com/docker/docker/daemon/logger"
  17. "github.com/docker/docker/daemon/logger/loggerutils"
  18. "github.com/docker/docker/dockerversion"
  19. "github.com/stretchr/testify/assert"
  20. )
// Shared fixtures used by the tests in this file.
const (
	// groupName is assigned to logStream.logGroupName in the tests.
	groupName = "groupName"
	// streamName is assigned to logStream.logStreamName in the tests.
	streamName = "streamName"
	// sequenceToken is the token a logStream under test starts out with.
	sequenceToken = "sequenceToken"
	// nextSequenceToken is the token the mock client returns on a successful put.
	nextSequenceToken = "nextSequenceToken"
	// logline is the payload of an ordinary log message; note the trailing carriage return.
	logline = "this is a log line\r"
	// multilineLogline starts with a timestamp and is emitted by logGenerator
	// as the first line of each generated group.
	multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
)
  29. // Generates i multi-line events each with j lines
  30. func (l *logStream) logGenerator(lineCount int, multilineCount int) {
  31. for i := 0; i < multilineCount; i++ {
  32. l.Log(&logger.Message{
  33. Line: []byte(multilineLogline),
  34. Timestamp: time.Time{},
  35. })
  36. for j := 0; j < lineCount; j++ {
  37. l.Log(&logger.Message{
  38. Line: []byte(logline),
  39. Timestamp: time.Time{},
  40. })
  41. }
  42. }
  43. }
  44. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  45. info := logger.Info{
  46. Config: map[string]string{
  47. regionKey: "us-east-1",
  48. },
  49. }
  50. client, err := newAWSLogsClient(info)
  51. if err != nil {
  52. t.Fatal(err)
  53. }
  54. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  55. if !ok {
  56. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  57. }
  58. buildHandlerList := realClient.Handlers.Build
  59. request := &request.Request{
  60. HTTPRequest: &http.Request{
  61. Header: http.Header{},
  62. },
  63. }
  64. buildHandlerList.Run(request)
  65. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  66. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  67. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  68. if userAgent != expectedUserAgentString {
  69. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  70. expectedUserAgentString, userAgent)
  71. }
  72. }
  73. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  74. info := logger.Info{
  75. Config: map[string]string{},
  76. }
  77. mockMetadata := newMockMetadataClient()
  78. newRegionFinder = func() regionFinder {
  79. return mockMetadata
  80. }
  81. mockMetadata.regionResult <- &regionResult{
  82. successResult: "us-east-1",
  83. }
  84. _, err := newAWSLogsClient(info)
  85. if err != nil {
  86. t.Fatal(err)
  87. }
  88. }
  89. func TestCreateSuccess(t *testing.T) {
  90. mockClient := newMockClient()
  91. stream := &logStream{
  92. client: mockClient,
  93. logGroupName: groupName,
  94. logStreamName: streamName,
  95. }
  96. mockClient.createLogStreamResult <- &createLogStreamResult{}
  97. err := stream.create()
  98. if err != nil {
  99. t.Errorf("Received unexpected err: %v\n", err)
  100. }
  101. argument := <-mockClient.createLogStreamArgument
  102. if argument.LogGroupName == nil {
  103. t.Fatal("Expected non-nil LogGroupName")
  104. }
  105. if *argument.LogGroupName != groupName {
  106. t.Errorf("Expected LogGroupName to be %s", groupName)
  107. }
  108. if argument.LogStreamName == nil {
  109. t.Fatal("Expected non-nil LogStreamName")
  110. }
  111. if *argument.LogStreamName != streamName {
  112. t.Errorf("Expected LogStreamName to be %s", streamName)
  113. }
  114. }
  115. func TestCreateLogGroupSuccess(t *testing.T) {
  116. mockClient := newMockClient()
  117. stream := &logStream{
  118. client: mockClient,
  119. logGroupName: groupName,
  120. logStreamName: streamName,
  121. logCreateGroup: true,
  122. }
  123. mockClient.createLogGroupResult <- &createLogGroupResult{}
  124. mockClient.createLogStreamResult <- &createLogStreamResult{}
  125. err := stream.create()
  126. if err != nil {
  127. t.Errorf("Received unexpected err: %v\n", err)
  128. }
  129. argument := <-mockClient.createLogStreamArgument
  130. if argument.LogGroupName == nil {
  131. t.Fatal("Expected non-nil LogGroupName")
  132. }
  133. if *argument.LogGroupName != groupName {
  134. t.Errorf("Expected LogGroupName to be %s", groupName)
  135. }
  136. if argument.LogStreamName == nil {
  137. t.Fatal("Expected non-nil LogStreamName")
  138. }
  139. if *argument.LogStreamName != streamName {
  140. t.Errorf("Expected LogStreamName to be %s", streamName)
  141. }
  142. }
  143. func TestCreateError(t *testing.T) {
  144. mockClient := newMockClient()
  145. stream := &logStream{
  146. client: mockClient,
  147. }
  148. mockClient.createLogStreamResult <- &createLogStreamResult{
  149. errorResult: errors.New("Error"),
  150. }
  151. err := stream.create()
  152. if err == nil {
  153. t.Fatal("Expected non-nil err")
  154. }
  155. }
  156. func TestCreateAlreadyExists(t *testing.T) {
  157. mockClient := newMockClient()
  158. stream := &logStream{
  159. client: mockClient,
  160. }
  161. mockClient.createLogStreamResult <- &createLogStreamResult{
  162. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  163. }
  164. err := stream.create()
  165. if err != nil {
  166. t.Fatal("Expected nil err")
  167. }
  168. }
  169. func TestPublishBatchSuccess(t *testing.T) {
  170. mockClient := newMockClient()
  171. stream := &logStream{
  172. client: mockClient,
  173. logGroupName: groupName,
  174. logStreamName: streamName,
  175. sequenceToken: aws.String(sequenceToken),
  176. }
  177. mockClient.putLogEventsResult <- &putLogEventsResult{
  178. successResult: &cloudwatchlogs.PutLogEventsOutput{
  179. NextSequenceToken: aws.String(nextSequenceToken),
  180. },
  181. }
  182. events := []wrappedEvent{
  183. {
  184. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  185. Message: aws.String(logline),
  186. },
  187. },
  188. }
  189. stream.publishBatch(events)
  190. if stream.sequenceToken == nil {
  191. t.Fatal("Expected non-nil sequenceToken")
  192. }
  193. if *stream.sequenceToken != nextSequenceToken {
  194. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  195. }
  196. argument := <-mockClient.putLogEventsArgument
  197. if argument == nil {
  198. t.Fatal("Expected non-nil PutLogEventsInput")
  199. }
  200. if argument.SequenceToken == nil {
  201. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  202. }
  203. if *argument.SequenceToken != sequenceToken {
  204. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  205. }
  206. if len(argument.LogEvents) != 1 {
  207. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  208. }
  209. if argument.LogEvents[0] != events[0].inputLogEvent {
  210. t.Error("Expected event to equal input")
  211. }
  212. }
  213. func TestPublishBatchError(t *testing.T) {
  214. mockClient := newMockClient()
  215. stream := &logStream{
  216. client: mockClient,
  217. logGroupName: groupName,
  218. logStreamName: streamName,
  219. sequenceToken: aws.String(sequenceToken),
  220. }
  221. mockClient.putLogEventsResult <- &putLogEventsResult{
  222. errorResult: errors.New("Error"),
  223. }
  224. events := []wrappedEvent{
  225. {
  226. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  227. Message: aws.String(logline),
  228. },
  229. },
  230. }
  231. stream.publishBatch(events)
  232. if stream.sequenceToken == nil {
  233. t.Fatal("Expected non-nil sequenceToken")
  234. }
  235. if *stream.sequenceToken != sequenceToken {
  236. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  237. }
  238. }
  239. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  240. mockClient := newMockClientBuffered(2)
  241. stream := &logStream{
  242. client: mockClient,
  243. logGroupName: groupName,
  244. logStreamName: streamName,
  245. sequenceToken: aws.String(sequenceToken),
  246. }
  247. mockClient.putLogEventsResult <- &putLogEventsResult{
  248. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  249. }
  250. mockClient.putLogEventsResult <- &putLogEventsResult{
  251. successResult: &cloudwatchlogs.PutLogEventsOutput{
  252. NextSequenceToken: aws.String(nextSequenceToken),
  253. },
  254. }
  255. events := []wrappedEvent{
  256. {
  257. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  258. Message: aws.String(logline),
  259. },
  260. },
  261. }
  262. stream.publishBatch(events)
  263. if stream.sequenceToken == nil {
  264. t.Fatal("Expected non-nil sequenceToken")
  265. }
  266. if *stream.sequenceToken != nextSequenceToken {
  267. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  268. }
  269. argument := <-mockClient.putLogEventsArgument
  270. if argument == nil {
  271. t.Fatal("Expected non-nil PutLogEventsInput")
  272. }
  273. if argument.SequenceToken == nil {
  274. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  275. }
  276. if *argument.SequenceToken != sequenceToken {
  277. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  278. }
  279. if len(argument.LogEvents) != 1 {
  280. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  281. }
  282. if argument.LogEvents[0] != events[0].inputLogEvent {
  283. t.Error("Expected event to equal input")
  284. }
  285. argument = <-mockClient.putLogEventsArgument
  286. if argument == nil {
  287. t.Fatal("Expected non-nil PutLogEventsInput")
  288. }
  289. if argument.SequenceToken == nil {
  290. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  291. }
  292. if *argument.SequenceToken != "token" {
  293. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  294. }
  295. if len(argument.LogEvents) != 1 {
  296. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  297. }
  298. if argument.LogEvents[0] != events[0].inputLogEvent {
  299. t.Error("Expected event to equal input")
  300. }
  301. }
  302. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  303. mockClient := newMockClient()
  304. stream := &logStream{
  305. client: mockClient,
  306. logGroupName: groupName,
  307. logStreamName: streamName,
  308. sequenceToken: aws.String(sequenceToken),
  309. }
  310. mockClient.putLogEventsResult <- &putLogEventsResult{
  311. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  312. }
  313. events := []wrappedEvent{
  314. {
  315. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  316. Message: aws.String(logline),
  317. },
  318. },
  319. }
  320. stream.publishBatch(events)
  321. if stream.sequenceToken == nil {
  322. t.Fatal("Expected non-nil sequenceToken")
  323. }
  324. if *stream.sequenceToken != "token" {
  325. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  326. }
  327. argument := <-mockClient.putLogEventsArgument
  328. if argument == nil {
  329. t.Fatal("Expected non-nil PutLogEventsInput")
  330. }
  331. if argument.SequenceToken == nil {
  332. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  333. }
  334. if *argument.SequenceToken != sequenceToken {
  335. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  336. }
  337. if len(argument.LogEvents) != 1 {
  338. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  339. }
  340. if argument.LogEvents[0] != events[0].inputLogEvent {
  341. t.Error("Expected event to equal input")
  342. }
  343. }
  344. func TestCollectBatchSimple(t *testing.T) {
  345. mockClient := newMockClient()
  346. stream := &logStream{
  347. client: mockClient,
  348. logGroupName: groupName,
  349. logStreamName: streamName,
  350. sequenceToken: aws.String(sequenceToken),
  351. messages: make(chan *logger.Message),
  352. }
  353. mockClient.putLogEventsResult <- &putLogEventsResult{
  354. successResult: &cloudwatchlogs.PutLogEventsOutput{
  355. NextSequenceToken: aws.String(nextSequenceToken),
  356. },
  357. }
  358. ticks := make(chan time.Time)
  359. newTicker = func(_ time.Duration) *time.Ticker {
  360. return &time.Ticker{
  361. C: ticks,
  362. }
  363. }
  364. go stream.collectBatch()
  365. stream.Log(&logger.Message{
  366. Line: []byte(logline),
  367. Timestamp: time.Time{},
  368. })
  369. ticks <- time.Time{}
  370. stream.Close()
  371. argument := <-mockClient.putLogEventsArgument
  372. if argument == nil {
  373. t.Fatal("Expected non-nil PutLogEventsInput")
  374. }
  375. if len(argument.LogEvents) != 1 {
  376. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  377. }
  378. if *argument.LogEvents[0].Message != logline {
  379. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  380. }
  381. }
  382. func TestCollectBatchTicker(t *testing.T) {
  383. mockClient := newMockClient()
  384. stream := &logStream{
  385. client: mockClient,
  386. logGroupName: groupName,
  387. logStreamName: streamName,
  388. sequenceToken: aws.String(sequenceToken),
  389. messages: make(chan *logger.Message),
  390. }
  391. mockClient.putLogEventsResult <- &putLogEventsResult{
  392. successResult: &cloudwatchlogs.PutLogEventsOutput{
  393. NextSequenceToken: aws.String(nextSequenceToken),
  394. },
  395. }
  396. ticks := make(chan time.Time)
  397. newTicker = func(_ time.Duration) *time.Ticker {
  398. return &time.Ticker{
  399. C: ticks,
  400. }
  401. }
  402. go stream.collectBatch()
  403. stream.Log(&logger.Message{
  404. Line: []byte(logline + " 1"),
  405. Timestamp: time.Time{},
  406. })
  407. stream.Log(&logger.Message{
  408. Line: []byte(logline + " 2"),
  409. Timestamp: time.Time{},
  410. })
  411. ticks <- time.Time{}
  412. // Verify first batch
  413. argument := <-mockClient.putLogEventsArgument
  414. if argument == nil {
  415. t.Fatal("Expected non-nil PutLogEventsInput")
  416. }
  417. if len(argument.LogEvents) != 2 {
  418. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  419. }
  420. if *argument.LogEvents[0].Message != logline+" 1" {
  421. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  422. }
  423. if *argument.LogEvents[1].Message != logline+" 2" {
  424. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  425. }
  426. stream.Log(&logger.Message{
  427. Line: []byte(logline + " 3"),
  428. Timestamp: time.Time{},
  429. })
  430. ticks <- time.Time{}
  431. argument = <-mockClient.putLogEventsArgument
  432. if argument == nil {
  433. t.Fatal("Expected non-nil PutLogEventsInput")
  434. }
  435. if len(argument.LogEvents) != 1 {
  436. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  437. }
  438. if *argument.LogEvents[0].Message != logline+" 3" {
  439. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  440. }
  441. stream.Close()
  442. }
  443. func TestCollectBatchMultilinePattern(t *testing.T) {
  444. mockClient := newMockClient()
  445. multilinePattern := regexp.MustCompile("xxxx")
  446. stream := &logStream{
  447. client: mockClient,
  448. logGroupName: groupName,
  449. logStreamName: streamName,
  450. multilinePattern: multilinePattern,
  451. sequenceToken: aws.String(sequenceToken),
  452. messages: make(chan *logger.Message),
  453. }
  454. mockClient.putLogEventsResult <- &putLogEventsResult{
  455. successResult: &cloudwatchlogs.PutLogEventsOutput{
  456. NextSequenceToken: aws.String(nextSequenceToken),
  457. },
  458. }
  459. ticks := make(chan time.Time)
  460. newTicker = func(_ time.Duration) *time.Ticker {
  461. return &time.Ticker{
  462. C: ticks,
  463. }
  464. }
  465. go stream.collectBatch()
  466. stream.Log(&logger.Message{
  467. Line: []byte(logline),
  468. Timestamp: time.Now(),
  469. })
  470. stream.Log(&logger.Message{
  471. Line: []byte(logline),
  472. Timestamp: time.Now(),
  473. })
  474. stream.Log(&logger.Message{
  475. Line: []byte("xxxx " + logline),
  476. Timestamp: time.Now(),
  477. })
  478. ticks <- time.Now()
  479. // Verify single multiline event
  480. argument := <-mockClient.putLogEventsArgument
  481. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  482. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  483. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  484. stream.Close()
  485. // Verify single event
  486. argument = <-mockClient.putLogEventsArgument
  487. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  488. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  489. assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  490. }
  491. func BenchmarkCollectBatch(b *testing.B) {
  492. for i := 0; i < b.N; i++ {
  493. mockClient := newMockClient()
  494. stream := &logStream{
  495. client: mockClient,
  496. logGroupName: groupName,
  497. logStreamName: streamName,
  498. sequenceToken: aws.String(sequenceToken),
  499. messages: make(chan *logger.Message),
  500. }
  501. mockClient.putLogEventsResult <- &putLogEventsResult{
  502. successResult: &cloudwatchlogs.PutLogEventsOutput{
  503. NextSequenceToken: aws.String(nextSequenceToken),
  504. },
  505. }
  506. ticks := make(chan time.Time)
  507. newTicker = func(_ time.Duration) *time.Ticker {
  508. return &time.Ticker{
  509. C: ticks,
  510. }
  511. }
  512. go stream.collectBatch()
  513. stream.logGenerator(10, 100)
  514. ticks <- time.Time{}
  515. stream.Close()
  516. }
  517. }
  518. func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
  519. for i := 0; i < b.N; i++ {
  520. mockClient := newMockClient()
  521. multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
  522. stream := &logStream{
  523. client: mockClient,
  524. logGroupName: groupName,
  525. logStreamName: streamName,
  526. multilinePattern: multilinePattern,
  527. sequenceToken: aws.String(sequenceToken),
  528. messages: make(chan *logger.Message),
  529. }
  530. mockClient.putLogEventsResult <- &putLogEventsResult{
  531. successResult: &cloudwatchlogs.PutLogEventsOutput{
  532. NextSequenceToken: aws.String(nextSequenceToken),
  533. },
  534. }
  535. ticks := make(chan time.Time)
  536. newTicker = func(_ time.Duration) *time.Ticker {
  537. return &time.Ticker{
  538. C: ticks,
  539. }
  540. }
  541. go stream.collectBatch()
  542. stream.logGenerator(10, 100)
  543. ticks <- time.Time{}
  544. stream.Close()
  545. }
  546. }
  547. func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
  548. mockClient := newMockClient()
  549. multilinePattern := regexp.MustCompile("xxxx")
  550. stream := &logStream{
  551. client: mockClient,
  552. logGroupName: groupName,
  553. logStreamName: streamName,
  554. multilinePattern: multilinePattern,
  555. sequenceToken: aws.String(sequenceToken),
  556. messages: make(chan *logger.Message),
  557. }
  558. mockClient.putLogEventsResult <- &putLogEventsResult{
  559. successResult: &cloudwatchlogs.PutLogEventsOutput{
  560. NextSequenceToken: aws.String(nextSequenceToken),
  561. },
  562. }
  563. ticks := make(chan time.Time)
  564. newTicker = func(_ time.Duration) *time.Ticker {
  565. return &time.Ticker{
  566. C: ticks,
  567. }
  568. }
  569. go stream.collectBatch()
  570. stream.Log(&logger.Message{
  571. Line: []byte(logline),
  572. Timestamp: time.Now(),
  573. })
  574. // Log an event 1 second later
  575. stream.Log(&logger.Message{
  576. Line: []byte(logline),
  577. Timestamp: time.Now().Add(time.Second),
  578. })
  579. // Fire ticker batchPublishFrequency seconds later
  580. ticks <- time.Now().Add(batchPublishFrequency + time.Second)
  581. // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
  582. argument := <-mockClient.putLogEventsArgument
  583. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  584. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  585. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  586. // Log an event 1 second later
  587. stream.Log(&logger.Message{
  588. Line: []byte(logline),
  589. Timestamp: time.Now().Add(time.Second),
  590. })
  591. // Fire ticker another batchPublishFrequency seconds later
  592. ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
  593. // Verify the event buffer is truly flushed - we should only receive a single event
  594. argument = <-mockClient.putLogEventsArgument
  595. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  596. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  597. assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  598. stream.Close()
  599. }
  600. func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
  601. mockClient := newMockClient()
  602. multilinePattern := regexp.MustCompile("xxxx")
  603. stream := &logStream{
  604. client: mockClient,
  605. logGroupName: groupName,
  606. logStreamName: streamName,
  607. multilinePattern: multilinePattern,
  608. sequenceToken: aws.String(sequenceToken),
  609. messages: make(chan *logger.Message),
  610. }
  611. mockClient.putLogEventsResult <- &putLogEventsResult{
  612. successResult: &cloudwatchlogs.PutLogEventsOutput{
  613. NextSequenceToken: aws.String(nextSequenceToken),
  614. },
  615. }
  616. ticks := make(chan time.Time)
  617. newTicker = func(_ time.Duration) *time.Ticker {
  618. return &time.Ticker{
  619. C: ticks,
  620. }
  621. }
  622. go stream.collectBatch()
  623. stream.Log(&logger.Message{
  624. Line: []byte(logline),
  625. Timestamp: time.Now(),
  626. })
  627. // Log an event 1 second later
  628. stream.Log(&logger.Message{
  629. Line: []byte(logline),
  630. Timestamp: time.Now().Add(time.Second),
  631. })
  632. // Fire ticker in past to simulate negative event buffer age
  633. ticks <- time.Now().Add(-time.Second)
  634. // Verify single multiline event is flushed with a negative event buffer age
  635. argument := <-mockClient.putLogEventsArgument
  636. assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
  637. assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
  638. assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
  639. stream.Close()
  640. }
  641. func TestCollectBatchClose(t *testing.T) {
  642. mockClient := newMockClient()
  643. stream := &logStream{
  644. client: mockClient,
  645. logGroupName: groupName,
  646. logStreamName: streamName,
  647. sequenceToken: aws.String(sequenceToken),
  648. messages: make(chan *logger.Message),
  649. }
  650. mockClient.putLogEventsResult <- &putLogEventsResult{
  651. successResult: &cloudwatchlogs.PutLogEventsOutput{
  652. NextSequenceToken: aws.String(nextSequenceToken),
  653. },
  654. }
  655. var ticks = make(chan time.Time)
  656. newTicker = func(_ time.Duration) *time.Ticker {
  657. return &time.Ticker{
  658. C: ticks,
  659. }
  660. }
  661. go stream.collectBatch()
  662. stream.Log(&logger.Message{
  663. Line: []byte(logline),
  664. Timestamp: time.Time{},
  665. })
  666. // no ticks
  667. stream.Close()
  668. argument := <-mockClient.putLogEventsArgument
  669. if argument == nil {
  670. t.Fatal("Expected non-nil PutLogEventsInput")
  671. }
  672. if len(argument.LogEvents) != 1 {
  673. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  674. }
  675. if *argument.LogEvents[0].Message != logline {
  676. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  677. }
  678. }
  679. func TestCollectBatchLineSplit(t *testing.T) {
  680. mockClient := newMockClient()
  681. stream := &logStream{
  682. client: mockClient,
  683. logGroupName: groupName,
  684. logStreamName: streamName,
  685. sequenceToken: aws.String(sequenceToken),
  686. messages: make(chan *logger.Message),
  687. }
  688. mockClient.putLogEventsResult <- &putLogEventsResult{
  689. successResult: &cloudwatchlogs.PutLogEventsOutput{
  690. NextSequenceToken: aws.String(nextSequenceToken),
  691. },
  692. }
  693. var ticks = make(chan time.Time)
  694. newTicker = func(_ time.Duration) *time.Ticker {
  695. return &time.Ticker{
  696. C: ticks,
  697. }
  698. }
  699. go stream.collectBatch()
  700. longline := strings.Repeat("A", maximumBytesPerEvent)
  701. stream.Log(&logger.Message{
  702. Line: []byte(longline + "B"),
  703. Timestamp: time.Time{},
  704. })
  705. // no ticks
  706. stream.Close()
  707. argument := <-mockClient.putLogEventsArgument
  708. if argument == nil {
  709. t.Fatal("Expected non-nil PutLogEventsInput")
  710. }
  711. if len(argument.LogEvents) != 2 {
  712. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  713. }
  714. if *argument.LogEvents[0].Message != longline {
  715. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  716. }
  717. if *argument.LogEvents[1].Message != "B" {
  718. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  719. }
  720. }
  721. func TestCollectBatchMaxEvents(t *testing.T) {
  722. mockClient := newMockClientBuffered(1)
  723. stream := &logStream{
  724. client: mockClient,
  725. logGroupName: groupName,
  726. logStreamName: streamName,
  727. sequenceToken: aws.String(sequenceToken),
  728. messages: make(chan *logger.Message),
  729. }
  730. mockClient.putLogEventsResult <- &putLogEventsResult{
  731. successResult: &cloudwatchlogs.PutLogEventsOutput{
  732. NextSequenceToken: aws.String(nextSequenceToken),
  733. },
  734. }
  735. var ticks = make(chan time.Time)
  736. newTicker = func(_ time.Duration) *time.Ticker {
  737. return &time.Ticker{
  738. C: ticks,
  739. }
  740. }
  741. go stream.collectBatch()
  742. line := "A"
  743. for i := 0; i <= maximumLogEventsPerPut; i++ {
  744. stream.Log(&logger.Message{
  745. Line: []byte(line),
  746. Timestamp: time.Time{},
  747. })
  748. }
  749. // no ticks
  750. stream.Close()
  751. argument := <-mockClient.putLogEventsArgument
  752. if argument == nil {
  753. t.Fatal("Expected non-nil PutLogEventsInput")
  754. }
  755. if len(argument.LogEvents) != maximumLogEventsPerPut {
  756. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  757. }
  758. argument = <-mockClient.putLogEventsArgument
  759. if argument == nil {
  760. t.Fatal("Expected non-nil PutLogEventsInput")
  761. }
  762. if len(argument.LogEvents) != 1 {
  763. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  764. }
  765. }
  766. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  767. mockClient := newMockClientBuffered(1)
  768. stream := &logStream{
  769. client: mockClient,
  770. logGroupName: groupName,
  771. logStreamName: streamName,
  772. sequenceToken: aws.String(sequenceToken),
  773. messages: make(chan *logger.Message),
  774. }
  775. mockClient.putLogEventsResult <- &putLogEventsResult{
  776. successResult: &cloudwatchlogs.PutLogEventsOutput{
  777. NextSequenceToken: aws.String(nextSequenceToken),
  778. },
  779. }
  780. var ticks = make(chan time.Time)
  781. newTicker = func(_ time.Duration) *time.Ticker {
  782. return &time.Ticker{
  783. C: ticks,
  784. }
  785. }
  786. go stream.collectBatch()
  787. longline := strings.Repeat("A", maximumBytesPerPut)
  788. stream.Log(&logger.Message{
  789. Line: []byte(longline + "B"),
  790. Timestamp: time.Time{},
  791. })
  792. // no ticks
  793. stream.Close()
  794. argument := <-mockClient.putLogEventsArgument
  795. if argument == nil {
  796. t.Fatal("Expected non-nil PutLogEventsInput")
  797. }
  798. bytes := 0
  799. for _, event := range argument.LogEvents {
  800. bytes += len(*event.Message)
  801. }
  802. if bytes > maximumBytesPerPut {
  803. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
  804. }
  805. argument = <-mockClient.putLogEventsArgument
  806. if len(argument.LogEvents) != 1 {
  807. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  808. }
  809. message := *argument.LogEvents[0].Message
  810. if message[len(message)-1:] != "B" {
  811. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  812. }
  813. }
  814. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  815. mockClient := newMockClient()
  816. stream := &logStream{
  817. client: mockClient,
  818. logGroupName: groupName,
  819. logStreamName: streamName,
  820. sequenceToken: aws.String(sequenceToken),
  821. messages: make(chan *logger.Message),
  822. }
  823. mockClient.putLogEventsResult <- &putLogEventsResult{
  824. successResult: &cloudwatchlogs.PutLogEventsOutput{
  825. NextSequenceToken: aws.String(nextSequenceToken),
  826. },
  827. }
  828. ticks := make(chan time.Time)
  829. newTicker = func(_ time.Duration) *time.Ticker {
  830. return &time.Ticker{
  831. C: ticks,
  832. }
  833. }
  834. go stream.collectBatch()
  835. times := maximumLogEventsPerPut
  836. expectedEvents := []*cloudwatchlogs.InputLogEvent{}
  837. timestamp := time.Now()
  838. for i := 0; i < times; i++ {
  839. line := fmt.Sprintf("%d", i)
  840. if i%2 == 0 {
  841. timestamp.Add(1 * time.Nanosecond)
  842. }
  843. stream.Log(&logger.Message{
  844. Line: []byte(line),
  845. Timestamp: timestamp,
  846. })
  847. expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
  848. Message: aws.String(line),
  849. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  850. })
  851. }
  852. ticks <- time.Time{}
  853. stream.Close()
  854. argument := <-mockClient.putLogEventsArgument
  855. if argument == nil {
  856. t.Fatal("Expected non-nil PutLogEventsInput")
  857. }
  858. if len(argument.LogEvents) != times {
  859. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
  860. }
  861. for i := 0; i < times; i++ {
  862. if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
  863. t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
  864. }
  865. }
  866. }
  867. func TestParseLogOptionsMultilinePattern(t *testing.T) {
  868. info := logger.Info{
  869. Config: map[string]string{
  870. multilinePatternKey: "^xxxx",
  871. },
  872. }
  873. multilinePattern, err := parseMultilineOptions(info)
  874. assert.Nil(t, err, "Received unexpected error")
  875. assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
  876. }
  877. func TestParseLogOptionsDatetimeFormat(t *testing.T) {
  878. datetimeFormatTests := []struct {
  879. format string
  880. match string
  881. }{
  882. {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
  883. {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
  884. {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
  885. {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
  886. {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
  887. {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
  888. {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
  889. }
  890. for _, dt := range datetimeFormatTests {
  891. t.Run(dt.match, func(t *testing.T) {
  892. info := logger.Info{
  893. Config: map[string]string{
  894. datetimeFormatKey: dt.format,
  895. },
  896. }
  897. multilinePattern, err := parseMultilineOptions(info)
  898. assert.Nil(t, err, "Received unexpected error")
  899. assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
  900. })
  901. }
  902. }
  903. func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
  904. cfg := map[string]string{
  905. multilinePatternKey: "^xxxx",
  906. datetimeFormatKey: "%Y-%m-%d",
  907. logGroupKey: groupName,
  908. }
  909. conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
  910. err := ValidateLogOpt(cfg)
  911. assert.NotNil(t, err, "Expected an error")
  912. assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error")
  913. }
  914. func TestCreateTagSuccess(t *testing.T) {
  915. mockClient := newMockClient()
  916. info := logger.Info{
  917. ContainerName: "/test-container",
  918. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  919. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  920. }
  921. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  922. if e != nil {
  923. t.Errorf("Error generating tag: %q", e)
  924. }
  925. stream := &logStream{
  926. client: mockClient,
  927. logGroupName: groupName,
  928. logStreamName: logStreamName,
  929. }
  930. mockClient.createLogStreamResult <- &createLogStreamResult{}
  931. err := stream.create()
  932. if err != nil {
  933. t.Errorf("Received unexpected err: %v\n", err)
  934. }
  935. argument := <-mockClient.createLogStreamArgument
  936. if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
  937. t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
  938. }
  939. }
  940. func BenchmarkUnwrapEvents(b *testing.B) {
  941. events := make([]wrappedEvent, maximumLogEventsPerPut)
  942. for i := 0; i < maximumLogEventsPerPut; i++ {
  943. mes := strings.Repeat("0", maximumBytesPerEvent)
  944. events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
  945. Message: &mes,
  946. }
  947. }
  948. as := assert.New(b)
  949. b.ResetTimer()
  950. for i := 0; i < b.N; i++ {
  951. res := unwrapEvents(events)
  952. as.Len(res, maximumLogEventsPerPut)
  953. }
  954. }