1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231 |
- package awslogs
- import (
- "errors"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/http/httptest"
- "os"
- "reflect"
- "regexp"
- "runtime"
- "strings"
- "testing"
- "time"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/request"
- "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/daemon/logger/loggerutils"
- "github.com/docker/docker/dockerversion"
- "github.com/stretchr/testify/assert"
- )
- const (
- groupName = "groupName"
- streamName = "streamName"
- sequenceToken = "sequenceToken"
- nextSequenceToken = "nextSequenceToken"
- logline = "this is a log line\r"
- multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
- )
- // Generates i multi-line events each with j lines
- func (l *logStream) logGenerator(lineCount int, multilineCount int) {
- for i := 0; i < multilineCount; i++ {
- l.Log(&logger.Message{
- Line: []byte(multilineLogline),
- Timestamp: time.Time{},
- })
- for j := 0; j < lineCount; j++ {
- l.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- }
- }
- }
- func testEventBatch(events []wrappedEvent) *eventBatch {
- batch := newEventBatch()
- for _, event := range events {
- eventlen := len([]byte(*event.inputLogEvent.Message))
- batch.add(event, eventlen)
- }
- return batch
- }
- func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- regionKey: "us-east-1",
- },
- }
- client, err := newAWSLogsClient(info)
- if err != nil {
- t.Fatal(err)
- }
- realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
- if !ok {
- t.Fatal("Could not cast client to cloudwatchlogs.CloudWatchLogs")
- }
- buildHandlerList := realClient.Handlers.Build
- request := &request.Request{
- HTTPRequest: &http.Request{
- Header: http.Header{},
- },
- }
- buildHandlerList.Run(request)
- expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)",
- dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH)
- userAgent := request.HTTPRequest.Header.Get("User-Agent")
- if userAgent != expectedUserAgentString {
- t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"",
- expectedUserAgentString, userAgent)
- }
- }
- func TestNewAWSLogsClientRegionDetect(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{},
- }
- mockMetadata := newMockMetadataClient()
- newRegionFinder = func() regionFinder {
- return mockMetadata
- }
- mockMetadata.regionResult <- ®ionResult{
- successResult: "us-east-1",
- }
- _, err := newAWSLogsClient(info)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestCreateSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if argument.LogGroupName == nil {
- t.Fatal("Expected non-nil LogGroupName")
- }
- if *argument.LogGroupName != groupName {
- t.Errorf("Expected LogGroupName to be %s", groupName)
- }
- if argument.LogStreamName == nil {
- t.Fatal("Expected non-nil LogStreamName")
- }
- if *argument.LogStreamName != streamName {
- t.Errorf("Expected LogStreamName to be %s", streamName)
- }
- }
- func TestCreateLogGroupSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- logCreateGroup: true,
- }
- mockClient.createLogGroupResult <- &createLogGroupResult{}
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if argument.LogGroupName == nil {
- t.Fatal("Expected non-nil LogGroupName")
- }
- if *argument.LogGroupName != groupName {
- t.Errorf("Expected LogGroupName to be %s", groupName)
- }
- if argument.LogStreamName == nil {
- t.Fatal("Expected non-nil LogStreamName")
- }
- if *argument.LogStreamName != streamName {
- t.Errorf("Expected LogStreamName to be %s", streamName)
- }
- }
- func TestCreateError(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{
- errorResult: errors.New("Error"),
- }
- err := stream.create()
- if err == nil {
- t.Fatal("Expected non-nil err")
- }
- }
- func TestCreateAlreadyExists(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{
- errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
- }
- err := stream.create()
- if err != nil {
- t.Fatal("Expected nil err")
- }
- }
- func TestPublishBatchSuccess(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(testEventBatch(events))
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != nextSequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestPublishBatchError(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: errors.New("Error"),
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(testEventBatch(events))
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != sequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
- }
- }
- func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
- mockClient := newMockClientBuffered(2)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(testEventBatch(events))
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != nextSequenceToken {
- t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != "token" {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestPublishBatchAlreadyAccepted(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
- }
- events := []wrappedEvent{
- {
- inputLogEvent: &cloudwatchlogs.InputLogEvent{
- Message: aws.String(logline),
- },
- },
- }
- stream.publishBatch(testEventBatch(events))
- if stream.sequenceToken == nil {
- t.Fatal("Expected non-nil sequenceToken")
- }
- if *stream.sequenceToken != "token" {
- t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
- }
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if argument.SequenceToken == nil {
- t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
- }
- if *argument.SequenceToken != sequenceToken {
- t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if argument.LogEvents[0] != events[0].inputLogEvent {
- t.Error("Expected event to equal input")
- }
- }
- func TestCollectBatchSimple(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline {
- t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
- }
- }
- func TestCollectBatchTicker(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline + " 1"),
- Timestamp: time.Time{},
- })
- stream.Log(&logger.Message{
- Line: []byte(logline + " 2"),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- // Verify first batch
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 2 {
- t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline+" 1" {
- t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
- }
- if *argument.LogEvents[1].Message != logline+" 2" {
- t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
- }
- stream.Log(&logger.Message{
- Line: []byte(logline + " 3"),
- Timestamp: time.Time{},
- })
- ticks <- time.Time{}
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline+" 3" {
- t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
- }
- stream.Close()
- }
- func TestCollectBatchMultilinePattern(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- stream.Log(&logger.Message{
- Line: []byte("xxxx " + logline),
- Timestamp: time.Now(),
- })
- ticks <- time.Now()
- // Verify single multiline event
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- // Verify single event
- argument = <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- }
- func BenchmarkCollectBatch(b *testing.B) {
- for i := 0; i < b.N; i++ {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.logGenerator(10, 100)
- ticks <- time.Time{}
- stream.Close()
- }
- }
- func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
- for i := 0; i < b.N; i++ {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.logGenerator(10, 100)
- ticks <- time.Time{}
- stream.Close()
- }
- }
- func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker batchPublishFrequency seconds later
- ticks <- time.Now().Add(batchPublishFrequency + time.Second)
- // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker another batchPublishFrequency seconds later
- ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
- // Verify the event buffer is truly flushed - we should only receive a single event
- argument = <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- }
- func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
- mockClient := newMockClient()
- multilinePattern := regexp.MustCompile("xxxx")
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- multilinePattern: multilinePattern,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now(),
- })
- // Log an event 1 second later
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Now().Add(time.Second),
- })
- // Fire ticker in past to simulate negative event buffer age
- ticks <- time.Now().Add(-time.Second)
- // Verify single multiline event is flushed with a negative event buffer age
- argument := <-mockClient.putLogEventsArgument
- assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
- assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
- assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
- stream.Close()
- }
- func TestCollectBatchClose(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- stream.Log(&logger.Message{
- Line: []byte(logline),
- Timestamp: time.Time{},
- })
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != logline {
- t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
- }
- }
- func TestCollectBatchLineSplit(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- longline := strings.Repeat("A", maximumBytesPerEvent)
- stream.Log(&logger.Message{
- Line: []byte(longline + "B"),
- Timestamp: time.Time{},
- })
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 2 {
- t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
- }
- if *argument.LogEvents[0].Message != longline {
- t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
- }
- if *argument.LogEvents[1].Message != "B" {
- t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
- }
- }
- func TestCollectBatchMaxEvents(t *testing.T) {
- mockClient := newMockClientBuffered(1)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- line := "A"
- for i := 0; i <= maximumLogEventsPerPut; i++ {
- stream.Log(&logger.Message{
- Line: []byte(line),
- Timestamp: time.Time{},
- })
- }
- // no ticks
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != maximumLogEventsPerPut {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
- }
- argument = <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
- }
- }
- func TestCollectBatchMaxTotalBytes(t *testing.T) {
- expectedPuts := 2
- mockClient := newMockClientBuffered(expectedPuts)
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- for i := 0; i < expectedPuts; i++ {
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- }
- var ticks = make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
- // maxline is the maximum line that could be submitted after
- // accounting for its overhead.
- maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
- // This will be split and batched up to the `maximumBytesPerPut'
- // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
- // should also tolerate an offset within that range.
- stream.Log(&logger.Message{
- Line: []byte(maxline[:len(maxline)/2]),
- Timestamp: time.Time{},
- })
- stream.Log(&logger.Message{
- Line: []byte(maxline[len(maxline)/2:]),
- Timestamp: time.Time{},
- })
- stream.Log(&logger.Message{
- Line: []byte("B"),
- Timestamp: time.Time{},
- })
- // no ticks, guarantee batch by size (and chan close)
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- // Should total to the maximum allowed bytes.
- eventBytes := 0
- for _, event := range argument.LogEvents {
- eventBytes += len(*event.Message)
- }
- eventsOverhead := len(argument.LogEvents) * perEventBytes
- payloadTotal := eventBytes + eventsOverhead
- // lowestMaxBatch allows the payload to be offset if the messages
- // don't lend themselves to align with the maximum event size.
- lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
- if payloadTotal > maximumBytesPerPut {
- t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
- }
- if payloadTotal < lowestMaxBatch {
- t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
- }
- argument = <-mockClient.putLogEventsArgument
- if len(argument.LogEvents) != 1 {
- t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
- }
- message := *argument.LogEvents[len(argument.LogEvents)-1].Message
- if message[len(message)-1:] != "B" {
- t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
- }
- }
- func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
- mockClient := newMockClient()
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: streamName,
- sequenceToken: aws.String(sequenceToken),
- messages: make(chan *logger.Message),
- }
- mockClient.putLogEventsResult <- &putLogEventsResult{
- successResult: &cloudwatchlogs.PutLogEventsOutput{
- NextSequenceToken: aws.String(nextSequenceToken),
- },
- }
- ticks := make(chan time.Time)
- newTicker = func(_ time.Duration) *time.Ticker {
- return &time.Ticker{
- C: ticks,
- }
- }
- go stream.collectBatch()
- times := maximumLogEventsPerPut
- expectedEvents := []*cloudwatchlogs.InputLogEvent{}
- timestamp := time.Now()
- for i := 0; i < times; i++ {
- line := fmt.Sprintf("%d", i)
- if i%2 == 0 {
- timestamp.Add(1 * time.Nanosecond)
- }
- stream.Log(&logger.Message{
- Line: []byte(line),
- Timestamp: timestamp,
- })
- expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
- Message: aws.String(line),
- Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
- })
- }
- ticks <- time.Time{}
- stream.Close()
- argument := <-mockClient.putLogEventsArgument
- if argument == nil {
- t.Fatal("Expected non-nil PutLogEventsInput")
- }
- if len(argument.LogEvents) != times {
- t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
- }
- for i := 0; i < times; i++ {
- if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
- t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
- }
- }
- }
- func TestParseLogOptionsMultilinePattern(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- multilinePatternKey: "^xxxx",
- },
- }
- multilinePattern, err := parseMultilineOptions(info)
- assert.Nil(t, err, "Received unexpected error")
- assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found")
- }
- func TestParseLogOptionsDatetimeFormat(t *testing.T) {
- datetimeFormatTests := []struct {
- format string
- match string
- }{
- {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
- {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
- {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
- {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"},
- {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
- {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
- {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
- }
- for _, dt := range datetimeFormatTests {
- t.Run(dt.match, func(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- datetimeFormatKey: dt.format,
- },
- }
- multilinePattern, err := parseMultilineOptions(info)
- assert.Nil(t, err, "Received unexpected error")
- assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found")
- })
- }
- }
- func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
- cfg := map[string]string{
- multilinePatternKey: "^xxxx",
- datetimeFormatKey: "%Y-%m-%d",
- logGroupKey: groupName,
- }
- conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
- err := ValidateLogOpt(cfg)
- assert.NotNil(t, err, "Expected an error")
- assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error")
- }
- func TestCreateTagSuccess(t *testing.T) {
- mockClient := newMockClient()
- info := logger.Info{
- ContainerName: "/test-container",
- ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
- Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"},
- }
- logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
- if e != nil {
- t.Errorf("Error generating tag: %q", e)
- }
- stream := &logStream{
- client: mockClient,
- logGroupName: groupName,
- logStreamName: logStreamName,
- }
- mockClient.createLogStreamResult <- &createLogStreamResult{}
- err := stream.create()
- if err != nil {
- t.Errorf("Received unexpected err: %v\n", err)
- }
- argument := <-mockClient.createLogStreamArgument
- if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
- t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")
- }
- }
- func TestIsSizedLogger(t *testing.T) {
- awslogs := &logStream{}
- assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger")
- }
- func BenchmarkUnwrapEvents(b *testing.B) {
- events := make([]wrappedEvent, maximumLogEventsPerPut)
- for i := 0; i < maximumLogEventsPerPut; i++ {
- mes := strings.Repeat("0", maximumBytesPerEvent)
- events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{
- Message: &mes,
- }
- }
- as := assert.New(b)
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- res := unwrapEvents(events)
- as.Len(res, maximumLogEventsPerPut)
- }
- }
- func TestNewAWSLogsClientCredentialEndpointDetect(t *testing.T) {
- // required for the cloudwatchlogs client
- os.Setenv("AWS_REGION", "us-west-2")
- defer os.Unsetenv("AWS_REGION")
- credsResp := `{
- "AccessKeyId" : "test-access-key-id",
- "SecretAccessKey": "test-secret-access-key"
- }`
- expectedAccessKeyID := "test-access-key-id"
- expectedSecretAccessKey := "test-secret-access-key"
- testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprintln(w, credsResp)
- }))
- defer testServer.Close()
- // set the SDKEndpoint in the driver
- newSDKEndpoint = testServer.URL
- info := logger.Info{
- Config: map[string]string{},
- }
- info.Config["awslogs-credentials-endpoint"] = "/creds"
- c, err := newAWSLogsClient(info)
- assert.NoError(t, err)
- client := c.(*cloudwatchlogs.CloudWatchLogs)
- creds, err := client.Config.Credentials.Get()
- assert.NoError(t, err)
- assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
- assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
- }
- func TestNewAWSLogsClientCredentialEnvironmentVariable(t *testing.T) {
- // required for the cloudwatchlogs client
- os.Setenv("AWS_REGION", "us-west-2")
- defer os.Unsetenv("AWS_REGION")
- expectedAccessKeyID := "test-access-key-id"
- expectedSecretAccessKey := "test-secret-access-key"
- os.Setenv("AWS_ACCESS_KEY_ID", expectedAccessKeyID)
- defer os.Unsetenv("AWS_ACCESS_KEY_ID")
- os.Setenv("AWS_SECRET_ACCESS_KEY", expectedSecretAccessKey)
- defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
- info := logger.Info{
- Config: map[string]string{},
- }
- c, err := newAWSLogsClient(info)
- assert.NoError(t, err)
- client := c.(*cloudwatchlogs.CloudWatchLogs)
- creds, err := client.Config.Credentials.Get()
- assert.NoError(t, err)
- assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
- assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
- }
- func TestNewAWSLogsClientCredentialSharedFile(t *testing.T) {
- // required for the cloudwatchlogs client
- os.Setenv("AWS_REGION", "us-west-2")
- defer os.Unsetenv("AWS_REGION")
- expectedAccessKeyID := "test-access-key-id"
- expectedSecretAccessKey := "test-secret-access-key"
- contentStr := `
- [default]
- aws_access_key_id = "test-access-key-id"
- aws_secret_access_key = "test-secret-access-key"
- `
- content := []byte(contentStr)
- tmpfile, err := ioutil.TempFile("", "example")
- defer os.Remove(tmpfile.Name()) // clean up
- assert.NoError(t, err)
- _, err = tmpfile.Write(content)
- assert.NoError(t, err)
- err = tmpfile.Close()
- assert.NoError(t, err)
- os.Unsetenv("AWS_ACCESS_KEY_ID")
- os.Unsetenv("AWS_SECRET_ACCESS_KEY")
- os.Setenv("AWS_SHARED_CREDENTIALS_FILE", tmpfile.Name())
- defer os.Unsetenv("AWS_SHARED_CREDENTIALS_FILE")
- info := logger.Info{
- Config: map[string]string{},
- }
- c, err := newAWSLogsClient(info)
- assert.NoError(t, err)
- client := c.(*cloudwatchlogs.CloudWatchLogs)
- creds, err := client.Config.Credentials.Get()
- assert.NoError(t, err)
- assert.Equal(t, expectedAccessKeyID, creds.AccessKeyID)
- assert.Equal(t, expectedSecretAccessKey, creds.SecretAccessKey)
- }
|