Sfoglia il codice sorgente

Merge pull request #38407 from maximilianomaccanti/master

Add two configurable options to awslogs driver
Yong Tang 6 anni fa
parent
commit
626022d0f6

+ 111 - 42
daemon/logger/awslogs/cloudwatchlogs.go

@@ -39,7 +39,11 @@ const (
 	datetimeFormatKey      = "awslogs-datetime-format"
 	datetimeFormatKey      = "awslogs-datetime-format"
 	multilinePatternKey    = "awslogs-multiline-pattern"
 	multilinePatternKey    = "awslogs-multiline-pattern"
 	credentialsEndpointKey = "awslogs-credentials-endpoint"
 	credentialsEndpointKey = "awslogs-credentials-endpoint"
-	batchPublishFrequency  = 5 * time.Second
+	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
+	maxBufferedEventsKey   = "awslogs-max-buffered-events"
+
+	defaultForceFlushInterval = 5 * time.Second
+	defaultMaxBufferedEvents  = 4096
 
 
 	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
 	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
 	perEventBytes          = 26
 	perEventBytes          = 26
@@ -64,16 +68,27 @@ const (
 )
 )
 
 
 type logStream struct {
 type logStream struct {
-	logStreamName    string
-	logGroupName     string
-	logCreateGroup   bool
-	logNonBlocking   bool
-	multilinePattern *regexp.Regexp
-	client           api
-	messages         chan *logger.Message
-	lock             sync.RWMutex
-	closed           bool
-	sequenceToken    *string
+	logStreamName      string
+	logGroupName       string
+	logCreateGroup     bool
+	logNonBlocking     bool
+	forceFlushInterval time.Duration
+	multilinePattern   *regexp.Regexp
+	client             api
+	messages           chan *logger.Message
+	lock               sync.RWMutex
+	closed             bool
+	sequenceToken      *string
+}
+
+type logStreamConfig struct {
+	logStreamName      string
+	logGroupName       string
+	logCreateGroup     bool
+	logNonBlocking     bool
+	forceFlushInterval time.Duration
+	maxBufferedEvents  int
+	multilinePattern   *regexp.Regexp
 }
 }
 
 
 var _ logger.SizedLogger = &logStream{}
 var _ logger.SizedLogger = &logStream{}
@@ -123,47 +138,28 @@ type eventBatch struct {
 // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
 // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
 // file (~/.aws/credentials), and the EC2 Instance Metadata Service.
 // file (~/.aws/credentials), and the EC2 Instance Metadata Service.
 func New(info logger.Info) (logger.Logger, error) {
 func New(info logger.Info) (logger.Logger, error) {
-	logGroupName := info.Config[logGroupKey]
-	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
+	containerStreamConfig, err := newStreamConfig(info)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	logCreateGroup := false
-	if info.Config[logCreateGroupKey] != "" {
-		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	logNonBlocking := info.Config["mode"] == "non-blocking"
-
-	if info.Config[logStreamKey] != "" {
-		logStreamName = info.Config[logStreamKey]
-	}
-
-	multilinePattern, err := parseMultilineOptions(info)
-	if err != nil {
-		return nil, err
-	}
-
 	client, err := newAWSLogsClient(info)
 	client, err := newAWSLogsClient(info)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
 	containerStream := &logStream{
 	containerStream := &logStream{
-		logStreamName:    logStreamName,
-		logGroupName:     logGroupName,
-		logCreateGroup:   logCreateGroup,
-		logNonBlocking:   logNonBlocking,
-		multilinePattern: multilinePattern,
-		client:           client,
-		messages:         make(chan *logger.Message, 4096),
+		logStreamName:      containerStreamConfig.logStreamName,
+		logGroupName:       containerStreamConfig.logGroupName,
+		logCreateGroup:     containerStreamConfig.logCreateGroup,
+		logNonBlocking:     containerStreamConfig.logNonBlocking,
+		forceFlushInterval: containerStreamConfig.forceFlushInterval,
+		multilinePattern:   containerStreamConfig.multilinePattern,
+		client:             client,
+		messages:           make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
 	}
 	}
 
 
 	creationDone := make(chan bool)
 	creationDone := make(chan bool)
-	if logNonBlocking {
+	if containerStream.logNonBlocking {
 		go func() {
 		go func() {
 			backoff := 1
 			backoff := 1
 			maxBackoff := 32
 			maxBackoff := 32
@@ -203,6 +199,63 @@ func New(info logger.Info) (logger.Logger, error) {
 	return containerStream, nil
 	return containerStream, nil
 }
 }
 
 
+// Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream
+// It has been formed out to ease Utest of the New above
+func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
+	logGroupName := info.Config[logGroupKey]
+	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
+	if err != nil {
+		return nil, err
+	}
+	logCreateGroup := false
+	if info.Config[logCreateGroupKey] != "" {
+		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	logNonBlocking := info.Config["mode"] == "non-blocking"
+
+	forceFlushInterval := defaultForceFlushInterval
+	if info.Config[forceFlushIntervalKey] != "" {
+		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
+		if err != nil {
+			return nil, err
+		}
+		forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
+	}
+
+	maxBufferedEvents := int(defaultMaxBufferedEvents)
+	if info.Config[maxBufferedEventsKey] != "" {
+		maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if info.Config[logStreamKey] != "" {
+		logStreamName = info.Config[logStreamKey]
+	}
+
+	multilinePattern, err := parseMultilineOptions(info)
+	if err != nil {
+		return nil, err
+	}
+
+	containerStreamConfig := &logStreamConfig{
+		logStreamName:      logStreamName,
+		logGroupName:       logGroupName,
+		logCreateGroup:     logCreateGroup,
+		logNonBlocking:     logNonBlocking,
+		forceFlushInterval: forceFlushInterval,
+		maxBufferedEvents:  maxBufferedEvents,
+		multilinePattern:   multilinePattern,
+	}
+
+	return containerStreamConfig, nil
+}
+
 // Parses awslogs-multiline-pattern and awslogs-datetime-format options
 // Parses awslogs-multiline-pattern and awslogs-datetime-format options
 // If awslogs-datetime-format is present, convert the format from strftime
 // If awslogs-datetime-format is present, convert the format from strftime
 // to regexp and return.
 // to regexp and return.
@@ -471,7 +524,11 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 func (l *logStream) collectBatch(created chan bool) {
 func (l *logStream) collectBatch(created chan bool) {
 	// Wait for the logstream/group to be created
 	// Wait for the logstream/group to be created
 	<-created
 	<-created
-	ticker := newTicker(batchPublishFrequency)
+	flushInterval := l.forceFlushInterval
+	if flushInterval <= 0 {
+		flushInterval = defaultForceFlushInterval
+	}
+	ticker := newTicker(flushInterval)
 	var eventBuffer []byte
 	var eventBuffer []byte
 	var eventBufferTimestamp int64
 	var eventBufferTimestamp int64
 	var batch = newEventBatch()
 	var batch = newEventBatch()
@@ -481,7 +538,7 @@ func (l *logStream) collectBatch(created chan bool) {
 			// If event buffer is older than batch publish frequency flush the event buffer
 			// If event buffer is older than batch publish frequency flush the event buffer
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
-				eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond)
+				eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
 				eventBufferNegative := eventBufferAge < 0
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
 				if eventBufferExpired || eventBufferNegative {
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
@@ -672,6 +729,8 @@ func ValidateLogOpt(cfg map[string]string) error {
 		case datetimeFormatKey:
 		case datetimeFormatKey:
 		case multilinePatternKey:
 		case multilinePatternKey:
 		case credentialsEndpointKey:
 		case credentialsEndpointKey:
+		case forceFlushIntervalKey:
+		case maxBufferedEventsKey:
 		default:
 		default:
 			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
 			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
 		}
 		}
@@ -684,6 +743,16 @@ func ValidateLogOpt(cfg map[string]string) error {
 			return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
 			return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
 		}
 		}
 	}
 	}
+	if cfg[forceFlushIntervalKey] != "" {
+		if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
+			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
+		}
+	}
+	if cfg[maxBufferedEventsKey] != "" {
+		if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
+			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
+		}
+	}
 	_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
 	_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
 	_, multilinePatternKeyExists := cfg[multilinePatternKey]
 	_, multilinePatternKeyExists := cfg[multilinePatternKey]
 	if datetimeFormatKeyExists && multilinePatternKeyExists {
 	if datetimeFormatKeyExists && multilinePatternKeyExists {

+ 124 - 6
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -10,6 +10,7 @@ import (
 	"reflect"
 	"reflect"
 	"regexp"
 	"regexp"
 	"runtime"
 	"runtime"
+	"strconv"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -59,6 +60,63 @@ func testEventBatch(events []wrappedEvent) *eventBatch {
 	return batch
 	return batch
 }
 }
 
 
+func TestNewStreamConfig(t *testing.T) {
+	tests := []struct {
+		logStreamName      string
+		logGroupName       string
+		logCreateGroup     string
+		logNonBlocking     string
+		forceFlushInterval string
+		maxBufferedEvents  string
+		datetimeFormat     string
+		multilinePattern   string
+		shouldErr          bool
+		testName           string
+	}{
+		{"", groupName, "", "", "", "", "", "", false, "defaults"},
+		{"", groupName, "invalid create group", "", "", "", "", "", true, "invalid create group"},
+		{"", groupName, "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"},
+		{"", groupName, "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"},
+		{"", groupName, "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"},
+		{"", groupName, "", "", "15", "", "", "", false, "flush interval at 15"},
+		{"", groupName, "", "", "", "1024", "", "", false, "max buffered events at 1024"},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.testName, func(t *testing.T) {
+			cfg := map[string]string{
+				logGroupKey:           tc.logGroupName,
+				logCreateGroupKey:     tc.logCreateGroup,
+				"mode":                tc.logNonBlocking,
+				forceFlushIntervalKey: tc.forceFlushInterval,
+				maxBufferedEventsKey:  tc.maxBufferedEvents,
+				logStreamKey:          tc.logStreamName,
+				datetimeFormatKey:     tc.datetimeFormat,
+				multilinePatternKey:   tc.multilinePattern,
+			}
+
+			info := logger.Info{
+				Config: cfg,
+			}
+			logStreamConfig, err := newStreamConfig(info)
+			if tc.shouldErr {
+				assert.Check(t, err != nil, "Expected an error")
+			} else {
+				assert.Check(t, err == nil, "Unexpected error")
+				assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName")
+				if tc.forceFlushInterval != "" {
+					forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey])
+					assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval")
+				}
+				if tc.maxBufferedEvents != "" {
+					maxBufferedEvents, _ := strconv.Atoi(info.Config[maxBufferedEventsKey])
+					assert.Check(t, logStreamConfig.maxBufferedEvents == maxBufferedEvents, "Unexpected maxBufferedEvents")
+				}
+			}
+		})
+	}
+}
+
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
 	info := logger.Info{
 	info := logger.Info{
 		Config: map[string]string{
 		Config: map[string]string{
@@ -762,10 +820,10 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
 		Timestamp: time.Now().Add(time.Second),
 		Timestamp: time.Now().Add(time.Second),
 	})
 	})
 
 
-	// Fire ticker batchPublishFrequency seconds later
-	ticks <- time.Now().Add(batchPublishFrequency + time.Second)
+	// Fire ticker defaultForceFlushInterval seconds later
+	ticks <- time.Now().Add(defaultForceFlushInterval + time.Second)
 
 
-	// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
+	// Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval)
 	argument := <-mockClient.putLogEventsArgument
 	argument := <-mockClient.putLogEventsArgument
 	assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
 	assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
 	assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
 	assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
@@ -777,8 +835,8 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
 		Timestamp: time.Now().Add(time.Second),
 		Timestamp: time.Now().Add(time.Second),
 	})
 	})
 
 
-	// Fire ticker another batchPublishFrequency seconds later
-	ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
+	// Fire ticker another defaultForceFlushInterval seconds later
+	ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second)
 
 
 	// Verify the event buffer is truly flushed - we should only receive a single event
 	// Verify the event buffer is truly flushed - we should only receive a single event
 	argument = <-mockClient.putLogEventsArgument
 	argument = <-mockClient.putLogEventsArgument
@@ -880,7 +938,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
 	})
 	})
 
 
 	// Fire ticker
 	// Fire ticker
-	ticks <- time.Now().Add(batchPublishFrequency)
+	ticks <- time.Now().Add(defaultForceFlushInterval)
 
 
 	// Verify multiline events
 	// Verify multiline events
 	// We expect a maximum sized event with no new line characters and a
 	// We expect a maximum sized event with no new line characters and a
@@ -1419,6 +1477,66 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
 	assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error")
 	assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error")
 }
 }
 
 
+func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) {
+	tests := []struct {
+		input     string
+		shouldErr bool
+	}{
+		{"0", true},
+		{"-1", true},
+		{"a", true},
+		{"10", false},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.input, func(t *testing.T) {
+			cfg := map[string]string{
+				forceFlushIntervalKey: tc.input,
+				logGroupKey:           groupName,
+			}
+
+			err := ValidateLogOpt(cfg)
+			if tc.shouldErr {
+				expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input
+				assert.Check(t, err != nil, "Expected an error")
+				assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error")
+			} else {
+				assert.Check(t, err == nil, "Unexpected error")
+			}
+		})
+	}
+}
+
+func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) {
+	tests := []struct {
+		input     string
+		shouldErr bool
+	}{
+		{"0", true},
+		{"-1", true},
+		{"a", true},
+		{"10", false},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.input, func(t *testing.T) {
+			cfg := map[string]string{
+				maxBufferedEventsKey: tc.input,
+				logGroupKey:          groupName,
+			}
+
+			err := ValidateLogOpt(cfg)
+			if tc.shouldErr {
+				expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input
+				assert.Check(t, err != nil, "Expected an error")
+				assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error")
+			} else {
+				assert.Check(t, err == nil, "Unexpected error")
+			}
+		})
+	}
+}
+
 func TestCreateTagSuccess(t *testing.T) {
 func TestCreateTagSuccess(t *testing.T) {
 	mockClient := newMockClient()
 	mockClient := newMockClient()
 	info := logger.Info{
 	info := logger.Info{