Просмотр исходного кода

Handle long log messages correctly on SizedLogger

Loggers that implement BufSize() (e.g. awslogs) uses the method to
tell Copier about the maximum log line length. However loggerWithCache
and RingBuffer hide the method by wrapping loggers.

As a result, Copier uses its default 16KB limit which breaks log
lines > 16kB even the destinations can handle that.

This change implements BufSize() on loggerWithCache and RingBuffer to
make sure these logger wrappes don't hide the method on the underlying
loggers.

Fixes #41794.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
Kazuyoshi Kato 4 лет назад
Родитель
Сommit
bb11365e96

+ 1 - 0
daemon/logger/awslogs/cloudwatchlogs.go

@@ -400,6 +400,7 @@ func (l *logStream) Name() string {
 	return name
 }
 
+// BufSize returns the maximum bytes CloudWatch can handle.
 func (l *logStream) BufSize() int {
 	return maximumBytesPerEvent
 }

+ 6 - 1
daemon/logger/copier.go

@@ -54,7 +54,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 
 	bufSize := defaultBufSize
 	if sizedLogger, ok := c.dst.(SizedLogger); ok {
-		bufSize = sizedLogger.BufSize()
+		size := sizedLogger.BufSize()
+		// Loggers that wrap another loggers would have BufSize(), but cannot return the size
+		// when the wrapped loggers doesn't have BufSize().
+		if size > 0 {
+			bufSize = size
+		}
 	}
 	buf := make([]byte, bufSize)
 

+ 23 - 3
daemon/logger/copier_test.go

@@ -223,10 +223,28 @@ func TestCopierSlow(t *testing.T) {
 }
 
 func TestCopierWithSized(t *testing.T) {
+	t.Run("as is", func(t *testing.T) {
+		testCopierWithSized(t, func(l SizedLogger) SizedLogger {
+			return l
+		})
+	})
+	t.Run("With RingLogger", func(t *testing.T) {
+		testCopierWithSized(t, func(l SizedLogger) SizedLogger {
+			return newRingLogger(l, Info{}, defaultRingMaxSize)
+		})
+	})
+}
+
+func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) {
 	var jsonBuf bytes.Buffer
 	expectedMsgs := 2
-	sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
-	logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
+	sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)})
+
+	size := sizedLogger.BufSize()
+	if size < 0 {
+		size = 100
+	}
+	logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs))
 	c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
 
 	c.Run()
@@ -234,6 +252,8 @@ func TestCopierWithSized(t *testing.T) {
 	c.Wait()
 	c.Close()
 
+	sizedLogger.Close()
+
 	recvdMsgs := 0
 	dec := json.NewDecoder(&jsonBuf)
 	for {
@@ -253,7 +273,7 @@ func TestCopierWithSized(t *testing.T) {
 		recvdMsgs++
 	}
 	if recvdMsgs != expectedMsgs {
-		t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
+		t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String())
 	}
 }
 

+ 11 - 0
daemon/logger/loggerutils/cache/local_cache.go

@@ -58,6 +58,17 @@ type loggerWithCache struct {
 	cache logger.Logger
 }
 
+var _ logger.SizedLogger = &loggerWithCache{}
+
+// BufSize returns the buffer size of the underlying logger.
+// Returns -1 if the logger doesn't match SizedLogger interface.
+func (l *loggerWithCache) BufSize() int {
+	if sl, ok := l.l.(logger.SizedLogger); ok {
+		return sl.BufSize()
+	}
+	return -1
+}
+
 func (l *loggerWithCache) Log(msg *logger.Message) error {
 	// copy the message as the original will be reset once the call to `Log` is complete
 	dup := logger.NewMessage()

+ 11 - 0
daemon/logger/ring.go

@@ -21,6 +21,8 @@ type RingLogger struct {
 	closeFlag int32
 }
 
+var _ SizedLogger = &RingLogger{}
+
 type ringWithReader struct {
 	*RingLogger
 }
@@ -57,6 +59,15 @@ func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
 	return l
 }
 
+// BufSize returns the buffer size of the underlying logger.
+// Returns -1 if the logger doesn't match SizedLogger interface.
+func (r *RingLogger) BufSize() int {
+	if sl, ok := r.l.(SizedLogger); ok {
+		return sl.BufSize()
+	}
+	return -1
+}
+
 // Log queues messages into the ring buffer
 func (r *RingLogger) Log(msg *Message) error {
 	if r.closed() {