
Merge pull request #35726 from jahkeup/awslogs-batching

Fix awslogs batch size calculation
Sebastiaan van Stijn committed 7 years ago
commit c8f7f4489e

+ 105 - 33
daemon/logger/awslogs/cloudwatchlogs.go

@@ -95,6 +95,17 @@ func init() {
 	}
 }
 
+// eventBatch holds the events that are batched for submission and the
+// associated data about it.
+//
+// Warning: this type is not threadsafe and must not be used
+// concurrently. This type is expected to be consumed by a single
+// goroutine and never concurrently.
+type eventBatch struct {
+	batch []wrappedEvent
+	bytes int
+}
+
 // New creates an awslogs logger using the configuration passed in on the
 // context.  Supported context configuration variables are awslogs-region,
 // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // Logs, the processEvents method is called.  If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
 func (l *logStream) collectBatch() {
-	timer := newTicker(batchPublishFrequency)
-	var events []wrappedEvent
+	ticker := newTicker(batchPublishFrequency)
 	var eventBuffer []byte
 	var eventBufferTimestamp int64
+	var batch = newEventBatch()
 	for {
 		select {
-		case t := <-timer.C:
+		case t := <-ticker.C:
 			// If event buffer is older than batch publish frequency flush the event buffer
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
 				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					eventBuffer = eventBuffer[:0]
 				}
 			}
-			l.publishBatch(events)
-			events = events[:0]
+			l.publishBatch(batch)
+			batch.reset()
 		case msg, more := <-l.messages:
 			if !more {
 				// Flush event buffer and release resources
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 				eventBuffer = eventBuffer[:0]
-				l.publishBatch(events)
-				events = events[:0]
+				l.publishBatch(batch)
+				batch.reset()
 				return
 			}
 			if eventBufferTimestamp == 0 {
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
 				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
 					// This is a new log event or we will exceed max bytes per event
 					// so flush the current eventBuffer to events and reset timestamp
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 					eventBuffer = eventBuffer[:0]
 				}
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
 				eventBuffer = append(eventBuffer, processedLine...)
 				logger.PutMessage(msg)
 			} else {
-				events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
+				l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
 				logger.PutMessage(msg)
 			}
 		}
@@ -450,8 +461,7 @@ func (l *logStream) collectBatch() {
 // bytes per event (defined in maximumBytesPerEvent).  There is a fixed per-event
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
 // batch-calculations.
-func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
-	bytes := 0
+func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
 	for len(unprocessedLine) > 0 {
 		// Split line length so it does not exceed the maximum
 		lineBytes := len(unprocessedLine)
@@ -459,38 +469,33 @@ func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte,
 			lineBytes = maximumBytesPerEvent
 		}
 		line := unprocessedLine[:lineBytes]
-		unprocessedLine = unprocessedLine[lineBytes:]
-		if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
-			// Publish an existing batch if it's already over the maximum number of events or if adding this
-			// event would push it over the maximum number of total bytes.
-			l.publishBatch(events)
-			events = events[:0]
-			bytes = 0
-		}
-		events = append(events, wrappedEvent{
+
+		event := wrappedEvent{
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
 				Message:   aws.String(string(line)),
 				Timestamp: aws.Int64(timestamp),
 			},
-			insertOrder: len(events),
-		})
-		bytes += (lineBytes + perEventBytes)
+			insertOrder: batch.count(),
+		}
+
+		added := batch.add(event, lineBytes)
+		if added {
+			unprocessedLine = unprocessedLine[lineBytes:]
+		} else {
+			l.publishBatch(batch)
+			batch.reset()
+		}
 	}
-	return events
 }
 
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
 // accounting for sequencing requirements (each request must reference the
 // sequence token returned by the previous request).
-func (l *logStream) publishBatch(events []wrappedEvent) {
-	if len(events) == 0 {
+func (l *logStream) publishBatch(batch *eventBatch) {
+	if batch.isEmpty() {
 		return
 	}
-
-	// events in a batch must be sorted by timestamp
-	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
-	sort.Sort(byTimestamp(events))
-	cwEvents := unwrapEvents(events)
+	cwEvents := unwrapEvents(batch.events())
 
 	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
 
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
 	}
 	return cwEvents
 }
+
+func newEventBatch() *eventBatch {
+	return &eventBatch{
+		batch: make([]wrappedEvent, 0),
+		bytes: 0,
+	}
+}
+
+// events returns a slice of wrappedEvents sorted in order of their
+// timestamps and then by their insertion order (see `byTimestamp`).
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) events() []wrappedEvent {
+	sort.Sort(byTimestamp(b.batch))
+	return b.batch
+}
+
+// add adds an event to the batch of events, accounting for the
+// necessary overhead for an event to be logged. It returns false when
+// the event cannot be added to the batch because doing so would
+// exceed the service limits.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) add(event wrappedEvent, size int) bool {
+	addBytes := size + perEventBytes
+
+	// verify we are still within service limits
+	switch {
+	case len(b.batch)+1 > maximumLogEventsPerPut:
+		return false
+	case b.bytes+addBytes > maximumBytesPerPut:
+		return false
+	}
+
+	b.bytes += addBytes
+	b.batch = append(b.batch, event)
+
+	return true
+}
+
+// count is the number of batched events.  Warning: this method
+// is not threadsafe and must not be used concurrently.
+func (b *eventBatch) count() int {
+	return len(b.batch)
+}
+
+// size is the total number of bytes that the batch represents.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) size() int {
+	return b.bytes
+}
+
+func (b *eventBatch) isEmpty() bool {
+	zeroEvents := b.count() == 0
+	zeroSize := b.size() == 0
+	return zeroEvents && zeroSize
+}
+
+// reset prepares the batch for reuse.
+func (b *eventBatch) reset() {
+	b.bytes = 0
+	b.batch = b.batch[:0]
+}
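
For readers skimming the diff, here is a minimal standalone sketch of the size-accounting pattern that eventBatch.add follows. The constants, type, and messages below are illustrative stand-ins, not the driver's real values (the driver's real limits live in its own const block):

package main

import "fmt"

// Illustrative stand-ins for the driver's service-limit constants.
const (
	maxEventsPerPut = 3  // stands in for maximumLogEventsPerPut
	maxBytesPerPut  = 50 // stands in for maximumBytesPerPut
	perEventBytes   = 5  // fixed per-event overhead
)

type batch struct {
	events []string
	bytes  int
}

// add mirrors eventBatch.add: append the event only when both the
// event-count and total-byte limits still hold, and report whether it did.
func (b *batch) add(msg string) bool {
	addBytes := len(msg) + perEventBytes
	if len(b.events)+1 > maxEventsPerPut || b.bytes+addBytes > maxBytesPerPut {
		return false
	}
	b.events = append(b.events, msg)
	b.bytes += addBytes
	return true
}

func main() {
	b := &batch{}
	for _, msg := range []string{"alpha", "beta", "gamma", "delta"} {
		if !b.add(msg) {
			// This is where the driver would publishBatch and reset,
			// then retry the rejected event against the fresh batch.
			fmt.Printf("flush before %q (events=%d, bytes=%d)\n", msg, len(b.events), b.bytes)
			b = &batch{}
			b.add(msg)
		}
	}
	fmt.Printf("final batch: events=%d, bytes=%d\n", len(b.events), b.bytes)
}

In the driver, that false return from add is what triggers publishBatch plus reset inside processEvent, after which the same line is retried against the emptied batch.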

+ 55 - 17
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
 	}
 }
 
+func testEventBatch(events []wrappedEvent) *eventBatch {
+	batch := newEventBatch()
+	for _, event := range events {
+		eventlen := len([]byte(*event.inputLogEvent.Message))
+		batch.add(event, eventlen)
+	}
+	return batch
+}
+
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
 	info := logger.Info{
 		Config: map[string]string{
@@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -859,7 +868,8 @@ func TestCollectBatchMaxEvents(t *testing.T) {
 }
 
 func TestCollectBatchMaxTotalBytes(t *testing.T) {
-	mockClient := newMockClientBuffered(1)
+	expectedPuts := 2
+	mockClient := newMockClientBuffered(expectedPuts)
 	stream := &logStream{
 		client:        mockClient,
 		logGroupName:  groupName,
@@ -867,11 +877,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
 		sequenceToken: aws.String(sequenceToken),
 		messages:      make(chan *logger.Message),
 	}
-	mockClient.putLogEventsResult <- &putLogEventsResult{
-		successResult: &cloudwatchlogs.PutLogEventsOutput{
-			NextSequenceToken: aws.String(nextSequenceToken),
-		},
+	for i := 0; i < expectedPuts; i++ {
+		mockClient.putLogEventsResult <- &putLogEventsResult{
+			successResult: &cloudwatchlogs.PutLogEventsOutput{
+				NextSequenceToken: aws.String(nextSequenceToken),
+			},
+		}
 	}
+
 	var ticks = make(chan time.Time)
 	newTicker = func(_ time.Duration) *time.Ticker {
 		return &time.Ticker{
@@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
 
 	go stream.collectBatch()
 
-	longline := strings.Repeat("A", maximumBytesPerPut)
+	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
+	// maxline is the maximum line that could be submitted after
+	// accounting for its overhead.
+	maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
+	// This will be split and batched up to the `maximumBytesPerPut'
+	// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
+	// should also tolerate an offset within that range.
 	stream.Log(&logger.Message{
-		Line:      []byte(longline + "B"),
+		Line:      []byte(maxline[:len(maxline)/2]),
+		Timestamp: time.Time{},
+	})
+	stream.Log(&logger.Message{
+		Line:      []byte(maxline[len(maxline)/2:]),
+		Timestamp: time.Time{},
+	})
+	stream.Log(&logger.Message{
+		Line:      []byte("B"),
 		Timestamp: time.Time{},
 	})
 
-	// no ticks
+	// no ticks, guarantee batch by size (and chan close)
 	stream.Close()
 
 	argument := <-mockClient.putLogEventsArgument
 	if argument == nil {
 		t.Fatal("Expected non-nil PutLogEventsInput")
 	}
-	bytes := 0
+
+	// Should total to the maximum allowed bytes.
+	eventBytes := 0
 	for _, event := range argument.LogEvents {
-		bytes += len(*event.Message)
+		eventBytes += len(*event.Message)
+	}
+	eventsOverhead := len(argument.LogEvents) * perEventBytes
+	payloadTotal := eventBytes + eventsOverhead
+	// lowestMaxBatch allows the payload to be offset if the messages
+	// don't lend themselves to align with the maximum event size.
+	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
+
+	if payloadTotal > maximumBytesPerPut {
+		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
 	}
-	if bytes > maximumBytesPerPut {
-		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
+	if payloadTotal < lowestMaxBatch {
+		t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
 	}
 
 	argument = <-mockClient.putLogEventsArgument
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
 	}
-	message := *argument.LogEvents[0].Message
+	message := *argument.LogEvents[len(argument.LogEvents)-1].Message
 	if message[len(message)-1:] != "B" {
 		t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
 	}
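
As a sanity check on the maxline arithmetic above, the same computation done standalone, assuming the AWS-documented limits of 1,048,576 bytes per PutLogEvents call, 262,144 bytes per event, and 26 bytes of per-event overhead (an assumption about the driver's constants, since their values are not part of this diff):

package main

import "fmt"

func main() {
	// Assumed values for the driver's constants (AWS-documented limits).
	const (
		maximumBytesPerPut   = 1048576
		maximumBytesPerEvent = 262144
		perEventBytes        = 26
	)

	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
	maxline := maximumBytesPerPut - perEventBytes*numPayloads
	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

	fmt.Println(numPayloads)    // 3: full-size events that fit in one put
	fmt.Println(maxline)        // 1048498: message bytes the test writes (plus a trailing "B")
	fmt.Println(lowestMaxBatch) // 786432: lower bound the test accepts
}

With these numbers the first put should carry between 786,432 and 1,048,576 bytes including overhead, which is exactly the window the payloadTotal assertions check.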

+ 28 - 1
daemon/logger/awslogs/cwlogsiface_mock_test.go

@@ -1,6 +1,10 @@
 package awslogs
 
-import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+import (
+	"fmt"
+
+	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+)
 
 type mockcwlogsclient struct {
 	createLogGroupArgument  chan *cloudwatchlogs.CreateLogGroupInput
@@ -67,7 +71,30 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput)
 		LogGroupName:  input.LogGroupName,
 		LogStreamName: input.LogStreamName,
 	}
+
+	// Intended mock output

 	output := <-m.putLogEventsResult
+
+	// Check the enforced service limits in the mock
+	totalBytes := 0
+	for _, evt := range events {
+		if evt.Message == nil {
+			continue
+		}
+		eventBytes := len([]byte(*evt.Message))
+		if eventBytes > maximumBytesPerEvent {
+			// exceeded per event message size limits
+			return nil, fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent)
+		}
+		// total event bytes including overhead
+		totalBytes += eventBytes + perEventBytes
+	}
+
+	if totalBytes > maximumBytesPerPut {
+		// exceeded per put maximum size limit
+		return nil, fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut)
+	}
+
 	return output.successResult, output.errorResult
 }
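
The mock now enforces the same limits the real service would, so any regression in the batch-size accounting fails the tests instead of passing silently. As an aside, a hypothetical refactor could pull that check into a reusable helper; a sketch, assuming it sits in the awslogs package so it can reuse the driver's limit constants:

package awslogs

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

// validatePutLimits is a hypothetical helper mirroring the mock's inline
// check: it rejects any single event over maximumBytesPerEvent and any
// batch whose total size, including per-event overhead, exceeds
// maximumBytesPerPut.
func validatePutLimits(events []*cloudwatchlogs.InputLogEvent) error {
	totalBytes := 0
	for _, evt := range events {
		if evt.Message == nil {
			continue
		}
		eventBytes := len(*evt.Message)
		if eventBytes > maximumBytesPerEvent {
			return fmt.Errorf("event of %d bytes exceeds the %d byte per-event limit", eventBytes, maximumBytesPerEvent)
		}
		totalBytes += eventBytes + perEventBytes
	}
	if totalBytes > maximumBytesPerPut {
		return fmt.Errorf("batch of %d bytes exceeds the %d byte per-put limit", totalBytes, maximumBytesPerPut)
	}
	return nil
}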