cloudwatchlogs_test.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391
  1. package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/http/httptest"
  8. "os"
  9. "reflect"
  10. "regexp"
  11. "runtime"
  12. "strings"
  13. "testing"
  14. "time"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "github.com/aws/aws-sdk-go/aws/request"
  18. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  19. "github.com/docker/docker/daemon/logger"
  20. "github.com/docker/docker/daemon/logger/loggerutils"
  21. "github.com/docker/docker/dockerversion"
  22. "github.com/gotestyourself/gotestyourself/assert"
  23. is "github.com/gotestyourself/gotestyourself/assert/cmp"
  24. )
  25. const (
  26. groupName = "groupName"
  27. streamName = "streamName"
  28. sequenceToken = "sequenceToken"
  29. nextSequenceToken = "nextSequenceToken"
  30. logline = "this is a log line\r"
  31. multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
  32. )
  33. // Generates i multi-line events each with j lines
  34. func (l *logStream) logGenerator(lineCount int, multilineCount int) {
  35. for i := 0; i < multilineCount; i++ {
  36. l.Log(&logger.Message{
  37. Line: []byte(multilineLogline),
  38. Timestamp: time.Time{},
  39. })
  40. for j := 0; j < lineCount; j++ {
  41. l.Log(&logger.Message{
  42. Line: []byte(logline),
  43. Timestamp: time.Time{},
  44. })
  45. }
  46. }
  47. }
  48. func testEventBatch(events []wrappedEvent) *eventBatch {
  49. batch := newEventBatch()
  50. for _, event := range events {
  51. eventlen := len([]byte(*event.inputLogEvent.Message))
  52. batch.add(event, eventlen)
  53. }
  54. return batch
  55. }
  56. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  57. info := logger.Info{
  58. Config: map[string]string{
  59. regionKey: "us-east-1",
  60. },
  61. }
  62. client, err := newAWSLogsClient(info)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
  67. if !ok {
  68. t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
  69. }
  70. buildHandlerList := realClient.Handlers.Build
  71. request := &request.Request{
  72. HTTPRequest: &http.Request{
  73. Header: http.Header{},
  74. },
  75. }
  76. buildHandlerList.Run(request)
  77. expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
  78. dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
  79. userAgent := request.HTTPRequest.Header.Get("User-Agent")
  80. if userAgent != expectedUserAgentString {
  81. t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
  82. expectedUserAgentString, userAgent)
  83. }
  84. }
  85. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  86. info := logger.Info{
  87. Config: map[string]string{},
  88. }
  89. mockMetadata := newMockMetadataClient()
  90. newRegionFinder = func() regionFinder {
  91. return mockMetadata
  92. }
  93. mockMetadata.regionResult <- &regionResult{
  94. successResult: "us-east-1",
  95. }
  96. _, err := newAWSLogsClient(info)
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. }
  101. func TestCreateSuccess(t *testing.T) {
  102. mockClient := newMockClient()
  103. stream := &logStream{
  104. client: mockClient,
  105. logGroupName: groupName,
  106. logStreamName: streamName,
  107. }
  108. mockClient.createLogStreamResult <- &createLogStreamResult{}
  109. err := stream.create()
  110. if err != nil {
  111. t.Errorf("Received unexpected err: %v\n", err)
  112. }
  113. argument := <-mockClient.createLogStreamArgument
  114. if argument.LogGroupName == nil {
  115. t.Fatal("Expected non-nil LogGroupName")
  116. }
  117. if *argument.LogGroupName != groupName {
  118. t.Errorf("Expected LogGroupName to be %s", groupName)
  119. }
  120. if argument.LogStreamName == nil {
  121. t.Fatal("Expected non-nil LogStreamName")
  122. }
  123. if *argument.LogStreamName != streamName {
  124. t.Errorf("Expected LogStreamName to be %s", streamName)
  125. }
  126. }
  127. func TestCreateLogGroupSuccess(t *testing.T) {
  128. mockClient := newMockClient()
  129. stream := &logStream{
  130. client: mockClient,
  131. logGroupName: groupName,
  132. logStreamName: streamName,
  133. logCreateGroup: true,
  134. }
  135. mockClient.createLogGroupResult <- &createLogGroupResult{}
  136. mockClient.createLogStreamResult <- &createLogStreamResult{}
  137. err := stream.create()
  138. if err != nil {
  139. t.Errorf("Received unexpected err: %v\n", err)
  140. }
  141. argument := <-mockClient.createLogStreamArgument
  142. if argument.LogGroupName == nil {
  143. t.Fatal("Expected non-nil LogGroupName")
  144. }
  145. if *argument.LogGroupName != groupName {
  146. t.Errorf("Expected LogGroupName to be %s", groupName)
  147. }
  148. if argument.LogStreamName == nil {
  149. t.Fatal("Expected non-nil LogStreamName")
  150. }
  151. if *argument.LogStreamName != streamName {
  152. t.Errorf("Expected LogStreamName to be %s", streamName)
  153. }
  154. }
  155. func TestCreateError(t *testing.T) {
  156. mockClient := newMockClient()
  157. stream := &logStream{
  158. client: mockClient,
  159. }
  160. mockClient.createLogStreamResult <- &createLogStreamResult{
  161. errorResult: errors.New("Error"),
  162. }
  163. err := stream.create()
  164. if err == nil {
  165. t.Fatal("Expected non-nil err")
  166. }
  167. }
  168. func TestCreateAlreadyExists(t *testing.T) {
  169. mockClient := newMockClient()
  170. stream := &logStream{
  171. client: mockClient,
  172. }
  173. mockClient.createLogStreamResult <- &createLogStreamResult{
  174. errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
  175. }
  176. err := stream.create()
  177. if err != nil {
  178. t.Fatal("Expected nil err")
  179. }
  180. }
  181. func TestLogClosed(t *testing.T) {
  182. mockClient := newMockClient()
  183. stream := &logStream{
  184. client: mockClient,
  185. closed: true,
  186. }
  187. err := stream.Log(&logger.Message{})
  188. if err == nil {
  189. t.Fatal("Expected non-nil error")
  190. }
  191. }
  192. func TestLogBlocking(t *testing.T) {
  193. mockClient := newMockClient()
  194. stream := &logStream{
  195. client: mockClient,
  196. messages: make(chan *logger.Message),
  197. }
  198. errorCh := make(chan error, 1)
  199. started := make(chan bool)
  200. go func() {
  201. started <- true
  202. err := stream.Log(&logger.Message{})
  203. errorCh <- err
  204. }()
  205. <-started
  206. select {
  207. case err := <-errorCh:
  208. t.Fatal("Expected stream.Log to block: ", err)
  209. default:
  210. break
  211. }
  212. select {
  213. case <-stream.messages:
  214. break
  215. default:
  216. t.Fatal("Expected to be able to read from stream.messages but was unable to")
  217. }
  218. select {
  219. case err := <-errorCh:
  220. if err != nil {
  221. t.Fatal(err)
  222. }
  223. case <-time.After(30 * time.Second):
  224. t.Fatal("timed out waiting for read")
  225. }
  226. }
  227. func TestLogNonBlockingBufferEmpty(t *testing.T) {
  228. mockClient := newMockClient()
  229. stream := &logStream{
  230. client: mockClient,
  231. messages: make(chan *logger.Message, 1),
  232. logNonBlocking: true,
  233. }
  234. err := stream.Log(&logger.Message{})
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. }
  239. func TestLogNonBlockingBufferFull(t *testing.T) {
  240. mockClient := newMockClient()
  241. stream := &logStream{
  242. client: mockClient,
  243. messages: make(chan *logger.Message, 1),
  244. logNonBlocking: true,
  245. }
  246. stream.messages <- &logger.Message{}
  247. errorCh := make(chan error)
  248. started := make(chan bool)
  249. go func() {
  250. started <- true
  251. err := stream.Log(&logger.Message{})
  252. errorCh <- err
  253. }()
  254. <-started
  255. select {
  256. case err := <-errorCh:
  257. if err == nil {
  258. t.Fatal("Expected non-nil error")
  259. }
  260. case <-time.After(30 * time.Second):
  261. t.Fatal("Expected Log call to not block")
  262. }
  263. }
  264. func TestPublishBatchSuccess(t *testing.T) {
  265. mockClient := newMockClient()
  266. stream := &logStream{
  267. client: mockClient,
  268. logGroupName: groupName,
  269. logStreamName: streamName,
  270. sequenceToken: aws.String(sequenceToken),
  271. }
  272. mockClient.putLogEventsResult <- &putLogEventsResult{
  273. successResult: &cloudwatchlogs.PutLogEventsOutput{
  274. NextSequenceToken: aws.String(nextSequenceToken),
  275. },
  276. }
  277. events := []wrappedEvent{
  278. {
  279. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  280. Message: aws.String(logline),
  281. },
  282. },
  283. }
  284. stream.publishBatch(testEventBatch(events))
  285. if stream.sequenceToken == nil {
  286. t.Fatal("Expected non-nil sequenceToken")
  287. }
  288. if *stream.sequenceToken != nextSequenceToken {
  289. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  290. }
  291. argument := <-mockClient.putLogEventsArgument
  292. if argument == nil {
  293. t.Fatal("Expected non-nil PutLogEventsInput")
  294. }
  295. if argument.SequenceToken == nil {
  296. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  297. }
  298. if *argument.SequenceToken != sequenceToken {
  299. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  300. }
  301. if len(argument.LogEvents) != 1 {
  302. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  303. }
  304. if argument.LogEvents[0] != events[0].inputLogEvent {
  305. t.Error("Expected event to equal input")
  306. }
  307. }
  308. func TestPublishBatchError(t *testing.T) {
  309. mockClient := newMockClient()
  310. stream := &logStream{
  311. client: mockClient,
  312. logGroupName: groupName,
  313. logStreamName: streamName,
  314. sequenceToken: aws.String(sequenceToken),
  315. }
  316. mockClient.putLogEventsResult <- &putLogEventsResult{
  317. errorResult: errors.New("Error"),
  318. }
  319. events := []wrappedEvent{
  320. {
  321. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  322. Message: aws.String(logline),
  323. },
  324. },
  325. }
  326. stream.publishBatch(testEventBatch(events))
  327. if stream.sequenceToken == nil {
  328. t.Fatal("Expected non-nil sequenceToken")
  329. }
  330. if *stream.sequenceToken != sequenceToken {
  331. t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
  332. }
  333. }
  334. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  335. mockClient := newMockClientBuffered(2)
  336. stream := &logStream{
  337. client: mockClient,
  338. logGroupName: groupName,
  339. logStreamName: streamName,
  340. sequenceToken: aws.String(sequenceToken),
  341. }
  342. mockClient.putLogEventsResult <- &putLogEventsResult{
  343. errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
  344. }
  345. mockClient.putLogEventsResult <- &putLogEventsResult{
  346. successResult: &cloudwatchlogs.PutLogEventsOutput{
  347. NextSequenceToken: aws.String(nextSequenceToken),
  348. },
  349. }
  350. events := []wrappedEvent{
  351. {
  352. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  353. Message: aws.String(logline),
  354. },
  355. },
  356. }
  357. stream.publishBatch(testEventBatch(events))
  358. if stream.sequenceToken == nil {
  359. t.Fatal("Expected non-nil sequenceToken")
  360. }
  361. if *stream.sequenceToken != nextSequenceToken {
  362. t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
  363. }
  364. argument := <-mockClient.putLogEventsArgument
  365. if argument == nil {
  366. t.Fatal("Expected non-nil PutLogEventsInput")
  367. }
  368. if argument.SequenceToken == nil {
  369. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  370. }
  371. if *argument.SequenceToken != sequenceToken {
  372. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  373. }
  374. if len(argument.LogEvents) != 1 {
  375. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  376. }
  377. if argument.LogEvents[0] != events[0].inputLogEvent {
  378. t.Error("Expected event to equal input")
  379. }
  380. argument = <-mockClient.putLogEventsArgument
  381. if argument == nil {
  382. t.Fatal("Expected non-nil PutLogEventsInput")
  383. }
  384. if argument.SequenceToken == nil {
  385. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  386. }
  387. if *argument.SequenceToken != "token" {
  388. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
  389. }
  390. if len(argument.LogEvents) != 1 {
  391. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  392. }
  393. if argument.LogEvents[0] != events[0].inputLogEvent {
  394. t.Error("Expected event to equal input")
  395. }
  396. }
  397. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  398. mockClient := newMockClient()
  399. stream := &logStream{
  400. client: mockClient,
  401. logGroupName: groupName,
  402. logStreamName: streamName,
  403. sequenceToken: aws.String(sequenceToken),
  404. }
  405. mockClient.putLogEventsResult <- &putLogEventsResult{
  406. errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
  407. }
  408. events := []wrappedEvent{
  409. {
  410. inputLogEvent: &cloudwatchlogs.InputLogEvent{
  411. Message: aws.String(logline),
  412. },
  413. },
  414. }
  415. stream.publishBatch(testEventBatch(events))
  416. if stream.sequenceToken == nil {
  417. t.Fatal("Expected non-nil sequenceToken")
  418. }
  419. if *stream.sequenceToken != "token" {
  420. t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
  421. }
  422. argument := <-mockClient.putLogEventsArgument
  423. if argument == nil {
  424. t.Fatal("Expected non-nil PutLogEventsInput")
  425. }
  426. if argument.SequenceToken == nil {
  427. t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
  428. }
  429. if *argument.SequenceToken != sequenceToken {
  430. t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
  431. }
  432. if len(argument.LogEvents) != 1 {
  433. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  434. }
  435. if argument.LogEvents[0] != events[0].inputLogEvent {
  436. t.Error("Expected event to equal input")
  437. }
  438. }
  439. func TestCollectBatchSimple(t *testing.T) {
  440. mockClient := newMockClient()
  441. stream := &logStream{
  442. client: mockClient,
  443. logGroupName: groupName,
  444. logStreamName: streamName,
  445. sequenceToken: aws.String(sequenceToken),
  446. messages: make(chan *logger.Message),
  447. }
  448. mockClient.putLogEventsResult <- &putLogEventsResult{
  449. successResult: &cloudwatchlogs.PutLogEventsOutput{
  450. NextSequenceToken: aws.String(nextSequenceToken),
  451. },
  452. }
  453. ticks := make(chan time.Time)
  454. newTicker = func(_ time.Duration) *time.Ticker {
  455. return &time.Ticker{
  456. C: ticks,
  457. }
  458. }
  459. d := make(chan bool)
  460. close(d)
  461. go stream.collectBatch(d)
  462. stream.Log(&logger.Message{
  463. Line: []byte(logline),
  464. Timestamp: time.Time{},
  465. })
  466. ticks <- time.Time{}
  467. stream.Close()
  468. argument := <-mockClient.putLogEventsArgument
  469. if argument == nil {
  470. t.Fatal("Expected non-nil PutLogEventsInput")
  471. }
  472. if len(argument.LogEvents) != 1 {
  473. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  474. }
  475. if *argument.LogEvents[0].Message != logline {
  476. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  477. }
  478. }
  479. func TestCollectBatchTicker(t *testing.T) {
  480. mockClient := newMockClient()
  481. stream := &logStream{
  482. client: mockClient,
  483. logGroupName: groupName,
  484. logStreamName: streamName,
  485. sequenceToken: aws.String(sequenceToken),
  486. messages: make(chan *logger.Message),
  487. }
  488. mockClient.putLogEventsResult <- &putLogEventsResult{
  489. successResult: &cloudwatchlogs.PutLogEventsOutput{
  490. NextSequenceToken: aws.String(nextSequenceToken),
  491. },
  492. }
  493. ticks := make(chan time.Time)
  494. newTicker = func(_ time.Duration) *time.Ticker {
  495. return &time.Ticker{
  496. C: ticks,
  497. }
  498. }
  499. d := make(chan bool)
  500. close(d)
  501. go stream.collectBatch(d)
  502. stream.Log(&logger.Message{
  503. Line: []byte(logline + " 1"),
  504. Timestamp: time.Time{},
  505. })
  506. stream.Log(&logger.Message{
  507. Line: []byte(logline + " 2"),
  508. Timestamp: time.Time{},
  509. })
  510. ticks <- time.Time{}
  511. // Verify first batch
  512. argument := <-mockClient.putLogEventsArgument
  513. if argument == nil {
  514. t.Fatal("Expected non-nil PutLogEventsInput")
  515. }
  516. if len(argument.LogEvents) != 2 {
  517. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  518. }
  519. if *argument.LogEvents[0].Message != logline+" 1" {
  520. t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
  521. }
  522. if *argument.LogEvents[1].Message != logline+" 2" {
  523. t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
  524. }
  525. stream.Log(&logger.Message{
  526. Line: []byte(logline + " 3"),
  527. Timestamp: time.Time{},
  528. })
  529. ticks <- time.Time{}
  530. argument = <-mockClient.putLogEventsArgument
  531. if argument == nil {
  532. t.Fatal("Expected non-nil PutLogEventsInput")
  533. }
  534. if len(argument.LogEvents) != 1 {
  535. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  536. }
  537. if *argument.LogEvents[0].Message != logline+" 3" {
  538. t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
  539. }
  540. stream.Close()
  541. }
  542. func TestCollectBatchMultilinePattern(t *testing.T) {
  543. mockClient := newMockClient()
  544. multilinePattern := regexp.MustCompile("xxxx")
  545. stream := &logStream{
  546. client: mockClient,
  547. logGroupName: groupName,
  548. logStreamName: streamName,
  549. multilinePattern: multilinePattern,
  550. sequenceToken: aws.String(sequenceToken),
  551. messages: make(chan *logger.Message),
  552. }
  553. mockClient.putLogEventsResult <- &putLogEventsResult{
  554. successResult: &cloudwatchlogs.PutLogEventsOutput{
  555. NextSequenceToken: aws.String(nextSequenceToken),
  556. },
  557. }
  558. ticks := make(chan time.Time)
  559. newTicker = func(_ time.Duration) *time.Ticker {
  560. return &time.Ticker{
  561. C: ticks,
  562. }
  563. }
  564. d := make(chan bool)
  565. close(d)
  566. go stream.collectBatch(d)
  567. stream.Log(&logger.Message{
  568. Line: []byte(logline),
  569. Timestamp: time.Now(),
  570. })
  571. stream.Log(&logger.Message{
  572. Line: []byte(logline),
  573. Timestamp: time.Now(),
  574. })
  575. stream.Log(&logger.Message{
  576. Line: []byte("xxxx " + logline),
  577. Timestamp: time.Now(),
  578. })
  579. ticks <- time.Now()
  580. // Verify single multiline event
  581. argument := <-mockClient.putLogEventsArgument
  582. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  583. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  584. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
  585. stream.Close()
  586. // Verify single event
  587. argument = <-mockClient.putLogEventsArgument
  588. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  589. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  590. assert.Check(t, is.Equal("xxxx "+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
  591. }
  592. func BenchmarkCollectBatch(b *testing.B) {
  593. for i := 0; i < b.N; i++ {
  594. mockClient := newMockClient()
  595. stream := &logStream{
  596. client: mockClient,
  597. logGroupName: groupName,
  598. logStreamName: streamName,
  599. sequenceToken: aws.String(sequenceToken),
  600. messages: make(chan *logger.Message),
  601. }
  602. mockClient.putLogEventsResult <- &putLogEventsResult{
  603. successResult: &cloudwatchlogs.PutLogEventsOutput{
  604. NextSequenceToken: aws.String(nextSequenceToken),
  605. },
  606. }
  607. ticks := make(chan time.Time)
  608. newTicker = func(_ time.Duration) *time.Ticker {
  609. return &time.Ticker{
  610. C: ticks,
  611. }
  612. }
  613. d := make(chan bool)
  614. close(d)
  615. go stream.collectBatch(d)
  616. stream.logGenerator(10, 100)
  617. ticks <- time.Time{}
  618. stream.Close()
  619. }
  620. }
  621. func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
  622. for i := 0; i < b.N; i++ {
  623. mockClient := newMockClient()
  624. multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
  625. stream := &logStream{
  626. client: mockClient,
  627. logGroupName: groupName,
  628. logStreamName: streamName,
  629. multilinePattern: multilinePattern,
  630. sequenceToken: aws.String(sequenceToken),
  631. messages: make(chan *logger.Message),
  632. }
  633. mockClient.putLogEventsResult <- &putLogEventsResult{
  634. successResult: &cloudwatchlogs.PutLogEventsOutput{
  635. NextSequenceToken: aws.String(nextSequenceToken),
  636. },
  637. }
  638. ticks := make(chan time.Time)
  639. newTicker = func(_ time.Duration) *time.Ticker {
  640. return &time.Ticker{
  641. C: ticks,
  642. }
  643. }
  644. d := make(chan bool)
  645. close(d)
  646. go stream.collectBatch(d)
  647. stream.logGenerator(10, 100)
  648. ticks <- time.Time{}
  649. stream.Close()
  650. }
  651. }
  652. func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
  653. mockClient := newMockClient()
  654. multilinePattern := regexp.MustCompile("xxxx")
  655. stream := &logStream{
  656. client: mockClient,
  657. logGroupName: groupName,
  658. logStreamName: streamName,
  659. multilinePattern: multilinePattern,
  660. sequenceToken: aws.String(sequenceToken),
  661. messages: make(chan *logger.Message),
  662. }
  663. mockClient.putLogEventsResult <- &putLogEventsResult{
  664. successResult: &cloudwatchlogs.PutLogEventsOutput{
  665. NextSequenceToken: aws.String(nextSequenceToken),
  666. },
  667. }
  668. ticks := make(chan time.Time)
  669. newTicker = func(_ time.Duration) *time.Ticker {
  670. return &time.Ticker{
  671. C: ticks,
  672. }
  673. }
  674. d := make(chan bool)
  675. close(d)
  676. go stream.collectBatch(d)
  677. stream.Log(&logger.Message{
  678. Line: []byte(logline),
  679. Timestamp: time.Now(),
  680. })
  681. // Log an event 1 second later
  682. stream.Log(&logger.Message{
  683. Line: []byte(logline),
  684. Timestamp: time.Now().Add(time.Second),
  685. })
  686. // Fire ticker batchPublishFrequency seconds later
  687. ticks <- time.Now().Add(batchPublishFrequency + time.Second)
  688. // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
  689. argument := <-mockClient.putLogEventsArgument
  690. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  691. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  692. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
  693. // Log an event 1 second later
  694. stream.Log(&logger.Message{
  695. Line: []byte(logline),
  696. Timestamp: time.Now().Add(time.Second),
  697. })
  698. // Fire ticker another batchPublishFrequency seconds later
  699. ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
  700. // Verify the event buffer is truly flushed - we should only receive a single event
  701. argument = <-mockClient.putLogEventsArgument
  702. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  703. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  704. assert.Check(t, is.Equal(logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
  705. stream.Close()
  706. }
  707. func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
  708. mockClient := newMockClient()
  709. multilinePattern := regexp.MustCompile("xxxx")
  710. stream := &logStream{
  711. client: mockClient,
  712. logGroupName: groupName,
  713. logStreamName: streamName,
  714. multilinePattern: multilinePattern,
  715. sequenceToken: aws.String(sequenceToken),
  716. messages: make(chan *logger.Message),
  717. }
  718. mockClient.putLogEventsResult <- &putLogEventsResult{
  719. successResult: &cloudwatchlogs.PutLogEventsOutput{
  720. NextSequenceToken: aws.String(nextSequenceToken),
  721. },
  722. }
  723. ticks := make(chan time.Time)
  724. newTicker = func(_ time.Duration) *time.Ticker {
  725. return &time.Ticker{
  726. C: ticks,
  727. }
  728. }
  729. d := make(chan bool)
  730. close(d)
  731. go stream.collectBatch(d)
  732. stream.Log(&logger.Message{
  733. Line: []byte(logline),
  734. Timestamp: time.Now(),
  735. })
  736. // Log an event 1 second later
  737. stream.Log(&logger.Message{
  738. Line: []byte(logline),
  739. Timestamp: time.Now().Add(time.Second),
  740. })
  741. // Fire ticker in past to simulate negative event buffer age
  742. ticks <- time.Now().Add(-time.Second)
  743. // Verify single multiline event is flushed with a negative event buffer age
  744. argument := <-mockClient.putLogEventsArgument
  745. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  746. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  747. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
  748. stream.Close()
  749. }
  750. func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
  751. mockClient := newMockClient()
  752. multilinePattern := regexp.MustCompile("xxxx")
  753. stream := &logStream{
  754. client: mockClient,
  755. logGroupName: groupName,
  756. logStreamName: streamName,
  757. multilinePattern: multilinePattern,
  758. sequenceToken: aws.String(sequenceToken),
  759. messages: make(chan *logger.Message),
  760. }
  761. mockClient.putLogEventsResult <- &putLogEventsResult{
  762. successResult: &cloudwatchlogs.PutLogEventsOutput{
  763. NextSequenceToken: aws.String(nextSequenceToken),
  764. },
  765. }
  766. ticks := make(chan time.Time)
  767. newTicker = func(_ time.Duration) *time.Ticker {
  768. return &time.Ticker{
  769. C: ticks,
  770. }
  771. }
  772. d := make(chan bool)
  773. close(d)
  774. go stream.collectBatch(d)
  775. // Log max event size
  776. longline := strings.Repeat("A", maximumBytesPerEvent)
  777. stream.Log(&logger.Message{
  778. Line: []byte(longline),
  779. Timestamp: time.Now(),
  780. })
  781. // Log short event
  782. shortline := strings.Repeat("B", 100)
  783. stream.Log(&logger.Message{
  784. Line: []byte(shortline),
  785. Timestamp: time.Now(),
  786. })
  787. // Fire ticker
  788. ticks <- time.Now().Add(batchPublishFrequency)
  789. // Verify multiline events
  790. // We expect a maximum sized event with no new line characters and a
  791. // second short event with a new line character at the end
  792. argument := <-mockClient.putLogEventsArgument
  793. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  794. assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events")
  795. assert.Check(t, is.Equal(longline, *argument.LogEvents[0].Message), "Received incorrect multiline message")
  796. assert.Check(t, is.Equal(shortline+"\n", *argument.LogEvents[1].Message), "Received incorrect multiline message")
  797. stream.Close()
  798. }
  799. func TestCollectBatchClose(t *testing.T) {
  800. mockClient := newMockClient()
  801. stream := &logStream{
  802. client: mockClient,
  803. logGroupName: groupName,
  804. logStreamName: streamName,
  805. sequenceToken: aws.String(sequenceToken),
  806. messages: make(chan *logger.Message),
  807. }
  808. mockClient.putLogEventsResult <- &putLogEventsResult{
  809. successResult: &cloudwatchlogs.PutLogEventsOutput{
  810. NextSequenceToken: aws.String(nextSequenceToken),
  811. },
  812. }
  813. var ticks = make(chan time.Time)
  814. newTicker = func(_ time.Duration) *time.Ticker {
  815. return &time.Ticker{
  816. C: ticks,
  817. }
  818. }
  819. d := make(chan bool)
  820. close(d)
  821. go stream.collectBatch(d)
  822. stream.Log(&logger.Message{
  823. Line: []byte(logline),
  824. Timestamp: time.Time{},
  825. })
  826. // no ticks
  827. stream.Close()
  828. argument := <-mockClient.putLogEventsArgument
  829. if argument == nil {
  830. t.Fatal("Expected non-nil PutLogEventsInput")
  831. }
  832. if len(argument.LogEvents) != 1 {
  833. t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
  834. }
  835. if *argument.LogEvents[0].Message != logline {
  836. t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
  837. }
  838. }
  839. func TestCollectBatchLineSplit(t *testing.T) {
  840. mockClient := newMockClient()
  841. stream := &logStream{
  842. client: mockClient,
  843. logGroupName: groupName,
  844. logStreamName: streamName,
  845. sequenceToken: aws.String(sequenceToken),
  846. messages: make(chan *logger.Message),
  847. }
  848. mockClient.putLogEventsResult <- &putLogEventsResult{
  849. successResult: &cloudwatchlogs.PutLogEventsOutput{
  850. NextSequenceToken: aws.String(nextSequenceToken),
  851. },
  852. }
  853. var ticks = make(chan time.Time)
  854. newTicker = func(_ time.Duration) *time.Ticker {
  855. return &time.Ticker{
  856. C: ticks,
  857. }
  858. }
  859. d := make(chan bool)
  860. close(d)
  861. go stream.collectBatch(d)
  862. longline := strings.Repeat("A", maximumBytesPerEvent)
  863. stream.Log(&logger.Message{
  864. Line: []byte(longline + "B"),
  865. Timestamp: time.Time{},
  866. })
  867. // no ticks
  868. stream.Close()
  869. argument := <-mockClient.putLogEventsArgument
  870. if argument == nil {
  871. t.Fatal("Expected non-nil PutLogEventsInput")
  872. }
  873. if len(argument.LogEvents) != 2 {
  874. t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
  875. }
  876. if *argument.LogEvents[0].Message != longline {
  877. t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
  878. }
  879. if *argument.LogEvents[1].Message != "B" {
  880. t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
  881. }
  882. }
  883. func TestCollectBatchMaxEvents(t *testing.T) {
  884. mockClient := newMockClientBuffered(1)
  885. stream := &logStream{
  886. client: mockClient,
  887. logGroupName: groupName,
  888. logStreamName: streamName,
  889. sequenceToken: aws.String(sequenceToken),
  890. messages: make(chan *logger.Message),
  891. }
  892. mockClient.putLogEventsResult <- &putLogEventsResult{
  893. successResult: &cloudwatchlogs.PutLogEventsOutput{
  894. NextSequenceToken: aws.String(nextSequenceToken),
  895. },
  896. }
  897. var ticks = make(chan time.Time)
  898. newTicker = func(_ time.Duration) *time.Ticker {
  899. return &time.Ticker{
  900. C: ticks,
  901. }
  902. }
  903. d := make(chan bool)
  904. close(d)
  905. go stream.collectBatch(d)
  906. line := "A"
  907. for i := 0; i <= maximumLogEventsPerPut; i++ {
  908. stream.Log(&logger.Message{
  909. Line: []byte(line),
  910. Timestamp: time.Time{},
  911. })
  912. }
  913. // no ticks
  914. stream.Close()
  915. argument := <-mockClient.putLogEventsArgument
  916. if argument == nil {
  917. t.Fatal("Expected non-nil PutLogEventsInput")
  918. }
  919. if len(argument.LogEvents) != maximumLogEventsPerPut {
  920. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
  921. }
  922. argument = <-mockClient.putLogEventsArgument
  923. if argument == nil {
  924. t.Fatal("Expected non-nil PutLogEventsInput")
  925. }
  926. if len(argument.LogEvents) != 1 {
  927. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
  928. }
  929. }
  930. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  931. expectedPuts := 2
  932. mockClient := newMockClientBuffered(expectedPuts)
  933. stream := &logStream{
  934. client: mockClient,
  935. logGroupName: groupName,
  936. logStreamName: streamName,
  937. sequenceToken: aws.String(sequenceToken),
  938. messages: make(chan *logger.Message),
  939. }
  940. for i := 0; i < expectedPuts; i++ {
  941. mockClient.putLogEventsResult <- &putLogEventsResult{
  942. successResult: &cloudwatchlogs.PutLogEventsOutput{
  943. NextSequenceToken: aws.String(nextSequenceToken),
  944. },
  945. }
  946. }
  947. var ticks = make(chan time.Time)
  948. newTicker = func(_ time.Duration) *time.Ticker {
  949. return &time.Ticker{
  950. C: ticks,
  951. }
  952. }
  953. d := make(chan bool)
  954. close(d)
  955. go stream.collectBatch(d)
  956. numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
  957. // maxline is the maximum line that could be submitted after
  958. // accounting for its overhead.
  959. maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
  960. // This will be split and batched up to the `maximumBytesPerPut'
  961. // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
  962. // should also tolerate an offset within that range.
  963. stream.Log(&logger.Message{
  964. Line: []byte(maxline[:len(maxline)/2]),
  965. Timestamp: time.Time{},
  966. })
  967. stream.Log(&logger.Message{
  968. Line: []byte(maxline[len(maxline)/2:]),
  969. Timestamp: time.Time{},
  970. })
  971. stream.Log(&logger.Message{
  972. Line: []byte("B"),
  973. Timestamp: time.Time{},
  974. })
  975. // no ticks, guarantee batch by size (and chan close)
  976. stream.Close()
  977. argument := <-mockClient.putLogEventsArgument
  978. if argument == nil {
  979. t.Fatal("Expected non-nil PutLogEventsInput")
  980. }
  981. // Should total to the maximum allowed bytes.
  982. eventBytes := 0
  983. for _, event := range argument.LogEvents {
  984. eventBytes += len(*event.Message)
  985. }
  986. eventsOverhead := len(argument.LogEvents) * perEventBytes
  987. payloadTotal := eventBytes + eventsOverhead
  988. // lowestMaxBatch allows the payload to be offset if the messages
  989. // don't lend themselves to align with the maximum event size.
  990. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
  991. if payloadTotal > maximumBytesPerPut {
  992. t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
  993. }
  994. if payloadTotal < lowestMaxBatch {
  995. t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
  996. }
  997. argument = <-mockClient.putLogEventsArgument
  998. if len(argument.LogEvents) != 1 {
  999. t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
  1000. }
  1001. message := *argument.LogEvents[len(argument.LogEvents)-1].Message
  1002. if message[len(message)-1:] != "B" {
  1003. t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
  1004. }
  1005. }
  1006. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  1007. mockClient := newMockClient()
  1008. stream := &logStream{
  1009. client: mockClient,
  1010. logGroupName: groupName,
  1011. logStreamName: streamName,
  1012. sequenceToken: aws.String(sequenceToken),
  1013. messages: make(chan *logger.Message),
  1014. }
  1015. mockClient.putLogEventsResult <- &putLogEventsResult{
  1016. successResult: &cloudwatchlogs.PutLogEventsOutput{
  1017. NextSequenceToken: aws.String(nextSequenceToken),
  1018. },
  1019. }
  1020. ticks := make(chan time.Time)
  1021. newTicker = func(_ time.Duration) *time.Ticker {
  1022. return &time.Ticker{
  1023. C: ticks,
  1024. }
  1025. }
  1026. d := make(chan bool)
  1027. close(d)
  1028. go stream.collectBatch(d)
  1029. var expectedEvents []*cloudwatchlogs.InputLogEvent
  1030. times := maximumLogEventsPerPut
  1031. timestamp := time.Now()
  1032. for i := 0; i < times; i++ {
  1033. line := fmt.Sprintf("%d", i)
  1034. if i%2 == 0 {
  1035. timestamp.Add(1 * time.Nanosecond)
  1036. }
  1037. stream.Log(&logger.Message{
  1038. Line: []byte(line),
  1039. Timestamp: timestamp,
  1040. })
  1041. expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
  1042. Message: aws.String(line),
  1043. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  1044. })
  1045. }
  1046. ticks <- time.Time{}
  1047. stream.Close()
  1048. argument := <-mockClient.putLogEventsArgument
  1049. if argument == nil {
  1050. t.Fatal("Expected non-nil PutLogEventsInput")
  1051. }
  1052. if len(argument.LogEvents) != times {
  1053. t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
  1054. }
  1055. for i := 0; i < times; i++ {
  1056. if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
  1057. t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
  1058. }
  1059. }
  1060. }
  1061. func TestParseLogOptionsMultilinePattern(t *testing.T) {
  1062. info := logger.Info{
  1063. Config: map[string]string{
  1064. multilinePatternKey: "^xxxx",
  1065. },
  1066. }
  1067. multilinePattern, err := parseMultilineOptions(info)
  1068. assert.Check(t, err, "Received unexpected error")
  1069. assert.Check(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
  1070. }
  1071. func TestParseLogOptionsDatetimeFormat(t *testing.T) {
  1072. datetimeFormatTests := []struct {
  1073. format string
  1074. match string
  1075. }{
  1076. {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
  1077. {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
  1078. {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
  1079. {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
  1080. {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
  1081. {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
  1082. {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
  1083. }
  1084. for _, dt := range datetimeFormatTests {
  1085. t.Run(dt.match, func(t *testing.T) {
  1086. info := logger.Info{
  1087. Config: map[string]string{
  1088. datetimeFormatKey: dt.format,
  1089. },
  1090. }
  1091. multilinePattern, err := parseMultilineOptions(info)
  1092. assert.Check(t, err, "Received unexpected error")
  1093. assert.Check(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
  1094. })
  1095. }
  1096. }
  1097. func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
  1098. cfg := map[string]string{
  1099. multilinePatternKey: "^xxxx",
  1100. datetimeFormatKey: "%Y-%m-%d",
  1101. logGroupKey: groupName,
  1102. }
  1103. conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
  1104. err := ValidateLogOpt(cfg)
  1105. assert.Check(t, err != nil, "Expected an error")
  1106. assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error")
  1107. }
  1108. func TestCreateTagSuccess(t *testing.T) {
  1109. mockClient := newMockClient()
  1110. info := logger.Info{
  1111. ContainerName: "/test-container",
  1112. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  1113. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  1114. }
  1115. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  1116. if e != nil {
  1117. t.Errorf("Error generating tag: %q", e)
  1118. }
  1119. stream := &logStream{
  1120. client: mockClient,
  1121. logGroupName: groupName,
  1122. logStreamName: logStreamName,
  1123. }
  1124. mockClient.createLogStreamResult <- &createLogStreamResult{}
  1125. err := stream.create()
  1126. if err != nil {
  1127. t.Errorf("Received unexpected err: %v\n", err)
  1128. }
  1129. argument := <-mockClient.createLogStreamArgument
  1130. if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
  1131. t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
  1132. }
  1133. }
  1134. func BenchmarkUnwrapEvents(b *testing.B) {
  1135. events := make([]wrappedEvent, maximumLogEventsPerPut)
  1136. for i := 0; i < maximumLogEventsPerPut; i++ {
  1137. mes := strings.Repeat("0", maximumBytesPerEvent)
  1138. events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
  1139. Message: &mes,
  1140. }
  1141. }
  1142. b.ResetTimer()
  1143. for i := 0; i < b.N; i++ {
  1144. res := unwrapEvents(events)
  1145. assert.Check(b, is.Len(res, maximumLogEventsPerPut))
  1146. }
  1147. }
  1148. func TestNewAWSLogsClientCredentialEndpointDetect(t *testing.T) {
  1149. // required for the cloudwatchlogs client
  1150. os.Setenv("AWS_REGION", "us-west-2")
  1151. defer os.Unsetenv("AWS_REGION")
  1152. credsResp := `{
  1153. "AccessKeyId" : "test-access-key-id",
  1154. "SecretAccessKey": "test-secret-access-key"
  1155. }`
  1156. expectedAccessKeyID := "test-access-key-id"
  1157. expectedSecretAccessKey := "test-secret-access-key"
  1158. testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1159. w.Header().Set("Content-Type", "application/json")
  1160. fmt.Fprintln(w, credsResp)
  1161. }))
  1162. defer testServer.Close()
  1163. // set the SDKEndpoint in the driver
  1164. newSDKEndpoint = testServer.URL
  1165. info := logger.Info{
  1166. Config: map[string]string{},
  1167. }
  1168. info.Config["awslogs-credentials-endpoint"] = "/creds"
  1169. c, err := newAWSLogsClient(info)
  1170. assert.Check(t, err)
  1171. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1172. creds, err := client.Config.Credentials.Get()
  1173. assert.Check(t, err)
  1174. assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID))
  1175. assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey))
  1176. }
  1177. func TestNewAWSLogsClientCredentialEnvironmentVariable(t *testing.T) {
  1178. // required for the cloudwatchlogs client
  1179. os.Setenv("AWS_REGION", "us-west-2")
  1180. defer os.Unsetenv("AWS_REGION")
  1181. expectedAccessKeyID := "test-access-key-id"
  1182. expectedSecretAccessKey := "test-secret-access-key"
  1183. os.Setenv("AWS_ACCESS_KEY_ID", expectedAccessKeyID)
  1184. defer os.Unsetenv("AWS_ACCESS_KEY_ID")
  1185. os.Setenv("AWS_SECRET_ACCESS_KEY", expectedSecretAccessKey)
  1186. defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
  1187. info := logger.Info{
  1188. Config: map[string]string{},
  1189. }
  1190. c, err := newAWSLogsClient(info)
  1191. assert.Check(t, err)
  1192. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1193. creds, err := client.Config.Credentials.Get()
  1194. assert.Check(t, err)
  1195. assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID))
  1196. assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey))
  1197. }
  1198. func TestNewAWSLogsClientCredentialSharedFile(t *testing.T) {
  1199. // required for the cloudwatchlogs client
  1200. os.Setenv("AWS_REGION", "us-west-2")
  1201. defer os.Unsetenv("AWS_REGION")
  1202. expectedAccessKeyID := "test-access-key-id"
  1203. expectedSecretAccessKey := "test-secret-access-key"
  1204. contentStr := `
  1205. [default]
  1206. aws_access_key_id = "test-access-key-id"
  1207. aws_secret_access_key = "test-secret-access-key"
  1208. `
  1209. content := []byte(contentStr)
  1210. tmpfile, err := ioutil.TempFile("", "example")
  1211. defer os.Remove(tmpfile.Name()) // clean up
  1212. assert.Check(t, err)
  1213. _, err = tmpfile.Write(content)
  1214. assert.Check(t, err)
  1215. err = tmpfile.Close()
  1216. assert.Check(t, err)
  1217. os.Unsetenv("AWS_ACCESS_KEY_ID")
  1218. os.Unsetenv("AWS_SECRET_ACCESS_KEY")
  1219. os.Setenv("AWS_SHARED_CREDENTIALS_FILE", tmpfile.Name())
  1220. defer os.Unsetenv("AWS_SHARED_CREDENTIALS_FILE")
  1221. info := logger.Info{
  1222. Config: map[string]string{},
  1223. }
  1224. c, err := newAWSLogsClient(info)
  1225. assert.Check(t, err)
  1226. client := c.(*cloudwatchlogs.CloudWatchLogs)
  1227. creds, err := client.Config.Credentials.Get()
  1228. assert.Check(t, err)
  1229. assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID))
  1230. assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey))
  1231. }