Add awslogs multiline support
Signed-off-by: Justin Menga <justin.menga@gmail.com>
This commit is contained in:
parent
680084b2a2
commit
ab74038df9
2 changed files with 324 additions and 42 deletions
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
@ -34,6 +35,8 @@ const (
|
|||
logStreamKey = "awslogs-stream"
|
||||
logCreateGroupKey = "awslogs-create-group"
|
||||
tagKey = "tag"
|
||||
datetimeFormatKey = "awslogs-datetime-format"
|
||||
multilinePatternKey = "awslogs-multiline-pattern"
|
||||
batchPublishFrequency = 5 * time.Second
|
||||
|
||||
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
|
||||
|
@ -53,14 +56,15 @@ const (
|
|||
)
|
||||
|
||||
type logStream struct {
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
logCreateGroup bool
|
||||
client api
|
||||
messages chan *logger.Message
|
||||
lock sync.RWMutex
|
||||
closed bool
|
||||
sequenceToken *string
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
logCreateGroup bool
|
||||
multilinePattern *regexp.Regexp
|
||||
client api
|
||||
messages chan *logger.Message
|
||||
lock sync.RWMutex
|
||||
closed bool
|
||||
sequenceToken *string
|
||||
}
|
||||
|
||||
type api interface {
|
||||
|
@ -89,9 +93,33 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
// Parses awslogs-multiline-pattern and awslogs-datetime-format options
|
||||
// If awslogs-datetime-format is present, convert the format from strftime
|
||||
// to regexp and return.
|
||||
// If awslogs-multiline-pattern is present, compile regexp and return
|
||||
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
|
||||
dateTimeFormat := info.Config[datetimeFormatKey]
|
||||
multilinePatternKey := info.Config[multilinePatternKey]
|
||||
if dateTimeFormat != "" {
|
||||
r := regexp.MustCompile("%.")
|
||||
multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
|
||||
return strftimeToRegex[s]
|
||||
})
|
||||
}
|
||||
if multilinePatternKey != "" {
|
||||
multilinePattern, err := regexp.Compile(multilinePatternKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return multilinePattern, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// New creates an awslogs logger using the configuration passed in on the
|
||||
// context. Supported context configuration variables are awslogs-region,
|
||||
// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is
|
||||
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
|
||||
// and awslogs-datetime-format. When available, configuration is
|
||||
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
||||
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
||||
// the EC2 Instance Metadata Service.
|
||||
|
@ -112,16 +140,23 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
if info.Config[logStreamKey] != "" {
|
||||
logStreamName = info.Config[logStreamKey]
|
||||
}
|
||||
|
||||
multilinePattern, err := parseMultilineOptions(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := newAWSLogsClient(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
containerStream := &logStream{
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
logCreateGroup: logCreateGroup,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, 4096),
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
logCreateGroup: logCreateGroup,
|
||||
multilinePattern: multilinePattern,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, 4096),
|
||||
}
|
||||
err = containerStream.create()
|
||||
if err != nil {
|
||||
|
@ -309,48 +344,83 @@ var newTicker = func(freq time.Duration) *time.Ticker {
|
|||
func (l *logStream) collectBatch() {
|
||||
timer := newTicker(batchPublishFrequency)
|
||||
var events []wrappedEvent
|
||||
bytes := 0
|
||||
var eventBuffer []byte
|
||||
var eventBufferTimestamp int64
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
case t := <-timer.C:
|
||||
// If event buffer is older than batch publish frequency flush the event buffer
|
||||
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
|
||||
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
|
||||
if eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) {
|
||||
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
|
||||
}
|
||||
}
|
||||
l.publishBatch(events)
|
||||
events = events[:0]
|
||||
bytes = 0
|
||||
case msg, more := <-l.messages:
|
||||
if !more {
|
||||
// Flush event buffer
|
||||
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
|
||||
l.publishBatch(events)
|
||||
return
|
||||
}
|
||||
unprocessedLine := msg.Line
|
||||
for len(unprocessedLine) > 0 {
|
||||
// Split line length so it does not exceed the maximum
|
||||
lineBytes := len(unprocessedLine)
|
||||
if lineBytes > maximumBytesPerEvent {
|
||||
lineBytes = maximumBytesPerEvent
|
||||
}
|
||||
line := unprocessedLine[:lineBytes]
|
||||
unprocessedLine = unprocessedLine[lineBytes:]
|
||||
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
|
||||
// Publish an existing batch if it's already over the maximum number of events or if adding this
|
||||
// event would push it over the maximum number of total bytes.
|
||||
l.publishBatch(events)
|
||||
events = events[:0]
|
||||
bytes = 0
|
||||
}
|
||||
events = append(events, wrappedEvent{
|
||||
inputLogEvent: &cloudwatchlogs.InputLogEvent{
|
||||
Message: aws.String(string(line)),
|
||||
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
|
||||
},
|
||||
insertOrder: len(events),
|
||||
})
|
||||
bytes += (lineBytes + perEventBytes)
|
||||
if eventBufferTimestamp == 0 {
|
||||
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
||||
}
|
||||
unprocessedLine := msg.Line
|
||||
if l.multilinePattern != nil {
|
||||
if l.multilinePattern.Match(unprocessedLine) {
|
||||
// This is a new log event so flush the current eventBuffer to events
|
||||
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
|
||||
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
||||
eventBuffer = eventBuffer[:0]
|
||||
}
|
||||
eventBuffer = append(eventBuffer, unprocessedLine...)
|
||||
// If we have exceeded max bytes per event flush the event buffer up to max bytes
|
||||
if len(eventBuffer) > maximumBytesPerEvent {
|
||||
events = l.processEvent(events, eventBuffer[:maximumBytesPerEvent], eventBufferTimestamp)
|
||||
eventBuffer = eventBuffer[maximumBytesPerEvent:]
|
||||
}
|
||||
logger.PutMessage(msg)
|
||||
continue
|
||||
}
|
||||
events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
|
||||
logger.PutMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processEvent processes log events
|
||||
func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
|
||||
bytes := 0
|
||||
for len(unprocessedLine) > 0 {
|
||||
// Split line length so it does not exceed the maximum
|
||||
lineBytes := len(unprocessedLine)
|
||||
if lineBytes > maximumBytesPerEvent {
|
||||
lineBytes = maximumBytesPerEvent
|
||||
}
|
||||
line := unprocessedLine[:lineBytes]
|
||||
unprocessedLine = unprocessedLine[lineBytes:]
|
||||
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
|
||||
// Publish an existing batch if it's already over the maximum number of events or if adding this
|
||||
// event would push it over the maximum number of total bytes.
|
||||
l.publishBatch(events)
|
||||
events = events[:0]
|
||||
bytes = 0
|
||||
}
|
||||
events = append(events, wrappedEvent{
|
||||
inputLogEvent: &cloudwatchlogs.InputLogEvent{
|
||||
Message: aws.String(string(line)),
|
||||
Timestamp: aws.Int64(timestamp),
|
||||
},
|
||||
insertOrder: len(events),
|
||||
})
|
||||
bytes += (lineBytes + perEventBytes)
|
||||
}
|
||||
return events
|
||||
}
|
||||
|
||||
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
|
||||
// accounting for sequencing requirements (each request must reference the
|
||||
// sequence token returned by the previous request).
|
||||
|
@ -394,6 +464,29 @@ func (l *logStream) publishBatch(events []wrappedEvent) {
|
|||
}
|
||||
}
|
||||
|
||||
// strftimeToRegex maps the strftime directives accepted in
// awslogs-datetime-format to the regular expression fragment matching the
// text each directive stands for. parseMultilineOptions substitutes these
// fragments into the user-supplied format.
//
// Character classes are written without separators ([12], not [1,2]): inside
// a class a comma is a literal member, so "[1,2]" would also accept ",".
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: "[AP]M",
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	// Requires a trailing "T" (e.g. GMT, NZDT); names such as UTC do not match.
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	// 001-366; the 0xx range is split so that 010, 020, ... 090 are accepted.
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	// Includes the leading dot.
	/*milliseconds          */ `%L`: `\.\d{3}`,
}
|
||||
|
||||
// putLogEvents wraps the PutLogEvents API
|
||||
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
|
||||
input := &cloudwatchlogs.PutLogEventsInput{
|
||||
|
@ -428,6 +521,8 @@ func ValidateLogOpt(cfg map[string]string) error {
|
|||
case logCreateGroupKey:
|
||||
case regionKey:
|
||||
case tagKey:
|
||||
case datetimeFormatKey:
|
||||
case multilinePatternKey:
|
||||
default:
|
||||
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -24,7 +25,7 @@ const (
|
|||
streamName = "streamName"
|
||||
sequenceToken = "sequenceToken"
|
||||
nextSequenceToken = "nextSequenceToken"
|
||||
logline = "this is a log line"
|
||||
logline = "this is a log line\r"
|
||||
)
|
||||
|
||||
func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
|
||||
|
@ -471,6 +472,127 @@ func TestCollectBatchTicker(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestCollectBatchMultilinePattern(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
multilinePattern := regexp.MustCompile("xxxx")
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
multilinePattern: multilinePattern,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
ticks := make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte("xxxx " + logline),
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
|
||||
ticks <- time.Time{}
|
||||
|
||||
// Verify single multiline event
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline+logline {
|
||||
t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
|
||||
stream.Close()
|
||||
|
||||
// Verify single event
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != "xxxx "+logline {
|
||||
t.Errorf("Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
multilinePattern := regexp.MustCompile("xxxx")
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
multilinePattern: multilinePattern,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
ticks := make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
go stream.collectBatch()
|
||||
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
|
||||
// Log an event 1 second later
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(logline),
|
||||
Timestamp: time.Now().Add(time.Second),
|
||||
})
|
||||
|
||||
// Fire ticker batchPublishFrequency seconds later
|
||||
ticks <- time.Now().Add(batchPublishFrequency * time.Second)
|
||||
|
||||
// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 1 {
|
||||
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != logline+logline {
|
||||
t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
|
||||
stream.Close()
|
||||
}
|
||||
|
||||
func TestCollectBatchClose(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
|
@ -724,6 +846,71 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestParseLogOptionsMultilinePattern(t *testing.T) {
|
||||
info := logger.Info{
|
||||
Config: map[string]string{
|
||||
multilinePatternKey: "^xxxx",
|
||||
},
|
||||
}
|
||||
|
||||
multilinePattern, err := parseMultilineOptions(info)
|
||||
if err != nil {
|
||||
t.Errorf("Received unexpected err: %v\n", err)
|
||||
}
|
||||
if !multilinePattern.MatchString("xxxx") {
|
||||
t.Errorf("Expected multilinePattern to match string xxxx but no match found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseLogOptionsDatetimeFormatSupersedesMultilinePattern(t *testing.T) {
|
||||
info := logger.Info{
|
||||
Config: map[string]string{
|
||||
multilinePatternKey: "^xxxx",
|
||||
datetimeFormatKey: "%Y-%m-%d",
|
||||
},
|
||||
}
|
||||
|
||||
multilinePattern, err := parseMultilineOptions(info)
|
||||
if err != nil {
|
||||
t.Errorf("Received unexpected err: %v\n", err)
|
||||
}
|
||||
if multilinePattern.MatchString("xxxx") {
|
||||
t.Errorf("Expected multilinePattern to NOT match string xxxx but match was made")
|
||||
}
|
||||
if !multilinePattern.MatchString("2017-01-01") {
|
||||
t.Errorf("Expected multilinePattern to match string 2017-01-01 but no match found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseLogOptionsDatetimeFormat(t *testing.T) {
|
||||
datetimeFormatTests := []struct {
|
||||
format string
|
||||
match string
|
||||
}{
|
||||
{"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"},
|
||||
{"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"},
|
||||
{"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec"},
|
||||
{"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|June|July|August|September|October|November|December"},
|
||||
{"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"},
|
||||
{"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"},
|
||||
{"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"},
|
||||
}
|
||||
for _, dt := range datetimeFormatTests {
|
||||
info := logger.Info{
|
||||
Config: map[string]string{
|
||||
datetimeFormatKey: dt.format,
|
||||
},
|
||||
}
|
||||
multilinePattern, err := parseMultilineOptions(info)
|
||||
if err != nil {
|
||||
t.Errorf("Received unexpected err: %v\n", err)
|
||||
}
|
||||
if !multilinePattern.MatchString(dt.match) {
|
||||
t.Errorf("Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateTagSuccess(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
info := logger.Info{
|
||||
|
|
Loading…
Add table
Reference in a new issue