diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index ba9455e6ac..ed62e53896 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "os" + "regexp" "runtime" "sort" "strconv" @@ -34,6 +35,8 @@ const ( logStreamKey = "awslogs-stream" logCreateGroupKey = "awslogs-create-group" tagKey = "tag" + datetimeFormatKey = "awslogs-datetime-format" + multilinePatternKey = "awslogs-multiline-pattern" batchPublishFrequency = 5 * time.Second // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html @@ -53,14 +56,15 @@ const ( ) type logStream struct { - logStreamName string - logGroupName string - logCreateGroup bool - client api - messages chan *logger.Message - lock sync.RWMutex - closed bool - sequenceToken *string + logStreamName string + logGroupName string + logCreateGroup bool + multilinePattern *regexp.Regexp + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string } type api interface { @@ -89,9 +93,33 @@ func init() { } } +// Parses awslogs-multiline-pattern and awslogs-datetime-format options +// If awslogs-datetime-format is present, convert the format from strftime +// to regexp and return. 
+// If awslogs-multiline-pattern is present, compile regexp and return +func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) { + dateTimeFormat := info.Config[datetimeFormatKey] + multilinePatternKey := info.Config[multilinePatternKey] + if dateTimeFormat != "" { + r := regexp.MustCompile("%.") + multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string { + return strftimeToRegex[s] + }) + } + if multilinePatternKey != "" { + multilinePattern, err := regexp.Compile(multilinePatternKey) + if err != nil { + return nil, err + } + return multilinePattern, nil + } + return nil, nil +} + // New creates an awslogs logger using the configuration passed in on the // context. Supported context configuration variables are awslogs-region, -// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is +// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern +// and awslogs-datetime-format. When available, configuration is // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID, // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and // the EC2 Instance Metadata Service. 
@@ -112,16 +140,23 @@ func New(info logger.Info) (logger.Logger, error) { if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + return nil, err + } + client, err := newAWSLogsClient(info) if err != nil { return nil, err } containerStream := &logStream{ - logStreamName: logStreamName, - logGroupName: logGroupName, - logCreateGroup: logCreateGroup, - client: client, - messages: make(chan *logger.Message, 4096), + logStreamName: logStreamName, + logGroupName: logGroupName, + logCreateGroup: logCreateGroup, + multilinePattern: multilinePattern, + client: client, + messages: make(chan *logger.Message, 4096), } err = containerStream.create() if err != nil { @@ -309,48 +344,83 @@ var newTicker = func(freq time.Duration) *time.Ticker { func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) var events []wrappedEvent - bytes := 0 + var eventBuffer []byte + var eventBufferTimestamp int64 for { select { - case <-timer.C: + case t := <-timer.C: + // If event buffer is older than batch publish frequency flush the event buffer + if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { + eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp + if eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) { + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + } + } l.publishBatch(events) events = events[:0] - bytes = 0 case msg, more := <-l.messages: if !more { + // Flush event buffer + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) l.publishBatch(events) return } - unprocessedLine := msg.Line - for len(unprocessedLine) > 0 { - // Split line length so it does not exceed the maximum - lineBytes := len(unprocessedLine) - if lineBytes > maximumBytesPerEvent { - lineBytes = maximumBytesPerEvent - } - line := unprocessedLine[:lineBytes] - unprocessedLine = unprocessedLine[lineBytes:] - if 
(len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { - // Publish an existing batch if it's already over the maximum number of events or if adding this - // event would push it over the maximum number of total bytes. - l.publishBatch(events) - events = events[:0] - bytes = 0 - } - events = append(events, wrappedEvent{ - inputLogEvent: &cloudwatchlogs.InputLogEvent{ - Message: aws.String(string(line)), - Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), - }, - insertOrder: len(events), - }) - bytes += (lineBytes + perEventBytes) + if eventBufferTimestamp == 0 { + eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) } + unprocessedLine := msg.Line + if l.multilinePattern != nil { + if l.multilinePattern.Match(unprocessedLine) { + // This is a new log event so flush the current eventBuffer to events + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) + eventBuffer = eventBuffer[:0] + } + eventBuffer = append(eventBuffer, unprocessedLine...) 
+ // If we have exceeded max bytes per event flush the event buffer up to max bytes + if len(eventBuffer) > maximumBytesPerEvent { + events = l.processEvent(events, eventBuffer[:maximumBytesPerEvent], eventBufferTimestamp) + eventBuffer = eventBuffer[maximumBytesPerEvent:] + } + logger.PutMessage(msg) + continue + } + events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) logger.PutMessage(msg) } } } +// processEvent processes log events +func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent { + bytes := 0 + for len(unprocessedLine) > 0 { + // Split line length so it does not exceed the maximum + lineBytes := len(unprocessedLine) + if lineBytes > maximumBytesPerEvent { + lineBytes = maximumBytesPerEvent + } + line := unprocessedLine[:lineBytes] + unprocessedLine = unprocessedLine[lineBytes:] + if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { + // Publish an existing batch if it's already over the maximum number of events or if adding this + // event would push it over the maximum number of total bytes. + l.publishBatch(events) + events = events[:0] + bytes = 0 + } + events = append(events, wrappedEvent{ + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(line)), + Timestamp: aws.Int64(timestamp), + }, + insertOrder: len(events), + }) + bytes += (lineBytes + perEventBytes) + } + return events +} + // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). 
// strftimeToRegex maps each supported strftime directive to a regular
// expression fragment matching the text that directive stands for. Every
// alternation is wrapped in a non-capturing group so fragments can be
// concatenated safely.
//
// Notes on individual entries:
//   - %L includes the leading dot of the milliseconds field.
//   - %Z requires the abbreviation to end in "T", so names such as "UTC" are
//     not matched.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: "[AP]M",
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}
-24,7 +25,7 @@ const ( streamName = "streamName" sequenceToken = "sequenceToken" nextSequenceToken = "nextSequenceToken" - logline = "this is a log line" + logline = "this is a log line\r" ) func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { @@ -471,6 +472,127 @@ func TestCollectBatchTicker(t *testing.T) { } +func TestCollectBatchMultilinePattern(t *testing.T) { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile("xxxx") + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + stream.Log(&logger.Message{ + Line: []byte("xxxx " + logline), + Timestamp: time.Now(), + }) + + ticks <- time.Time{} + + // Verify single multiline event + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+logline { + t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + } + + stream.Close() + + // Verify single event + argument = <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to 
contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != "xxxx "+logline { + t.Errorf("Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) + } +} + +func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile("xxxx") + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + + // Log an event 1 second later + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now().Add(time.Second), + }) + + // Fire ticker batchPublishFrequency seconds later + ticks <- time.Now().Add(batchPublishFrequency * time.Second) + + // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+logline { + t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + } + + stream.Close() +} + func TestCollectBatchClose(t *testing.T) { mockClient := newMockClient() stream := &logStream{ @@ -724,6 +846,71 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { 
} } +func TestParseLogOptionsMultilinePattern(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + multilinePatternKey: "^xxxx", + }, + } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + if !multilinePattern.MatchString("xxxx") { + t.Errorf("Expected multilinePattern to match string xxxx but no match found") + } +} + +func TestParseLogOptionsDatetimeFormatSupersedesMultilinePattern(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + multilinePatternKey: "^xxxx", + datetimeFormatKey: "%Y-%m-%d", + }, + } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + if multilinePattern.MatchString("xxxx") { + t.Errorf("Expected multilinePattern to NOT match string xxxx but match was made") + } + if !multilinePattern.MatchString("2017-01-01") { + t.Errorf("Expected multilinePattern to match string 2017-01-01 but no match found") + } +} + +func TestParseLogOptionsDatetimeFormat(t *testing.T) { + datetimeFormatTests := []struct { + format string + match string + }{ + {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"}, + {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"}, + {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec"}, + {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|June|July|August|September|October|November|December"}, + {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"}, + {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"}, + {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"}, + } + for _, dt := range datetimeFormatTests { + info := logger.Info{ + Config: map[string]string{ + datetimeFormatKey: dt.format, + }, + } + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received 
unexpected err: %v\n", err) + } + if !multilinePattern.MatchString(dt.match) { + t.Errorf("Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match) + } + } +} + func TestCreateTagSuccess(t *testing.T) { mockClient := newMockClient() info := logger.Info{