12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067 |
- package awslogs
- import (
- "errors"
- "fmt"
- "net/http"
- "reflect"
- "regexp"
- "runtime"
- "strings"
- "testing"
- "time"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/request"
- "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/daemon/logger/loggerutils"
- "github.com/docker/docker/dockerversion"
- "github.com/stretchr/testify/assert"
- )
// Fixture values shared by the tests in this file: the log group and stream
// names given to test streams, the sequence token a stream starts with and the
// one returned by successful mock PutLogEvents results, and two sample log
// lines (each ends with a carriage return). multilineLogline begins with a
// timestamp and is emitted by logGenerator as the first line of each group.
const (
	groupName         = "groupName"
	streamName        = "streamName"
	sequenceToken     = "sequenceToken"
	nextSequenceToken = "nextSequenceToken"
	logline           = "this is a log line\r"
	multilineLogline  = "2017-01-01 01:01:44 This is a multiline log entry\r"
)
- // Generates i multi-line events each with j lines
- func (l *logStream) logGenerator(lineCount int, multilineCount int) {
- for i := 0; i < multilineCount; i++ {
- l.Log(&logger.Message{
- Line: []byte(multilineLogline),
- Timestamp: time.Time{},
- })
- for j := 0; j < lineCount; j++ {
- l.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- }
- }
- }
- func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- regionKey: "us-east-1",
- },
- }
- client, err := newAWSLogsClient(info)
- if err != nil {
- t.Fatal(err)
- }
- realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
- if !ok {
- t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
- }
- buildHandlerList := realClient.Handlers.Build
- request := &request.Request{
- HTTPRequest: &http.Request{
- Header: http.Header{},
- },
- }
- buildHandlerList.Run(request)
- expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
- dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
- userAgent := request.HTTPRequest.Header.Get("User-Agent")
- if userAgent != expectedUserAgentString {
- t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
- expectedUserAgentString, userAgent)
- }
- }
- func TestNewAWSLogsClientRegionDetect(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{},
- }
- mockMetadata := newMockMetadataClient()
- newRegionFinder = func() regionFinder {
- return mockMetadata
- }
- mockMetadata.regionResult <- ®ionResult{
- successResult: "us-east-1",
- }
- _, err := newAWSLogsClient(info)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestCreateSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if argument.LogGroupName == nil {
- t.Fatal("Expected non-nil LogGroupName")
- }
- if *argument.LogGroupName != groupName {
- t.Errorf("Expected LogGroupName to be %s", groupName)
- }
- if argument.LogStreamName == nil {
- t.Fatal("Expected non-nil LogStreamName")
- }
- if *argument.LogStreamName != streamName {
- t.Errorf("Expected LogStreamName to be %s", streamName)
- }
- }
- func TestCreateLogGroupSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- logCreateGroup: true,
- }
- mockClient.createLogGroupResult <- &createLogGroupResult{}
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if argument.LogGroupName == nil {
- t.Fatal("Expected non-nil LogGroupName")
- }
- if *argument.LogGroupName != groupName {
- t.Errorf("Expected LogGroupName to be %s", groupName)
- }
- if argument.LogStreamName == nil {
- t.Fatal("Expected non-nil LogStreamName")
- }
- if *argument.LogStreamName != streamName {
- t.Errorf("Expected LogStreamName to be %s", streamName)
- }
- }
- func TestCreateError(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{
- errorResult: errors.New("Error"),
- }
- err := stream.create()
- if err == nil {
- t.Fatal("Expected non-nil err")
- }
- }
- func TestCreateAlreadyExists(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{
- errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
- }
- err := stream.create()
- if err != nil {
- t.Fatal("Expected nil err")
- }
- }
- func TestPublishBatchSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(events)
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != nextSequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestPublishBatchError(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: errors.New("Error"),
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(events)
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != sequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
- }
- }
- func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
- mockClient := newMockClientBuffered(2)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(events)
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != nextSequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != "token" {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestPublishBatchAlreadyAccepted(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(events)
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != "token" {
- t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestCollectBatchSimple(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline {
- t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
- }
- }
- func TestCollectBatchTicker(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline + " 1"),
- Timestamp: time.Time{},
- })
- stream.Log(&logger.Message{
- Line: []byte(logline + " 2"),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- // Verify first batch
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 2 {
- t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline+" 1" {
- t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
- }
- if *argument.LogEvents[1].Message != logline+" 2" {
- t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
- }
- stream.Log(&logger.Message{
- Line: []byte(logline + " 3"),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline+" 3" {
- t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
- }
- stream.Close()
- }
- func TestCollectBatchMultilinePattern(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- stream.Log(&logger.Message{
- Line: []byte("xxxx " + logline),
- Timestamp: time.Now(),
- })
- ticks <- time.Now()
- // Verify single multiline event
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- // Verify single event
- argument = <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- }
- func BenchmarkCollectBatch(b *testing.B) {
- for i := 0; i < b.N; i++ {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.logGenerator(10, 100)
- ticks <- time.Time{}
- stream.Close()
- }
- }
- func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
- for i := 0; i < b.N; i++ {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.logGenerator(10, 100)
- ticks <- time.Time{}
- stream.Close()
- }
- }
- func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker batchPublishFrequency seconds later
- ticks <- time.Now().Add(batchPublishFrequency + time.Second)
- // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker another batchPublishFrequency seconds later
- ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
- // Verify the event buffer is truly flushed - we should only receive a single event
- argument = <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- }
- func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker in past to simulate negative event buffer age
- ticks <- time.Now().Add(-time.Second)
- // Verify single multiline event is flushed with a negative event buffer age
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- }
- func TestCollectBatchClose(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline {
- t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
- }
- }
- func TestCollectBatchLineSplit(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- longline := strings.Repeat("A", maximumBytesPerEvent)
- stream.Log(&logger.Message{
- Line: []byte(longline + "B"),
- Timestamp: time.Time{},
- })
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 2 {
- t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != longline {
- t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
- }
- if *argument.LogEvents[1].Message != "B" {
- t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
- }
- }
- func TestCollectBatchMaxEvents(t *testing.T) {
- mockClient := newMockClientBuffered(1)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- line := "A"
- for i := 0; i <= maximumLogEventsPerPut; i++ {
- stream.Log(&logger.Message{
- Line: []byte(line),
- Timestamp: time.Time{},
- })
- }
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != maximumLogEventsPerPut {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
- }
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
- }
- }
- func TestCollectBatchMaxTotalBytes(t *testing.T) {
- mockClient := newMockClientBuffered(1)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- longline := strings.Repeat("A", maximumBytesPerPut)
- stream.Log(&logger.Message{
- Line: []byte(longline + "B"),
- Timestamp: time.Time{},
- })
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- bytes := 0
- for _, event := range argument.LogEvents {
- bytes += len(*event.Message)
- }
- if bytes > maximumBytesPerPut {
- t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
- }
- argument = <-mockClient.putLogEventsArgument
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
- }
- message := *argument.LogEvents[0].Message
- if message[len(message)-1:] != "B" {
- t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
- }
- }
- func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- times := maximumLogEventsPerPut
- expectedEvents := []*cloudwatchlogs.InputLogEvent{}
- timestamp := time.Now()
- for i := 0; i < times; i++ {
- line := fmt.Sprintf("%d", i)
- if i%2 == 0 {
- timestamp.Add(1 * time.Nanosecond)
- }
- stream.Log(&logger.Message{
- Line: []byte(line),
- Timestamp: timestamp,
- })
- expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
- Message: aws.String(line),
- Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
- })
- }
- ticks <- time.Time{}
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != times {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
- }
- for i := 0; i < times; i++ {
- if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
- t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
- }
- }
- }
- func TestParseLogOptionsMultilinePattern(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- multilinePatternKey: "^xxxx",
- },
- }
- multilinePattern, err := parseMultilineOptions(info)
- assert.Nil(t, err, "Received unexpected error")
- assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
- }
- func TestParseLogOptionsDatetimeFormat(t *testing.T) {
- datetimeFormatTests := []struct {
- format string
- match string
- }{
- {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
- {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
- {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
- {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
- {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
- {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
- {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
- }
- for _, dt := range datetimeFormatTests {
- t.Run(dt.match, func(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- datetimeFormatKey: dt.format,
- },
- }
- multilinePattern, err := parseMultilineOptions(info)
- assert.Nil(t, err, "Received unexpected error")
- assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
- })
- }
- }
- func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
- cfg := map[string]string{
- multilinePatternKey: "^xxxx",
- datetimeFormatKey: "%Y-%m-%d",
- logGroupKey: groupName,
- }
- conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
- err := ValidateLogOpt(cfg)
- assert.NotNil(t, err, "Expected an error")
- assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error")
- }
- func TestCreateTagSuccess(t *testing.T) {
- mockClient := newMockClient()
- info := logger.Info{
- ContainerName: "/test-container",
- ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
- Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
- }
- logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
- if e != nil {
- t.Errorf("Error generating tag: %q", e)
- }
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: logStreamName,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
- t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
- }
- }
- func BenchmarkUnwrapEvents(b *testing.B) {
- events := make([]wrappedEvent, maximumLogEventsPerPut)
- for i := 0; i < maximumLogEventsPerPut; i++ {
- mes := strings.Repeat("0", maximumBytesPerEvent)
- events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
- Message: &mes,
- }
- }
- as := assert.New(b)
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- res := unwrapEvents(events)
- as.Len(res, maximumLogEventsPerPut)
- }
- }
|