Sfoglia il codice sorgente

Fix awslogs driver repeating last event - #34292

Signed-off-by: Justin Menga <justin.menga@gmail.com>
Justin Menga 8 anni fa
parent
commit
0fd5a0bab7

+ 7 - 8
daemon/logger/awslogs/cloudwatchlogs.go

@@ -384,15 +384,18 @@ func (l *logStream) collectBatch() {
 				eventBufferNegative := eventBufferAge < 0
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
 				if eventBufferExpired || eventBufferNegative {
 					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
 					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+					eventBuffer = eventBuffer[:0]
 				}
 				}
 			}
 			}
 			l.publishBatch(events)
 			l.publishBatch(events)
 			events = events[:0]
 			events = events[:0]
 		case msg, more := <-l.messages:
 		case msg, more := <-l.messages:
 			if !more {
 			if !more {
-				// Flush event buffer
+				// Flush event buffer and release resources
 				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
 				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				eventBuffer = eventBuffer[:0]
 				l.publishBatch(events)
 				l.publishBatch(events)
+				events = events[:0]
 				return
 				return
 			}
 			}
 			if eventBufferTimestamp == 0 {
 			if eventBufferTimestamp == 0 {
@@ -400,17 +403,13 @@ func (l *logStream) collectBatch() {
 			}
 			}
 			unprocessedLine := msg.Line
 			unprocessedLine := msg.Line
 			if l.multilinePattern != nil {
 			if l.multilinePattern != nil {
-				if l.multilinePattern.Match(unprocessedLine) {
-					// This is a new log event so flush the current eventBuffer to events
+				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
+					// This is a new log event or we will exceed max bytes per event
+					// so flush the current eventBuffer to events and reset timestamp
 					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
 					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 					eventBuffer = eventBuffer[:0]
 					eventBuffer = eventBuffer[:0]
 				}
 				}
-				// If we will exceed max bytes per event flush the current event buffer before appending
-				if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
-					eventBuffer = eventBuffer[:0]
-				}
 				// Append new line
 				// Append new line
 				processedLine := append(unprocessedLine, "\n"...)
 				processedLine := append(unprocessedLine, "\n"...)
 				eventBuffer = append(eventBuffer, processedLine...)
 				eventBuffer = append(eventBuffer, processedLine...)

+ 15 - 1
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -641,7 +641,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
 	})
 	})
 
 
 	// Fire ticker batchPublishFrequency seconds later
 	// Fire ticker batchPublishFrequency seconds later
-	ticks <- time.Now().Add(batchPublishFrequency * time.Second)
+	ticks <- time.Now().Add(batchPublishFrequency + time.Second)
 
 
 	// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
 	// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
 	argument := <-mockClient.putLogEventsArgument
 	argument := <-mockClient.putLogEventsArgument
@@ -649,6 +649,20 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
 	assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
 	assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
 	assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
 	assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
 
 
+	// Log an event 1 second later
+	stream.Log(&logger.Message{
+		Line:      []byte(logline),
+		Timestamp: time.Now().Add(time.Second),
+	})
+
+	// Fire ticker another batchPublishFrequency seconds later
+	ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
+
+	// Verify the event buffer is truly flushed - we should only receive a single event
+	argument = <-mockClient.putLogEventsArgument
+	assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
+	assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
+	assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
 	stream.Close()
 	stream.Close()
 }
 }