cloudwatchlogs_test.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697
  1. package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "net/http/httptest"
  8. "reflect"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/aws/aws-sdk-go-v2/aws"
  16. "github.com/aws/aws-sdk-go-v2/config"
  17. "github.com/aws/aws-sdk-go-v2/credentials"
  18. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
  19. "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
  20. "github.com/docker/docker/daemon/logger"
  21. "github.com/docker/docker/daemon/logger/loggerutils"
  22. "github.com/docker/docker/dockerversion"
  23. "gotest.tools/v3/assert"
  24. is "gotest.tools/v3/assert/cmp"
  25. )
  26. const (
  27. groupName = "groupName"
  28. streamName = "streamName"
  29. sequenceToken = "sequenceToken"
  30. nextSequenceToken = "nextSequenceToken"
  31. logline = "this is a log line\r"
  32. multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
  33. )
  34. // Generates i multi-line events each with j lines
  35. func (l *logStream) logGenerator(lineCount int, multilineCount int) {
  36. for i := 0; i < multilineCount; i++ {
  37. l.Log(&logger.Message{
  38. Line: []byte(multilineLogline),
  39. Timestamp: time.Time{},
  40. })
  41. for j := 0; j < lineCount; j++ {
  42. l.Log(&logger.Message{
  43. Line: []byte(logline),
  44. Timestamp: time.Time{},
  45. })
  46. }
  47. }
  48. }
  49. func testEventBatch(events []wrappedEvent) *eventBatch {
  50. batch := newEventBatch()
  51. for _, event := range events {
  52. eventlen := len([]byte(*event.inputLogEvent.Message))
  53. batch.add(event, eventlen)
  54. }
  55. return batch
  56. }
  57. func TestNewStreamConfig(t *testing.T) {
  58. tests := []struct {
  59. logStreamName string
  60. logGroupName string
  61. logCreateGroup string
  62. logCreateStream string
  63. logNonBlocking string
  64. forceFlushInterval string
  65. maxBufferedEvents string
  66. datetimeFormat string
  67. multilinePattern string
  68. shouldErr bool
  69. testName string
  70. }{
  71. {"", groupName, "", "", "", "", "", "", "", false, "defaults"},
  72. {"", groupName, "invalid create group", "", "", "", "", "", "", true, "invalid create group"},
  73. {"", groupName, "", "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"},
  74. {"", groupName, "", "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"},
  75. {"", groupName, "", "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"},
  76. {"", groupName, "", "", "", "15", "", "", "", false, "flush interval at 15"},
  77. {"", groupName, "", "", "", "", "1024", "", "", false, "max buffered events at 1024"},
  78. }
  79. for _, tc := range tests {
  80. t.Run(tc.testName, func(t *testing.T) {
  81. cfg := map[string]string{
  82. logGroupKey: tc.logGroupName,
  83. logCreateGroupKey: tc.logCreateGroup,
  84. "mode": tc.logNonBlocking,
  85. forceFlushIntervalKey: tc.forceFlushInterval,
  86. maxBufferedEventsKey: tc.maxBufferedEvents,
  87. logStreamKey: tc.logStreamName,
  88. logCreateStreamKey: tc.logCreateStream,
  89. datetimeFormatKey: tc.datetimeFormat,
  90. multilinePatternKey: tc.multilinePattern,
  91. }
  92. info := logger.Info{
  93. Config: cfg,
  94. }
  95. logStreamConfig, err := newStreamConfig(info)
  96. if tc.shouldErr {
  97. assert.Check(t, err != nil, "Expected an error")
  98. } else {
  99. assert.Check(t, err == nil, "Unexpected error")
  100. assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName")
  101. if tc.forceFlushInterval != "" {
  102. forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey])
  103. assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval")
  104. }
  105. if tc.maxBufferedEvents != "" {
  106. maxBufferedEvents, _ := strconv.Atoi(info.Config[maxBufferedEventsKey])
  107. assert.Check(t, logStreamConfig.maxBufferedEvents == maxBufferedEvents, "Unexpected maxBufferedEvents")
  108. }
  109. }
  110. })
  111. }
  112. }
  113. func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
  114. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  115. userAgent := r.Header.Get("User-Agent")
  116. assert.Check(t, is.Contains(userAgent, "Docker/"+dockerversion.Version))
  117. fmt.Fprintln(w, "{}")
  118. }))
  119. defer ts.Close()
  120. info := logger.Info{
  121. Config: map[string]string{
  122. regionKey: "us-east-1",
  123. endpointKey: ts.URL,
  124. },
  125. }
  126. client, err := newAWSLogsClient(
  127. info,
  128. config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
  129. Value: aws.Credentials{AccessKeyID: "AKID", SecretAccessKey: "SECRET", SessionToken: "SESSION"},
  130. }),
  131. )
  132. assert.NilError(t, err)
  133. _, err = client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{LogGroupName: aws.String("foo")})
  134. assert.NilError(t, err)
  135. }
  136. func TestNewAWSLogsClientLogFormatHeaderHandler(t *testing.T) {
  137. tests := []struct {
  138. logFormat string
  139. expectedHeaderValue string
  140. }{
  141. {
  142. logFormat: jsonEmfLogFormat,
  143. expectedHeaderValue: "json/emf",
  144. },
  145. {
  146. logFormat: "",
  147. expectedHeaderValue: "",
  148. },
  149. }
  150. for _, tc := range tests {
  151. t.Run(tc.logFormat, func(t *testing.T) {
  152. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  153. logFormatHeaderVal := r.Header.Get("x-amzn-logs-format")
  154. assert.Check(t, is.Equal(tc.expectedHeaderValue, logFormatHeaderVal))
  155. fmt.Fprintln(w, "{}")
  156. }))
  157. defer ts.Close()
  158. info := logger.Info{
  159. Config: map[string]string{
  160. regionKey: "us-east-1",
  161. logFormatKey: tc.logFormat,
  162. endpointKey: ts.URL,
  163. },
  164. }
  165. client, err := newAWSLogsClient(
  166. info,
  167. config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
  168. Value: aws.Credentials{AccessKeyID: "AKID", SecretAccessKey: "SECRET", SessionToken: "SESSION"},
  169. }),
  170. )
  171. assert.NilError(t, err)
  172. _, err = client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{LogGroupName: aws.String("foo")})
  173. assert.NilError(t, err)
  174. })
  175. }
  176. }
  177. func TestNewAWSLogsClientAWSLogsEndpoint(t *testing.T) {
  178. called := atomic.Value{} // for go1.19 and later, can use atomic.Bool
  179. called.Store(false)
  180. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  181. called.Store(true)
  182. fmt.Fprintln(w, "{}")
  183. }))
  184. defer ts.Close()
  185. info := logger.Info{
  186. Config: map[string]string{
  187. regionKey: "us-east-1",
  188. endpointKey: ts.URL,
  189. },
  190. }
  191. client, err := newAWSLogsClient(
  192. info,
  193. config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
  194. Value: aws.Credentials{AccessKeyID: "AKID", SecretAccessKey: "SECRET", SessionToken: "SESSION"},
  195. }),
  196. )
  197. assert.NilError(t, err)
  198. _, err = client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{LogGroupName: aws.String("foo")})
  199. assert.NilError(t, err)
  200. // make sure the endpoint was actually hit
  201. assert.Check(t, called.Load().(bool))
  202. }
  203. func TestNewAWSLogsClientRegionDetect(t *testing.T) {
  204. info := logger.Info{
  205. Config: map[string]string{},
  206. }
  207. mockMetadata := newMockMetadataClient()
  208. newRegionFinder = func(context.Context) (regionFinder, error) {
  209. return mockMetadata, nil
  210. }
  211. mockMetadata.regionResult <- &regionResult{
  212. successResult: "us-east-1",
  213. }
  214. _, err := newAWSLogsClient(info)
  215. assert.NilError(t, err)
  216. }
  217. func TestCreateSuccess(t *testing.T) {
  218. mockClient := &mockClient{}
  219. stream := &logStream{
  220. client: mockClient,
  221. logGroupName: groupName,
  222. logStreamName: streamName,
  223. logCreateStream: true,
  224. }
  225. var input *cloudwatchlogs.CreateLogStreamInput
  226. mockClient.createLogStreamFunc = func(ctx context.Context, i *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  227. input = i
  228. return &cloudwatchlogs.CreateLogStreamOutput{}, nil
  229. }
  230. err := stream.create()
  231. assert.NilError(t, err)
  232. assert.Equal(t, groupName, aws.ToString(input.LogGroupName), "LogGroupName")
  233. assert.Equal(t, streamName, aws.ToString(input.LogStreamName), "LogStreamName")
  234. }
  235. func TestCreateStreamSkipped(t *testing.T) {
  236. mockClient := &mockClient{}
  237. stream := &logStream{
  238. client: mockClient,
  239. logGroupName: groupName,
  240. logStreamName: streamName,
  241. logCreateStream: false,
  242. }
  243. mockClient.createLogStreamFunc = func(ctx context.Context, i *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  244. t.Error("CreateLogStream should not be called")
  245. return nil, errors.New("should not be called")
  246. }
  247. err := stream.create()
  248. assert.NilError(t, err)
  249. }
  250. func TestCreateLogGroupSuccess(t *testing.T) {
  251. mockClient := &mockClient{}
  252. stream := &logStream{
  253. client: mockClient,
  254. logGroupName: groupName,
  255. logStreamName: streamName,
  256. logCreateGroup: true,
  257. logCreateStream: true,
  258. }
  259. var logGroupInput *cloudwatchlogs.CreateLogGroupInput
  260. mockClient.createLogGroupFunc = func(ctx context.Context, input *cloudwatchlogs.CreateLogGroupInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error) {
  261. logGroupInput = input
  262. return &cloudwatchlogs.CreateLogGroupOutput{}, nil
  263. }
  264. var logStreamInput *cloudwatchlogs.CreateLogStreamInput
  265. createLogStreamCalls := 0
  266. mockClient.createLogStreamFunc = func(ctx context.Context, input *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  267. createLogStreamCalls++
  268. if logGroupInput == nil {
  269. // log group not created yet
  270. return nil, &types.ResourceNotFoundException{}
  271. }
  272. logStreamInput = input
  273. return &cloudwatchlogs.CreateLogStreamOutput{}, nil
  274. }
  275. err := stream.create()
  276. assert.NilError(t, err)
  277. if createLogStreamCalls < 2 {
  278. t.Errorf("Expected CreateLogStream to be called twice, was called %d times", createLogStreamCalls)
  279. }
  280. assert.Check(t, logGroupInput != nil)
  281. assert.Equal(t, groupName, aws.ToString(logGroupInput.LogGroupName), "LogGroupName in LogGroupInput")
  282. assert.Check(t, logStreamInput != nil)
  283. assert.Equal(t, groupName, aws.ToString(logStreamInput.LogGroupName), "LogGroupName in LogStreamInput")
  284. assert.Equal(t, streamName, aws.ToString(logStreamInput.LogStreamName), "LogStreamName in LogStreamInput")
  285. }
  286. func TestCreateError(t *testing.T) {
  287. mockClient := &mockClient{}
  288. stream := &logStream{
  289. client: mockClient,
  290. logCreateStream: true,
  291. }
  292. mockClient.createLogStreamFunc = func(ctx context.Context, i *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  293. return nil, errors.New("error")
  294. }
  295. err := stream.create()
  296. if err == nil {
  297. t.Fatal("Expected non-nil err")
  298. }
  299. }
  300. func TestCreateAlreadyExists(t *testing.T) {
  301. mockClient := &mockClient{}
  302. stream := &logStream{
  303. client: mockClient,
  304. logCreateStream: true,
  305. }
  306. calls := 0
  307. mockClient.createLogStreamFunc = func(ctx context.Context, input *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  308. calls++
  309. return nil, &types.ResourceAlreadyExistsException{}
  310. }
  311. err := stream.create()
  312. assert.NilError(t, err)
  313. assert.Equal(t, 1, calls)
  314. }
  315. func TestLogClosed(t *testing.T) {
  316. mockClient := &mockClient{}
  317. stream := &logStream{
  318. client: mockClient,
  319. closed: true,
  320. }
  321. err := stream.Log(&logger.Message{})
  322. assert.Check(t, err != nil)
  323. }
  324. // TestLogBlocking tests that the Log method blocks appropriately when
  325. // non-blocking behavior is not enabled. Blocking is achieved through an
  326. // internal channel that must be drained for Log to return.
  327. func TestLogBlocking(t *testing.T) {
  328. mockClient := &mockClient{}
  329. stream := &logStream{
  330. client: mockClient,
  331. messages: make(chan *logger.Message),
  332. }
  333. errorCh := make(chan error, 1)
  334. started := make(chan bool)
  335. go func() {
  336. started <- true
  337. err := stream.Log(&logger.Message{})
  338. errorCh <- err
  339. }()
  340. // block until the goroutine above has started
  341. <-started
  342. select {
  343. case err := <-errorCh:
  344. t.Fatal("Expected stream.Log to block: ", err)
  345. default:
  346. }
  347. // assuming it is blocked, we can now try to drain the internal channel and
  348. // unblock it
  349. select {
  350. case <-time.After(10 * time.Millisecond):
  351. // if we're unable to drain the channel within 10ms, something seems broken
  352. t.Fatal("Expected to be able to read from stream.messages but was unable to")
  353. case <-stream.messages:
  354. }
  355. select {
  356. case err := <-errorCh:
  357. assert.NilError(t, err)
  358. case <-time.After(30 * time.Second):
  359. t.Fatal("timed out waiting for read")
  360. }
  361. }
  362. func TestLogBufferEmpty(t *testing.T) {
  363. mockClient := &mockClient{}
  364. stream := &logStream{
  365. client: mockClient,
  366. messages: make(chan *logger.Message, 1),
  367. }
  368. err := stream.Log(&logger.Message{})
  369. assert.NilError(t, err)
  370. }
  371. func TestPublishBatchSuccess(t *testing.T) {
  372. mockClient := &mockClient{}
  373. stream := &logStream{
  374. client: mockClient,
  375. logGroupName: groupName,
  376. logStreamName: streamName,
  377. sequenceToken: aws.String(sequenceToken),
  378. }
  379. var input *cloudwatchlogs.PutLogEventsInput
  380. mockClient.putLogEventsFunc = func(ctx context.Context, i *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  381. input = i
  382. return &cloudwatchlogs.PutLogEventsOutput{
  383. NextSequenceToken: aws.String(nextSequenceToken),
  384. }, nil
  385. }
  386. events := []wrappedEvent{
  387. {
  388. inputLogEvent: types.InputLogEvent{
  389. Message: aws.String(logline),
  390. },
  391. },
  392. }
  393. stream.publishBatch(testEventBatch(events))
  394. assert.Equal(t, nextSequenceToken, aws.ToString(stream.sequenceToken), "sequenceToken")
  395. assert.Assert(t, input != nil)
  396. assert.Equal(t, sequenceToken, aws.ToString(input.SequenceToken), "input.SequenceToken")
  397. assert.Assert(t, len(input.LogEvents) == 1)
  398. assert.Equal(t, events[0].inputLogEvent, input.LogEvents[0])
  399. }
  400. func TestPublishBatchError(t *testing.T) {
  401. mockClient := &mockClient{}
  402. stream := &logStream{
  403. client: mockClient,
  404. logGroupName: groupName,
  405. logStreamName: streamName,
  406. sequenceToken: aws.String(sequenceToken),
  407. }
  408. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  409. return nil, errors.New("error")
  410. }
  411. events := []wrappedEvent{
  412. {
  413. inputLogEvent: types.InputLogEvent{
  414. Message: aws.String(logline),
  415. },
  416. },
  417. }
  418. stream.publishBatch(testEventBatch(events))
  419. assert.Equal(t, sequenceToken, aws.ToString(stream.sequenceToken))
  420. }
  421. func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
  422. mockClient := &mockClient{}
  423. stream := &logStream{
  424. client: mockClient,
  425. logGroupName: groupName,
  426. logStreamName: streamName,
  427. sequenceToken: aws.String(sequenceToken),
  428. }
  429. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  430. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  431. calls = append(calls, input)
  432. if aws.ToString(input.SequenceToken) != "token" {
  433. return nil, &types.InvalidSequenceTokenException{
  434. ExpectedSequenceToken: aws.String("token"),
  435. }
  436. }
  437. return &cloudwatchlogs.PutLogEventsOutput{
  438. NextSequenceToken: aws.String(nextSequenceToken),
  439. }, nil
  440. }
  441. events := []wrappedEvent{
  442. {
  443. inputLogEvent: types.InputLogEvent{
  444. Message: aws.String(logline),
  445. },
  446. },
  447. }
  448. stream.publishBatch(testEventBatch(events))
  449. assert.Equal(t, nextSequenceToken, aws.ToString(stream.sequenceToken))
  450. assert.Assert(t, len(calls) == 2)
  451. argument := calls[0]
  452. assert.Assert(t, argument != nil)
  453. assert.Equal(t, sequenceToken, aws.ToString(argument.SequenceToken))
  454. assert.Assert(t, len(argument.LogEvents) == 1)
  455. assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0])
  456. argument = calls[1]
  457. assert.Assert(t, argument != nil)
  458. assert.Equal(t, "token", aws.ToString(argument.SequenceToken))
  459. assert.Assert(t, len(argument.LogEvents) == 1)
  460. assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0])
  461. }
  462. func TestPublishBatchAlreadyAccepted(t *testing.T) {
  463. mockClient := &mockClient{}
  464. stream := &logStream{
  465. client: mockClient,
  466. logGroupName: groupName,
  467. logStreamName: streamName,
  468. sequenceToken: aws.String(sequenceToken),
  469. }
  470. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  471. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  472. calls = append(calls, input)
  473. return nil, &types.DataAlreadyAcceptedException{
  474. ExpectedSequenceToken: aws.String("token"),
  475. }
  476. }
  477. events := []wrappedEvent{
  478. {
  479. inputLogEvent: types.InputLogEvent{
  480. Message: aws.String(logline),
  481. },
  482. },
  483. }
  484. stream.publishBatch(testEventBatch(events))
  485. assert.Assert(t, stream.sequenceToken != nil)
  486. assert.Equal(t, "token", aws.ToString(stream.sequenceToken))
  487. assert.Assert(t, len(calls) == 1)
  488. argument := calls[0]
  489. assert.Assert(t, argument != nil)
  490. assert.Equal(t, sequenceToken, aws.ToString(argument.SequenceToken))
  491. assert.Assert(t, len(argument.LogEvents) == 1)
  492. assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0])
  493. }
  494. func TestCollectBatchSimple(t *testing.T) {
  495. mockClient := &mockClient{}
  496. stream := &logStream{
  497. client: mockClient,
  498. logGroupName: groupName,
  499. logStreamName: streamName,
  500. sequenceToken: aws.String(sequenceToken),
  501. messages: make(chan *logger.Message),
  502. }
  503. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  504. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  505. calls = append(calls, input)
  506. return &cloudwatchlogs.PutLogEventsOutput{
  507. NextSequenceToken: aws.String(nextSequenceToken),
  508. }, nil
  509. }
  510. ticks := make(chan time.Time)
  511. newTicker = func(_ time.Duration) *time.Ticker {
  512. return &time.Ticker{
  513. C: ticks,
  514. }
  515. }
  516. d := make(chan bool)
  517. close(d)
  518. go stream.collectBatch(d)
  519. stream.Log(&logger.Message{
  520. Line: []byte(logline),
  521. Timestamp: time.Time{},
  522. })
  523. ticks <- time.Time{}
  524. ticks <- time.Time{}
  525. stream.Close()
  526. assert.Assert(t, len(calls) == 1)
  527. argument := calls[0]
  528. assert.Assert(t, argument != nil)
  529. assert.Assert(t, len(argument.LogEvents) == 1)
  530. assert.Equal(t, logline, aws.ToString(argument.LogEvents[0].Message))
  531. }
  532. func TestCollectBatchTicker(t *testing.T) {
  533. mockClient := &mockClient{}
  534. stream := &logStream{
  535. client: mockClient,
  536. logGroupName: groupName,
  537. logStreamName: streamName,
  538. sequenceToken: aws.String(sequenceToken),
  539. messages: make(chan *logger.Message),
  540. }
  541. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  542. called := make(chan struct{}, 50)
  543. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  544. calls = append(calls, input)
  545. called <- struct{}{}
  546. return &cloudwatchlogs.PutLogEventsOutput{
  547. NextSequenceToken: aws.String(nextSequenceToken),
  548. }, nil
  549. }
  550. ticks := make(chan time.Time)
  551. newTicker = func(_ time.Duration) *time.Ticker {
  552. return &time.Ticker{
  553. C: ticks,
  554. }
  555. }
  556. d := make(chan bool)
  557. close(d)
  558. go stream.collectBatch(d)
  559. stream.Log(&logger.Message{
  560. Line: []byte(logline + " 1"),
  561. Timestamp: time.Time{},
  562. })
  563. stream.Log(&logger.Message{
  564. Line: []byte(logline + " 2"),
  565. Timestamp: time.Time{},
  566. })
  567. ticks <- time.Time{}
  568. // Verify first batch
  569. <-called
  570. assert.Assert(t, len(calls) == 1)
  571. argument := calls[0]
  572. calls = calls[1:]
  573. assert.Assert(t, argument != nil)
  574. assert.Assert(t, len(argument.LogEvents) == 2)
  575. assert.Equal(t, logline+" 1", aws.ToString(argument.LogEvents[0].Message))
  576. assert.Equal(t, logline+" 2", aws.ToString(argument.LogEvents[1].Message))
  577. stream.Log(&logger.Message{
  578. Line: []byte(logline + " 3"),
  579. Timestamp: time.Time{},
  580. })
  581. ticks <- time.Time{}
  582. <-called
  583. assert.Assert(t, len(calls) == 1)
  584. argument = calls[0]
  585. close(called)
  586. assert.Assert(t, argument != nil)
  587. assert.Assert(t, len(argument.LogEvents) == 1)
  588. assert.Equal(t, logline+" 3", aws.ToString(argument.LogEvents[0].Message))
  589. stream.Close()
  590. }
  591. func TestCollectBatchMultilinePattern(t *testing.T) {
  592. mockClient := &mockClient{}
  593. multilinePattern := regexp.MustCompile("xxxx")
  594. stream := &logStream{
  595. client: mockClient,
  596. logGroupName: groupName,
  597. logStreamName: streamName,
  598. multilinePattern: multilinePattern,
  599. sequenceToken: aws.String(sequenceToken),
  600. messages: make(chan *logger.Message),
  601. }
  602. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  603. called := make(chan struct{}, 50)
  604. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  605. calls = append(calls, input)
  606. called <- struct{}{}
  607. return &cloudwatchlogs.PutLogEventsOutput{
  608. NextSequenceToken: aws.String(nextSequenceToken),
  609. }, nil
  610. }
  611. ticks := make(chan time.Time)
  612. newTicker = func(_ time.Duration) *time.Ticker {
  613. return &time.Ticker{
  614. C: ticks,
  615. }
  616. }
  617. d := make(chan bool)
  618. close(d)
  619. go stream.collectBatch(d)
  620. stream.Log(&logger.Message{
  621. Line: []byte(logline),
  622. Timestamp: time.Now(),
  623. })
  624. stream.Log(&logger.Message{
  625. Line: []byte(logline),
  626. Timestamp: time.Now(),
  627. })
  628. stream.Log(&logger.Message{
  629. Line: []byte("xxxx " + logline),
  630. Timestamp: time.Now(),
  631. })
  632. ticks <- time.Now()
  633. // Verify single multiline event
  634. <-called
  635. assert.Assert(t, len(calls) == 1)
  636. argument := calls[0]
  637. calls = calls[1:]
  638. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  639. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  640. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  641. stream.Close()
  642. // Verify single event
  643. <-called
  644. assert.Assert(t, len(calls) == 1)
  645. argument = calls[0]
  646. close(called)
  647. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  648. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  649. assert.Check(t, is.Equal("xxxx "+logline+"\n", aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  650. }
  651. func BenchmarkCollectBatch(b *testing.B) {
  652. for i := 0; i < b.N; i++ {
  653. mockClient := &mockClient{}
  654. stream := &logStream{
  655. client: mockClient,
  656. logGroupName: groupName,
  657. logStreamName: streamName,
  658. sequenceToken: aws.String(sequenceToken),
  659. messages: make(chan *logger.Message),
  660. }
  661. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  662. return &cloudwatchlogs.PutLogEventsOutput{
  663. NextSequenceToken: aws.String(nextSequenceToken),
  664. }, nil
  665. }
  666. ticks := make(chan time.Time)
  667. newTicker = func(_ time.Duration) *time.Ticker {
  668. return &time.Ticker{
  669. C: ticks,
  670. }
  671. }
  672. d := make(chan bool)
  673. close(d)
  674. go stream.collectBatch(d)
  675. stream.logGenerator(10, 100)
  676. ticks <- time.Time{}
  677. stream.Close()
  678. }
  679. }
  680. func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
  681. for i := 0; i < b.N; i++ {
  682. mockClient := &mockClient{}
  683. multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
  684. stream := &logStream{
  685. client: mockClient,
  686. logGroupName: groupName,
  687. logStreamName: streamName,
  688. multilinePattern: multilinePattern,
  689. sequenceToken: aws.String(sequenceToken),
  690. messages: make(chan *logger.Message),
  691. }
  692. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  693. return &cloudwatchlogs.PutLogEventsOutput{
  694. NextSequenceToken: aws.String(nextSequenceToken),
  695. }, nil
  696. }
  697. ticks := make(chan time.Time)
  698. newTicker = func(_ time.Duration) *time.Ticker {
  699. return &time.Ticker{
  700. C: ticks,
  701. }
  702. }
  703. d := make(chan bool)
  704. close(d)
  705. go stream.collectBatch(d)
  706. stream.logGenerator(10, 100)
  707. ticks <- time.Time{}
  708. stream.Close()
  709. }
  710. }
  711. func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
  712. mockClient := &mockClient{}
  713. multilinePattern := regexp.MustCompile("xxxx")
  714. stream := &logStream{
  715. client: mockClient,
  716. logGroupName: groupName,
  717. logStreamName: streamName,
  718. multilinePattern: multilinePattern,
  719. sequenceToken: aws.String(sequenceToken),
  720. messages: make(chan *logger.Message),
  721. }
  722. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  723. called := make(chan struct{}, 50)
  724. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  725. calls = append(calls, input)
  726. called <- struct{}{}
  727. return &cloudwatchlogs.PutLogEventsOutput{
  728. NextSequenceToken: aws.String(nextSequenceToken),
  729. }, nil
  730. }
  731. ticks := make(chan time.Time)
  732. newTicker = func(_ time.Duration) *time.Ticker {
  733. return &time.Ticker{
  734. C: ticks,
  735. }
  736. }
  737. d := make(chan bool)
  738. close(d)
  739. go stream.collectBatch(d)
  740. stream.Log(&logger.Message{
  741. Line: []byte(logline),
  742. Timestamp: time.Now(),
  743. })
  744. // Log an event 1 second later
  745. stream.Log(&logger.Message{
  746. Line: []byte(logline),
  747. Timestamp: time.Now().Add(time.Second),
  748. })
  749. // Fire ticker defaultForceFlushInterval seconds later
  750. ticks <- time.Now().Add(defaultForceFlushInterval + time.Second)
  751. // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval)
  752. <-called
  753. assert.Assert(t, len(calls) == 1)
  754. argument := calls[0]
  755. calls = calls[1:]
  756. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  757. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  758. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  759. // Log an event 1 second later
  760. stream.Log(&logger.Message{
  761. Line: []byte(logline),
  762. Timestamp: time.Now().Add(time.Second),
  763. })
  764. // Fire ticker another defaultForceFlushInterval seconds later
  765. ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second)
  766. // Verify the event buffer is truly flushed - we should only receive a single event
  767. <-called
  768. assert.Assert(t, len(calls) == 1)
  769. argument = calls[0]
  770. close(called)
  771. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  772. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  773. assert.Check(t, is.Equal(logline+"\n", aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  774. stream.Close()
  775. }
  776. func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
  777. mockClient := &mockClient{}
  778. multilinePattern := regexp.MustCompile("xxxx")
  779. stream := &logStream{
  780. client: mockClient,
  781. logGroupName: groupName,
  782. logStreamName: streamName,
  783. multilinePattern: multilinePattern,
  784. sequenceToken: aws.String(sequenceToken),
  785. messages: make(chan *logger.Message),
  786. }
  787. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  788. called := make(chan struct{}, 50)
  789. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  790. calls = append(calls, input)
  791. called <- struct{}{}
  792. return &cloudwatchlogs.PutLogEventsOutput{
  793. NextSequenceToken: aws.String(nextSequenceToken),
  794. }, nil
  795. }
  796. ticks := make(chan time.Time)
  797. newTicker = func(_ time.Duration) *time.Ticker {
  798. return &time.Ticker{
  799. C: ticks,
  800. }
  801. }
  802. d := make(chan bool)
  803. close(d)
  804. go stream.collectBatch(d)
  805. stream.Log(&logger.Message{
  806. Line: []byte(logline),
  807. Timestamp: time.Now(),
  808. })
  809. // Log an event 1 second later
  810. stream.Log(&logger.Message{
  811. Line: []byte(logline),
  812. Timestamp: time.Now().Add(time.Second),
  813. })
  814. // Fire ticker in past to simulate negative event buffer age
  815. ticks <- time.Now().Add(-time.Second)
  816. // Verify single multiline event is flushed with a negative event buffer age
  817. <-called
  818. assert.Assert(t, len(calls) == 1)
  819. argument := calls[0]
  820. close(called)
  821. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  822. assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
  823. assert.Check(t, is.Equal(logline+"\n"+logline+"\n", aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  824. stream.Close()
  825. }
  826. func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
  827. mockClient := &mockClient{}
  828. multilinePattern := regexp.MustCompile("xxxx")
  829. stream := &logStream{
  830. client: mockClient,
  831. logGroupName: groupName,
  832. logStreamName: streamName,
  833. multilinePattern: multilinePattern,
  834. sequenceToken: aws.String(sequenceToken),
  835. messages: make(chan *logger.Message),
  836. }
  837. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  838. called := make(chan struct{}, 50)
  839. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  840. calls = append(calls, input)
  841. called <- struct{}{}
  842. return &cloudwatchlogs.PutLogEventsOutput{
  843. NextSequenceToken: aws.String(nextSequenceToken),
  844. }, nil
  845. }
  846. ticks := make(chan time.Time)
  847. newTicker = func(_ time.Duration) *time.Ticker {
  848. return &time.Ticker{
  849. C: ticks,
  850. }
  851. }
  852. d := make(chan bool)
  853. close(d)
  854. go stream.collectBatch(d)
  855. // Log max event size
  856. longline := strings.Repeat("A", maximumBytesPerEvent)
  857. stream.Log(&logger.Message{
  858. Line: []byte(longline),
  859. Timestamp: time.Now(),
  860. })
  861. // Log short event
  862. shortline := strings.Repeat("B", 100)
  863. stream.Log(&logger.Message{
  864. Line: []byte(shortline),
  865. Timestamp: time.Now(),
  866. })
  867. // Fire ticker
  868. ticks <- time.Now().Add(defaultForceFlushInterval)
  869. // Verify multiline events
  870. // We expect a maximum sized event with no new line characters and a
  871. // second short event with a new line character at the end
  872. <-called
  873. assert.Assert(t, len(calls) == 1)
  874. argument := calls[0]
  875. close(called)
  876. assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
  877. assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events")
  878. assert.Check(t, is.Equal(longline, aws.ToString(argument.LogEvents[0].Message)), "Received incorrect multiline message")
  879. assert.Check(t, is.Equal(shortline+"\n", aws.ToString(argument.LogEvents[1].Message)), "Received incorrect multiline message")
  880. stream.Close()
  881. }
  882. func TestCollectBatchClose(t *testing.T) {
  883. mockClient := &mockClient{}
  884. stream := &logStream{
  885. client: mockClient,
  886. logGroupName: groupName,
  887. logStreamName: streamName,
  888. sequenceToken: aws.String(sequenceToken),
  889. messages: make(chan *logger.Message),
  890. }
  891. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  892. called := make(chan struct{}, 50)
  893. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  894. calls = append(calls, input)
  895. called <- struct{}{}
  896. return &cloudwatchlogs.PutLogEventsOutput{
  897. NextSequenceToken: aws.String(nextSequenceToken),
  898. }, nil
  899. }
  900. ticks := make(chan time.Time)
  901. newTicker = func(_ time.Duration) *time.Ticker {
  902. return &time.Ticker{
  903. C: ticks,
  904. }
  905. }
  906. d := make(chan bool)
  907. close(d)
  908. go stream.collectBatch(d)
  909. stream.Log(&logger.Message{
  910. Line: []byte(logline),
  911. Timestamp: time.Time{},
  912. })
  913. // no ticks
  914. stream.Close()
  915. <-called
  916. assert.Assert(t, len(calls) == 1)
  917. argument := calls[0]
  918. close(called)
  919. assert.Assert(t, argument != nil)
  920. assert.Assert(t, len(argument.LogEvents) == 1)
  921. assert.Equal(t, logline, *(argument.LogEvents[0].Message))
  922. }
  923. func TestEffectiveLen(t *testing.T) {
  924. tests := []struct {
  925. str string
  926. effectiveBytes int
  927. }{
  928. {"Hello", 5},
  929. {string([]byte{1, 2, 3, 4}), 4},
  930. {"🙃", 4},
  931. {string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12},
  932. {"He\xff\xffo", 9},
  933. {"", 0},
  934. }
  935. for i, tc := range tests {
  936. t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
  937. assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str))
  938. })
  939. }
  940. }
  941. func TestFindValidSplit(t *testing.T) {
  942. tests := []struct {
  943. str string
  944. maxEffectiveBytes int
  945. splitOffset int
  946. effectiveBytes int
  947. }{
  948. {"", 10, 0, 0},
  949. {"Hello", 6, 5, 5},
  950. {"Hello", 2, 2, 2},
  951. {"Hello", 0, 0, 0},
  952. {"🙃", 3, 0, 0},
  953. {"🙃", 4, 4, 4},
  954. {string([]byte{'a', 0xFF}), 2, 1, 1},
  955. {string([]byte{'a', 0xFF}), 4, 2, 4},
  956. }
  957. for i, tc := range tests {
  958. t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
  959. splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes)
  960. assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset")
  961. assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes")
  962. t.Log(tc.str[:tc.splitOffset])
  963. t.Log(tc.str[tc.splitOffset:])
  964. })
  965. }
  966. }
  967. func TestProcessEventEmoji(t *testing.T) {
  968. stream := &logStream{}
  969. batch := &eventBatch{}
  970. bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
  971. stream.processEvent(batch, bytes, 0)
  972. assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
  973. assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), *batch.batch[0].inputLogEvent.Message)
  974. assert.Equal(t, "🙃", *batch.batch[1].inputLogEvent.Message)
  975. }
  976. func TestCollectBatchLineSplit(t *testing.T) {
  977. mockClient := &mockClient{}
  978. stream := &logStream{
  979. client: mockClient,
  980. logGroupName: groupName,
  981. logStreamName: streamName,
  982. sequenceToken: aws.String(sequenceToken),
  983. messages: make(chan *logger.Message),
  984. }
  985. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  986. called := make(chan struct{}, 50)
  987. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  988. calls = append(calls, input)
  989. called <- struct{}{}
  990. return &cloudwatchlogs.PutLogEventsOutput{
  991. NextSequenceToken: aws.String(nextSequenceToken),
  992. }, nil
  993. }
  994. ticks := make(chan time.Time)
  995. newTicker = func(_ time.Duration) *time.Ticker {
  996. return &time.Ticker{
  997. C: ticks,
  998. }
  999. }
  1000. d := make(chan bool)
  1001. close(d)
  1002. go stream.collectBatch(d)
  1003. longline := strings.Repeat("A", maximumBytesPerEvent)
  1004. stream.Log(&logger.Message{
  1005. Line: []byte(longline + "B"),
  1006. Timestamp: time.Time{},
  1007. })
  1008. // no ticks
  1009. stream.Close()
  1010. <-called
  1011. assert.Assert(t, len(calls) == 1)
  1012. argument := calls[0]
  1013. close(called)
  1014. assert.Assert(t, argument != nil)
  1015. assert.Assert(t, len(argument.LogEvents) == 2)
  1016. assert.Equal(t, longline, aws.ToString(argument.LogEvents[0].Message))
  1017. assert.Equal(t, "B", aws.ToString(argument.LogEvents[1].Message))
  1018. }
  1019. func TestCollectBatchLineSplitWithBinary(t *testing.T) {
  1020. mockClient := &mockClient{}
  1021. stream := &logStream{
  1022. client: mockClient,
  1023. logGroupName: groupName,
  1024. logStreamName: streamName,
  1025. sequenceToken: aws.String(sequenceToken),
  1026. messages: make(chan *logger.Message),
  1027. }
  1028. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  1029. called := make(chan struct{}, 50)
  1030. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  1031. calls = append(calls, input)
  1032. called <- struct{}{}
  1033. return &cloudwatchlogs.PutLogEventsOutput{
  1034. NextSequenceToken: aws.String(nextSequenceToken),
  1035. }, nil
  1036. }
  1037. ticks := make(chan time.Time)
  1038. newTicker = func(_ time.Duration) *time.Ticker {
  1039. return &time.Ticker{
  1040. C: ticks,
  1041. }
  1042. }
  1043. d := make(chan bool)
  1044. close(d)
  1045. go stream.collectBatch(d)
  1046. longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
  1047. stream.Log(&logger.Message{
  1048. Line: []byte(longline + "\xFD"),
  1049. Timestamp: time.Time{},
  1050. })
  1051. // no ticks
  1052. stream.Close()
  1053. <-called
  1054. assert.Assert(t, len(calls) == 1)
  1055. argument := calls[0]
  1056. close(called)
  1057. assert.Assert(t, argument != nil)
  1058. assert.Assert(t, len(argument.LogEvents) == 2)
  1059. assert.Equal(t, longline, aws.ToString(argument.LogEvents[0].Message))
  1060. assert.Equal(t, "\xFD", aws.ToString(argument.LogEvents[1].Message))
  1061. }
  1062. func TestCollectBatchMaxEvents(t *testing.T) {
  1063. mockClient := &mockClient{}
  1064. stream := &logStream{
  1065. client: mockClient,
  1066. logGroupName: groupName,
  1067. logStreamName: streamName,
  1068. sequenceToken: aws.String(sequenceToken),
  1069. messages: make(chan *logger.Message),
  1070. }
  1071. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  1072. called := make(chan struct{}, 50)
  1073. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  1074. calls = append(calls, input)
  1075. called <- struct{}{}
  1076. return &cloudwatchlogs.PutLogEventsOutput{
  1077. NextSequenceToken: aws.String(nextSequenceToken),
  1078. }, nil
  1079. }
  1080. ticks := make(chan time.Time)
  1081. newTicker = func(_ time.Duration) *time.Ticker {
  1082. return &time.Ticker{
  1083. C: ticks,
  1084. }
  1085. }
  1086. d := make(chan bool)
  1087. close(d)
  1088. go stream.collectBatch(d)
  1089. line := "A"
  1090. for i := 0; i <= maximumLogEventsPerPut; i++ {
  1091. stream.Log(&logger.Message{
  1092. Line: []byte(line),
  1093. Timestamp: time.Time{},
  1094. })
  1095. }
  1096. // no ticks
  1097. stream.Close()
  1098. <-called
  1099. <-called
  1100. assert.Assert(t, len(calls) == 2)
  1101. argument := calls[0]
  1102. assert.Assert(t, argument != nil)
  1103. assert.Check(t, len(argument.LogEvents) == maximumLogEventsPerPut)
  1104. argument = calls[1]
  1105. close(called)
  1106. assert.Assert(t, argument != nil)
  1107. assert.Assert(t, len(argument.LogEvents) == 1)
  1108. }
  1109. func TestCollectBatchMaxTotalBytes(t *testing.T) {
  1110. expectedPuts := 2
  1111. mockClient := &mockClient{}
  1112. stream := &logStream{
  1113. client: mockClient,
  1114. logGroupName: groupName,
  1115. logStreamName: streamName,
  1116. sequenceToken: aws.String(sequenceToken),
  1117. messages: make(chan *logger.Message),
  1118. }
  1119. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  1120. called := make(chan struct{}, 50)
  1121. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  1122. calls = append(calls, input)
  1123. called <- struct{}{}
  1124. return &cloudwatchlogs.PutLogEventsOutput{
  1125. NextSequenceToken: aws.String(nextSequenceToken),
  1126. }, nil
  1127. }
  1128. ticks := make(chan time.Time)
  1129. newTicker = func(_ time.Duration) *time.Ticker {
  1130. return &time.Ticker{
  1131. C: ticks,
  1132. }
  1133. }
  1134. d := make(chan bool)
  1135. close(d)
  1136. go stream.collectBatch(d)
  1137. numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
  1138. // maxline is the maximum line that could be submitted after
  1139. // accounting for its overhead.
  1140. maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
  1141. // This will be split and batched up to the `maximumBytesPerPut'
  1142. // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
  1143. // should also tolerate an offset within that range.
  1144. stream.Log(&logger.Message{
  1145. Line: []byte(maxline[:len(maxline)/2]),
  1146. Timestamp: time.Time{},
  1147. })
  1148. stream.Log(&logger.Message{
  1149. Line: []byte(maxline[len(maxline)/2:]),
  1150. Timestamp: time.Time{},
  1151. })
  1152. stream.Log(&logger.Message{
  1153. Line: []byte("B"),
  1154. Timestamp: time.Time{},
  1155. })
  1156. // no ticks, guarantee batch by size (and chan close)
  1157. stream.Close()
  1158. for i := 0; i < expectedPuts; i++ {
  1159. <-called
  1160. }
  1161. assert.Assert(t, len(calls) == expectedPuts)
  1162. argument := calls[0]
  1163. assert.Assert(t, argument != nil)
  1164. // Should total to the maximum allowed bytes.
  1165. eventBytes := 0
  1166. for _, event := range argument.LogEvents {
  1167. eventBytes += len(*event.Message)
  1168. }
  1169. eventsOverhead := len(argument.LogEvents) * perEventBytes
  1170. payloadTotal := eventBytes + eventsOverhead
  1171. // lowestMaxBatch allows the payload to be offset if the messages
  1172. // don't lend themselves to align with the maximum event size.
  1173. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
  1174. assert.Check(t, payloadTotal <= maximumBytesPerPut)
  1175. assert.Check(t, payloadTotal >= lowestMaxBatch)
  1176. argument = calls[1]
  1177. assert.Assert(t, len(argument.LogEvents) == 1)
  1178. message := *argument.LogEvents[len(argument.LogEvents)-1].Message
  1179. assert.Equal(t, "B", message[len(message)-1:])
  1180. }
  1181. func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
  1182. expectedPuts := 2
  1183. mockClient := &mockClient{}
  1184. stream := &logStream{
  1185. client: mockClient,
  1186. logGroupName: groupName,
  1187. logStreamName: streamName,
  1188. sequenceToken: aws.String(sequenceToken),
  1189. messages: make(chan *logger.Message),
  1190. }
  1191. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  1192. called := make(chan struct{}, 50)
  1193. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  1194. calls = append(calls, input)
  1195. called <- struct{}{}
  1196. return &cloudwatchlogs.PutLogEventsOutput{
  1197. NextSequenceToken: aws.String(nextSequenceToken),
  1198. }, nil
  1199. }
  1200. ticks := make(chan time.Time)
  1201. newTicker = func(_ time.Duration) *time.Ticker {
  1202. return &time.Ticker{
  1203. C: ticks,
  1204. }
  1205. }
  1206. d := make(chan bool)
  1207. close(d)
  1208. go stream.collectBatch(d)
  1209. // maxline is the maximum line that could be submitted after
  1210. // accounting for its overhead.
  1211. maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
  1212. // This will be split and batched up to the `maximumBytesPerPut'
  1213. // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
  1214. // should also tolerate an offset within that range.
  1215. stream.Log(&logger.Message{
  1216. Line: []byte(maxline),
  1217. Timestamp: time.Time{},
  1218. })
  1219. stream.Log(&logger.Message{
  1220. Line: []byte("B"),
  1221. Timestamp: time.Time{},
  1222. })
  1223. // no ticks, guarantee batch by size (and chan close)
  1224. stream.Close()
  1225. for i := 0; i < expectedPuts; i++ {
  1226. <-called
  1227. }
  1228. assert.Assert(t, len(calls) == expectedPuts)
  1229. argument := calls[0]
  1230. assert.Assert(t, argument != nil)
  1231. // Should total to the maximum allowed bytes.
  1232. eventBytes := 0
  1233. for _, event := range argument.LogEvents {
  1234. eventBytes += effectiveLen(*event.Message)
  1235. }
  1236. eventsOverhead := len(argument.LogEvents) * perEventBytes
  1237. payloadTotal := eventBytes + eventsOverhead
  1238. // lowestMaxBatch allows the payload to be offset if the messages
  1239. // don't lend themselves to align with the maximum event size.
  1240. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
  1241. assert.Check(t, payloadTotal <= maximumBytesPerPut)
  1242. assert.Check(t, payloadTotal >= lowestMaxBatch)
  1243. argument = calls[1]
  1244. message := *argument.LogEvents[len(argument.LogEvents)-1].Message
  1245. assert.Equal(t, "B", message[len(message)-1:])
  1246. }
  1247. func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
  1248. mockClient := &mockClient{}
  1249. stream := &logStream{
  1250. client: mockClient,
  1251. logGroupName: groupName,
  1252. logStreamName: streamName,
  1253. sequenceToken: aws.String(sequenceToken),
  1254. messages: make(chan *logger.Message),
  1255. }
  1256. calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
  1257. called := make(chan struct{}, 50)
  1258. mockClient.putLogEventsFunc = func(ctx context.Context, input *cloudwatchlogs.PutLogEventsInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error) {
  1259. calls = append(calls, input)
  1260. called <- struct{}{}
  1261. return &cloudwatchlogs.PutLogEventsOutput{
  1262. NextSequenceToken: aws.String(nextSequenceToken),
  1263. }, nil
  1264. }
  1265. ticks := make(chan time.Time)
  1266. newTicker = func(_ time.Duration) *time.Ticker {
  1267. return &time.Ticker{
  1268. C: ticks,
  1269. }
  1270. }
  1271. d := make(chan bool)
  1272. close(d)
  1273. go stream.collectBatch(d)
  1274. var expectedEvents []types.InputLogEvent
  1275. times := maximumLogEventsPerPut
  1276. timestamp := time.Now()
  1277. for i := 0; i < times; i++ {
  1278. line := strconv.Itoa(i)
  1279. if i%2 == 0 {
  1280. timestamp = timestamp.Add(1 * time.Nanosecond)
  1281. }
  1282. stream.Log(&logger.Message{
  1283. Line: []byte(line),
  1284. Timestamp: timestamp,
  1285. })
  1286. expectedEvents = append(expectedEvents, types.InputLogEvent{
  1287. Message: aws.String(line),
  1288. Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
  1289. })
  1290. }
  1291. ticks <- time.Time{}
  1292. stream.Close()
  1293. <-called
  1294. assert.Assert(t, len(calls) == 1)
  1295. argument := calls[0]
  1296. close(called)
  1297. assert.Assert(t, argument != nil)
  1298. assert.Assert(t, len(argument.LogEvents) == times)
  1299. for i := 0; i < times; i++ {
  1300. if !reflect.DeepEqual(argument.LogEvents[i], expectedEvents[i]) {
  1301. t.Errorf("Expected event to be %v but was %v", expectedEvents[i], argument.LogEvents[i])
  1302. }
  1303. }
  1304. }
  1305. func TestParseLogOptionsMultilinePattern(t *testing.T) {
  1306. info := logger.Info{
  1307. Config: map[string]string{
  1308. multilinePatternKey: "^xxxx",
  1309. },
  1310. }
  1311. multilinePattern, err := parseMultilineOptions(info)
  1312. assert.Check(t, err, "Received unexpected error")
  1313. assert.Check(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
  1314. }
  1315. func TestParseLogOptionsDatetimeFormat(t *testing.T) {
  1316. datetimeFormatTests := []struct {
  1317. format string
  1318. match string
  1319. }{
  1320. {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
  1321. {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
  1322. {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
  1323. {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
  1324. {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
  1325. {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
  1326. {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
  1327. }
  1328. for _, dt := range datetimeFormatTests {
  1329. t.Run(dt.match, func(t *testing.T) {
  1330. info := logger.Info{
  1331. Config: map[string]string{
  1332. datetimeFormatKey: dt.format,
  1333. },
  1334. }
  1335. multilinePattern, err := parseMultilineOptions(info)
  1336. assert.Check(t, err, "Received unexpected error")
  1337. assert.Check(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
  1338. })
  1339. }
  1340. }
  1341. func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
  1342. cfg := map[string]string{
  1343. multilinePatternKey: "^xxxx",
  1344. datetimeFormatKey: "%Y-%m-%d",
  1345. logGroupKey: groupName,
  1346. }
  1347. conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
  1348. err := ValidateLogOpt(cfg)
  1349. assert.Check(t, err != nil, "Expected an error")
  1350. assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error")
  1351. }
  1352. func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) {
  1353. tests := []struct {
  1354. input string
  1355. shouldErr bool
  1356. }{
  1357. {"0", true},
  1358. {"-1", true},
  1359. {"a", true},
  1360. {"10", false},
  1361. }
  1362. for _, tc := range tests {
  1363. t.Run(tc.input, func(t *testing.T) {
  1364. cfg := map[string]string{
  1365. forceFlushIntervalKey: tc.input,
  1366. logGroupKey: groupName,
  1367. }
  1368. err := ValidateLogOpt(cfg)
  1369. if tc.shouldErr {
  1370. expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input
  1371. assert.Error(t, err, expectedErr)
  1372. } else {
  1373. assert.NilError(t, err)
  1374. }
  1375. })
  1376. }
  1377. }
  1378. func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) {
  1379. tests := []struct {
  1380. input string
  1381. shouldErr bool
  1382. }{
  1383. {"0", true},
  1384. {"-1", true},
  1385. {"a", true},
  1386. {"10", false},
  1387. }
  1388. for _, tc := range tests {
  1389. t.Run(tc.input, func(t *testing.T) {
  1390. cfg := map[string]string{
  1391. maxBufferedEventsKey: tc.input,
  1392. logGroupKey: groupName,
  1393. }
  1394. err := ValidateLogOpt(cfg)
  1395. if tc.shouldErr {
  1396. expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input
  1397. assert.Error(t, err, expectedErr)
  1398. } else {
  1399. assert.NilError(t, err)
  1400. }
  1401. })
  1402. }
  1403. }
  1404. func TestValidateLogOptionsFormat(t *testing.T) {
  1405. tests := []struct {
  1406. format string
  1407. multiLinePattern string
  1408. datetimeFormat string
  1409. expErrMsg string
  1410. }{
  1411. {"json/emf", "", "", ""},
  1412. {"random", "", "", "unsupported log format 'random'"},
  1413. {"", "", "", ""},
  1414. {"json/emf", "---", "", "you cannot configure log opt 'awslogs-datetime-format' or 'awslogs-multiline-pattern' when log opt 'awslogs-format' is set to 'json/emf'"},
  1415. {"json/emf", "", "yyyy-dd-mm", "you cannot configure log opt 'awslogs-datetime-format' or 'awslogs-multiline-pattern' when log opt 'awslogs-format' is set to 'json/emf'"},
  1416. }
  1417. for i, tc := range tests {
  1418. t.Run(fmt.Sprintf("%d/%s", i, tc.format), func(t *testing.T) {
  1419. cfg := map[string]string{
  1420. logGroupKey: groupName,
  1421. logFormatKey: tc.format,
  1422. }
  1423. if tc.multiLinePattern != "" {
  1424. cfg[multilinePatternKey] = tc.multiLinePattern
  1425. }
  1426. if tc.datetimeFormat != "" {
  1427. cfg[datetimeFormatKey] = tc.datetimeFormat
  1428. }
  1429. err := ValidateLogOpt(cfg)
  1430. if tc.expErrMsg != "" {
  1431. assert.Error(t, err, tc.expErrMsg)
  1432. } else {
  1433. assert.NilError(t, err)
  1434. }
  1435. })
  1436. }
  1437. }
  1438. func TestCreateTagSuccess(t *testing.T) {
  1439. mockClient := &mockClient{}
  1440. info := logger.Info{
  1441. ContainerName: "/test-container",
  1442. ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
  1443. Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
  1444. }
  1445. logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  1446. if e != nil {
  1447. t.Errorf("Error generating tag: %q", e)
  1448. }
  1449. stream := &logStream{
  1450. client: mockClient,
  1451. logGroupName: groupName,
  1452. logStreamName: logStreamName,
  1453. logCreateStream: true,
  1454. }
  1455. calls := make([]*cloudwatchlogs.CreateLogStreamInput, 0)
  1456. mockClient.createLogStreamFunc = func(ctx context.Context, input *cloudwatchlogs.CreateLogStreamInput, opts ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error) {
  1457. calls = append(calls, input)
  1458. return &cloudwatchlogs.CreateLogStreamOutput{}, nil
  1459. }
  1460. err := stream.create()
  1461. assert.NilError(t, err)
  1462. assert.Equal(t, 1, len(calls))
  1463. argument := calls[0]
  1464. assert.Equal(t, "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890", aws.ToString(argument.LogStreamName))
  1465. }
  1466. func BenchmarkUnwrapEvents(b *testing.B) {
  1467. events := make([]wrappedEvent, maximumLogEventsPerPut)
  1468. for i := 0; i < maximumLogEventsPerPut; i++ {
  1469. mes := strings.Repeat("0", maximumBytesPerEvent)
  1470. events[i].inputLogEvent = types.InputLogEvent{
  1471. Message: &mes,
  1472. }
  1473. }
  1474. b.ResetTimer()
  1475. for i := 0; i < b.N; i++ {
  1476. res := unwrapEvents(events)
  1477. assert.Check(b, is.Len(res, maximumLogEventsPerPut))
  1478. }
  1479. }
  1480. func TestNewAWSLogsClientCredentialEndpointDetect(t *testing.T) {
  1481. // required for the cloudwatchlogs client
  1482. t.Setenv("AWS_REGION", "us-west-2")
  1483. credsResp := `{
  1484. "AccessKeyId" : "test-access-key-id",
  1485. "SecretAccessKey": "test-secret-access-key"
  1486. }`
  1487. credsRetrieved := false
  1488. actualAuthHeader := ""
  1489. testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  1490. switch r.URL.Path {
  1491. case "/creds":
  1492. credsRetrieved = true
  1493. w.Header().Set("Content-Type", "application/json")
  1494. fmt.Fprintln(w, credsResp)
  1495. case "/":
  1496. actualAuthHeader = r.Header.Get("Authorization")
  1497. w.Header().Set("Content-Type", "application/json")
  1498. fmt.Fprintln(w, "{}")
  1499. }
  1500. }))
  1501. defer testServer.Close()
  1502. // set the SDKEndpoint in the driver
  1503. newSDKEndpoint = testServer.URL
  1504. info := logger.Info{
  1505. Config: map[string]string{
  1506. endpointKey: testServer.URL,
  1507. credentialsEndpointKey: "/creds",
  1508. },
  1509. }
  1510. client, err := newAWSLogsClient(info)
  1511. assert.Check(t, err)
  1512. _, err = client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{LogGroupName: aws.String("foo")})
  1513. assert.NilError(t, err)
  1514. assert.Check(t, credsRetrieved)
  1515. // sample header val:
  1516. // AWS4-HMAC-SHA256 Credential=test-access-key-id/20220915/us-west-2/logs/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;content-length;content-type;host;x-amz-date;x-amz-target, Signature=9cc0f8347e379ec77884616bb4b5a9d4a9a11f63cdc4c765e2f0131f45fe06d3
  1517. assert.Check(t, is.Contains(actualAuthHeader, "AWS4-HMAC-SHA256 Credential=test-access-key-id/"))
  1518. assert.Check(t, is.Contains(actualAuthHeader, "us-west-2"))
  1519. assert.Check(t, is.Contains(actualAuthHeader, "Signature="))
  1520. }