Merge pull request #37986 from samuelkarp/moby/moby-37747
awslogs: account for UTF-8 normalization in limits
This commit is contained in:
commit
6611ab1c6f
2 changed files with 233 additions and 13 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
|
@ -46,6 +47,10 @@ const (
|
|||
maximumLogEventsPerPut = 10000
|
||||
|
||||
// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
|
||||
// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
|
||||
// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
|
||||
// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
|
||||
// this replacement happens.
|
||||
maximumBytesPerEvent = 262144 - perEventBytes
|
||||
|
||||
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
|
||||
|
@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) {
|
|||
}
|
||||
line := msg.Line
|
||||
if l.multilinePattern != nil {
|
||||
if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
|
||||
lineEffectiveLen := effectiveLen(string(line))
|
||||
if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
|
||||
// This is a new log event or we will exceed max bytes per event
|
||||
// so flush the current eventBuffer to events and reset timestamp
|
||||
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
||||
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
||||
eventBuffer = eventBuffer[:0]
|
||||
}
|
||||
// Append new line if event is less than max event size
|
||||
if len(line) < maximumBytesPerEvent {
|
||||
// Append newline if event is less than max event size
|
||||
if lineEffectiveLen < maximumBytesPerEvent {
|
||||
line = append(line, "\n"...)
|
||||
}
|
||||
eventBuffer = append(eventBuffer, line...)
|
||||
|
@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) {
|
|||
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
|
||||
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
|
||||
// byte overhead (defined in perEventBytes) which is accounted for in split- and
|
||||
// batch-calculations.
|
||||
func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
|
||||
for len(events) > 0 {
|
||||
// batch-calculations. Because the events are interpreted as UTF-8 encoded
|
||||
// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
|
||||
// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
|
||||
// compensate for that and to avoid splitting valid UTF-8 characters into
|
||||
// invalid byte sequences, we calculate the length of each event assuming that
|
||||
// this replacement happens.
|
||||
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
|
||||
for len(bytes) > 0 {
|
||||
// Split line length so it does not exceed the maximum
|
||||
lineBytes := len(events)
|
||||
if lineBytes > maximumBytesPerEvent {
|
||||
lineBytes = maximumBytesPerEvent
|
||||
}
|
||||
line := events[:lineBytes]
|
||||
|
||||
splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
|
||||
line := bytes[:splitOffset]
|
||||
event := wrappedEvent{
|
||||
inputLogEvent: &cloudwatchlogs.InputLogEvent{
|
||||
Message: aws.String(string(line)),
|
||||
|
@ -544,7 +551,7 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int
|
|||
|
||||
added := batch.add(event, lineBytes)
|
||||
if added {
|
||||
events = events[lineBytes:]
|
||||
bytes = bytes[splitOffset:]
|
||||
} else {
|
||||
l.publishBatch(batch)
|
||||
batch.reset()
|
||||
|
@ -552,6 +559,37 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int
|
|||
}
|
||||
}
|
||||
|
||||
// effectiveLen counts the effective number of bytes in the string, after
|
||||
// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do
|
||||
// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode
|
||||
// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as
|
||||
// utf8.RuneError)
|
||||
func effectiveLen(line string) int {
|
||||
effectiveBytes := 0
|
||||
for _, rune := range line {
|
||||
effectiveBytes += utf8.RuneLen(rune)
|
||||
}
|
||||
return effectiveBytes
|
||||
}
|
||||
|
||||
// findValidSplit finds the byte offset to split a string without breaking valid
|
||||
// Unicode codepoints given a maximum number of total bytes. findValidSplit
|
||||
// returns the byte offset for splitting a string or []byte, as well as the
|
||||
// effective number of bytes if the string were normalized to replace invalid
|
||||
// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8
|
||||
// sequence, represented in Go as utf8.RuneError)
|
||||
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
|
||||
for offset, rune := range line {
|
||||
splitOffset = offset
|
||||
if effectiveBytes+utf8.RuneLen(rune) > maxBytes {
|
||||
return splitOffset, effectiveBytes
|
||||
}
|
||||
effectiveBytes += utf8.RuneLen(rune)
|
||||
}
|
||||
splitOffset = len(line)
|
||||
return
|
||||
}
|
||||
|
||||
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
|
||||
// accounting for sequencing requirements (each request must reference the
|
||||
// sequence token returned by the previous request).
|
||||
|
|
|
@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEffectiveLen(t *testing.T) {
|
||||
tests := []struct {
|
||||
str string
|
||||
effectiveBytes int
|
||||
}{
|
||||
{"Hello", 5},
|
||||
{string([]byte{1, 2, 3, 4}), 4},
|
||||
{"🙃", 4},
|
||||
{string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12},
|
||||
{"He\xff\xffo", 9},
|
||||
{"", 0},
|
||||
}
|
||||
for i, tc := range tests {
|
||||
t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
|
||||
assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindValidSplit(t *testing.T) {
|
||||
tests := []struct {
|
||||
str string
|
||||
maxEffectiveBytes int
|
||||
splitOffset int
|
||||
effectiveBytes int
|
||||
}{
|
||||
{"", 10, 0, 0},
|
||||
{"Hello", 6, 5, 5},
|
||||
{"Hello", 2, 2, 2},
|
||||
{"Hello", 0, 0, 0},
|
||||
{"🙃", 3, 0, 0},
|
||||
{"🙃", 4, 4, 4},
|
||||
{string([]byte{'a', 0xFF}), 2, 1, 1},
|
||||
{string([]byte{'a', 0xFF}), 4, 2, 4},
|
||||
}
|
||||
for i, tc := range tests {
|
||||
t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
|
||||
splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes)
|
||||
assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset")
|
||||
assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes")
|
||||
t.Log(tc.str[:tc.splitOffset])
|
||||
t.Log(tc.str[tc.splitOffset:])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessEventEmoji(t *testing.T) {
|
||||
stream := &logStream{}
|
||||
batch := &eventBatch{}
|
||||
bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
|
||||
stream.processEvent(batch, bytes, 0)
|
||||
assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
|
||||
assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message))
|
||||
assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message))
|
||||
}
|
||||
|
||||
func TestCollectBatchLineSplit(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
|
@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchLineSplitWithBinary(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
d := make(chan bool)
|
||||
close(d)
|
||||
go stream.collectBatch(d)
|
||||
|
||||
longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(longline + "\xFD"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
// no ticks
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
if len(argument.LogEvents) != 2 {
|
||||
t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
|
||||
}
|
||||
if *argument.LogEvents[0].Message != longline {
|
||||
t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
|
||||
}
|
||||
if *argument.LogEvents[1].Message != "\xFD" {
|
||||
t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchMaxEvents(t *testing.T) {
|
||||
mockClient := newMockClientBuffered(1)
|
||||
stream := &logStream{
|
||||
|
@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
|
||||
expectedPuts := 2
|
||||
mockClient := newMockClientBuffered(expectedPuts)
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
sequenceToken: aws.String(sequenceToken),
|
||||
messages: make(chan *logger.Message),
|
||||
}
|
||||
for i := 0; i < expectedPuts; i++ {
|
||||
mockClient.putLogEventsResult <- &putLogEventsResult{
|
||||
successResult: &cloudwatchlogs.PutLogEventsOutput{
|
||||
NextSequenceToken: aws.String(nextSequenceToken),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var ticks = make(chan time.Time)
|
||||
newTicker = func(_ time.Duration) *time.Ticker {
|
||||
return &time.Ticker{
|
||||
C: ticks,
|
||||
}
|
||||
}
|
||||
|
||||
d := make(chan bool)
|
||||
close(d)
|
||||
go stream.collectBatch(d)
|
||||
|
||||
// maxline is the maximum line that could be submitted after
|
||||
// accounting for its overhead.
|
||||
maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
|
||||
// This will be split and batched up to the `maximumBytesPerPut'
|
||||
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
|
||||
// should also tolerate an offset within that range.
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte(maxline),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
stream.Log(&logger.Message{
|
||||
Line: []byte("B"),
|
||||
Timestamp: time.Time{},
|
||||
})
|
||||
|
||||
// no ticks, guarantee batch by size (and chan close)
|
||||
stream.Close()
|
||||
|
||||
argument := <-mockClient.putLogEventsArgument
|
||||
if argument == nil {
|
||||
t.Fatal("Expected non-nil PutLogEventsInput")
|
||||
}
|
||||
|
||||
// Should total to the maximum allowed bytes.
|
||||
eventBytes := 0
|
||||
for _, event := range argument.LogEvents {
|
||||
eventBytes += effectiveLen(*event.Message)
|
||||
}
|
||||
eventsOverhead := len(argument.LogEvents) * perEventBytes
|
||||
payloadTotal := eventBytes + eventsOverhead
|
||||
// lowestMaxBatch allows the payload to be offset if the messages
|
||||
// don't lend themselves to align with the maximum event size.
|
||||
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
|
||||
|
||||
if payloadTotal > maximumBytesPerPut {
|
||||
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
|
||||
}
|
||||
if payloadTotal < lowestMaxBatch {
|
||||
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
|
||||
}
|
||||
|
||||
argument = <-mockClient.putLogEventsArgument
|
||||
message := *argument.LogEvents[len(argument.LogEvents)-1].Message
|
||||
if message[len(message)-1:] != "B" {
|
||||
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
|
|
Loading…
Add table
Reference in a new issue