Selaa lähdekoodia

awslogs: Record log line insert order for sorting

Fixes https://github.com/docker/docker/issues/24775

Signed-off-by: Samuel Karp <skarp@amazon.com>
Samuel Karp 9 vuotta sitten
vanhempi
commit
443f251cf5

+ 33 - 12
daemon/logger/awslogs/cloudwatchlogs.go

@@ -64,7 +64,11 @@ type regionFinder interface {
 	Region() (string, error)
 }
 
-type byTimestamp []*cloudwatchlogs.InputLogEvent
+type wrappedEvent struct {
+	inputLogEvent *cloudwatchlogs.InputLogEvent
+	insertOrder   int
+}
+type byTimestamp []wrappedEvent
 
 // init registers the awslogs driver
 func init() {
@@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // calculations.
 func (l *logStream) collectBatch() {
 	timer := newTicker(batchPublishFrequency)
-	var events []*cloudwatchlogs.InputLogEvent
+	var events []wrappedEvent
 	bytes := 0
 	for {
 		select {
@@ -258,9 +262,12 @@ func (l *logStream) collectBatch() {
 					events = events[:0]
 					bytes = 0
 				}
-				events = append(events, &cloudwatchlogs.InputLogEvent{
-					Message:   aws.String(string(line)),
-					Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
+				events = append(events, wrappedEvent{
+					inputLogEvent: &cloudwatchlogs.InputLogEvent{
+						Message:   aws.String(string(line)),
+						Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
+					},
+					insertOrder: len(events),
 				})
 				bytes += (lineBytes + perEventBytes)
 			}
@@ -271,14 +278,17 @@ func (l *logStream) collectBatch() {
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
 // accounting for sequencing requirements (each request must reference the
 // sequence token returned by the previous request).
-func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
+func (l *logStream) publishBatch(events []wrappedEvent) {
 	if len(events) == 0 {
 		return
 	}
 
+	// events in a batch must be sorted by timestamp
+	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
 	sort.Sort(byTimestamp(events))
+	cwEvents := unwrapEvents(events)
 
-	nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
+	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
 
 	if err != nil {
 		if awsErr, ok := err.(awserr.Error); ok {
@@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
 				// sequence code is bad, grab the correct one and retry
 				parts := strings.Split(awsErr.Message(), " ")
 				token := parts[len(parts)-1]
-				nextSequenceToken, err = l.putLogEvents(events, &token)
+				nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
 			}
 		}
 	}
@@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int {
 // required by the sort.Interface interface.
 func (slice byTimestamp) Less(i, j int) bool {
 	iTimestamp, jTimestamp := int64(0), int64(0)
-	if slice != nil && slice[i].Timestamp != nil {
-		iTimestamp = *slice[i].Timestamp
+	if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
+		iTimestamp = *slice[i].inputLogEvent.Timestamp
+	}
+	if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
+		jTimestamp = *slice[j].inputLogEvent.Timestamp
 	}
-	if slice != nil && slice[j].Timestamp != nil {
-		jTimestamp = *slice[j].Timestamp
+	if iTimestamp == jTimestamp {
+		return slice[i].insertOrder < slice[j].insertOrder
 	}
 	return iTimestamp < jTimestamp
 }
@@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool {
 func (slice byTimestamp) Swap(i, j int) {
 	slice[i], slice[j] = slice[j], slice[i]
 }
+
+func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
+	cwEvents := []*cloudwatchlogs.InputLogEvent{}
+	for _, input := range events {
+		cwEvents = append(cwEvents, input.inputLogEvent)
+	}
+	return cwEvents
+}

+ 20 - 13
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -150,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) {
 			NextSequenceToken: aws.String(nextSequenceToken),
 		},
 	}
-
-	events := []*cloudwatchlogs.InputLogEvent{
+	events := []wrappedEvent{
 		{
-			Message: aws.String(logline),
+			inputLogEvent: &cloudwatchlogs.InputLogEvent{
+				Message: aws.String(logline),
+			},
 		},
 	}
 
@@ -177,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) {
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
 	}
-	if argument.LogEvents[0] != events[0] {
+	if argument.LogEvents[0] != events[0].inputLogEvent {
 		t.Error("Expected event to equal input")
 	}
 }
@@ -194,9 +195,11 @@ func TestPublishBatchError(t *testing.T) {
 		errorResult: errors.New("Error!"),
 	}
 
-	events := []*cloudwatchlogs.InputLogEvent{
+	events := []wrappedEvent{
 		{
-			Message: aws.String(logline),
+			inputLogEvent: &cloudwatchlogs.InputLogEvent{
+				Message: aws.String(logline),
+			},
 		},
 	}
 
@@ -226,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
 		},
 	}
 
-	events := []*cloudwatchlogs.InputLogEvent{
+	events := []wrappedEvent{
 		{
-			Message: aws.String(logline),
+			inputLogEvent: &cloudwatchlogs.InputLogEvent{
+				Message: aws.String(logline),
+			},
 		},
 	}
 
@@ -253,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
 	}
-	if argument.LogEvents[0] != events[0] {
+	if argument.LogEvents[0] != events[0].inputLogEvent {
 		t.Error("Expected event to equal input")
 	}
 
@@ -270,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
 	}
-	if argument.LogEvents[0] != events[0] {
+	if argument.LogEvents[0] != events[0].inputLogEvent {
 		t.Error("Expected event to equal input")
 	}
 }
@@ -287,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
 		errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
 	}
 
-	events := []*cloudwatchlogs.InputLogEvent{
+	events := []wrappedEvent{
 		{
-			Message: aws.String(logline),
+			inputLogEvent: &cloudwatchlogs.InputLogEvent{
+				Message: aws.String(logline),
+			},
 		},
 	}
 
@@ -314,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
 	}
-	if argument.LogEvents[0] != events[0] {
+	if argument.LogEvents[0] != events[0].inputLogEvent {
 		t.Error("Expected event to equal input")
 	}
 }