Prechádzať zdrojové kódy

Merge pull request #41975 from thaJeztah/20.10_backport_41794_sized_logger

[20.10 backport] Handle long log messages correctly on SizedLogger
Brian Goff 4 rokov pred
rodič
commit
24e1d7fa59

+ 1 - 0
daemon/logger/awslogs/cloudwatchlogs.go

@@ -400,6 +400,7 @@ func (l *logStream) Name() string {
 	return name
 }
 
+// BufSize returns the maximum bytes CloudWatch can handle.
 func (l *logStream) BufSize() int {
 	return maximumBytesPerEvent
 }

+ 6 - 1
daemon/logger/copier.go

@@ -54,7 +54,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 
 	bufSize := defaultBufSize
 	if sizedLogger, ok := c.dst.(SizedLogger); ok {
-		bufSize = sizedLogger.BufSize()
+		size := sizedLogger.BufSize()
+		// Loggers that wrap another loggers would have BufSize(), but cannot return the size
+		// when the wrapped loggers doesn't have BufSize().
+		if size > 0 {
+			bufSize = size
+		}
 	}
 	buf := make([]byte, bufSize)
 

+ 23 - 3
daemon/logger/copier_test.go

@@ -223,10 +223,28 @@ func TestCopierSlow(t *testing.T) {
 }
 
 func TestCopierWithSized(t *testing.T) {
+	t.Run("as is", func(t *testing.T) {
+		testCopierWithSized(t, func(l SizedLogger) SizedLogger {
+			return l
+		})
+	})
+	t.Run("With RingLogger", func(t *testing.T) {
+		testCopierWithSized(t, func(l SizedLogger) SizedLogger {
+			return newRingLogger(l, Info{}, defaultRingMaxSize)
+		})
+	})
+}
+
+func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) {
 	var jsonBuf bytes.Buffer
 	expectedMsgs := 2
-	sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
-	logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
+	sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)})
+
+	size := sizedLogger.BufSize()
+	if size < 0 {
+		size = 100
+	}
+	logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs))
 	c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
 
 	c.Run()
@@ -234,6 +252,8 @@ func TestCopierWithSized(t *testing.T) {
 	c.Wait()
 	c.Close()
 
+	sizedLogger.Close()
+
 	recvdMsgs := 0
 	dec := json.NewDecoder(&jsonBuf)
 	for {
@@ -253,7 +273,7 @@ func TestCopierWithSized(t *testing.T) {
 		recvdMsgs++
 	}
 	if recvdMsgs != expectedMsgs {
-		t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
+		t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String())
 	}
 }
 

+ 11 - 0
daemon/logger/loggerutils/cache/local_cache.go

@@ -58,6 +58,17 @@ type loggerWithCache struct {
 	cache logger.Logger
 }
 
+var _ logger.SizedLogger = &loggerWithCache{}
+
+// BufSize returns the buffer size of the underlying logger.
+// Returns -1 if the logger doesn't match SizedLogger interface.
+func (l *loggerWithCache) BufSize() int {
+	if sl, ok := l.l.(logger.SizedLogger); ok {
+		return sl.BufSize()
+	}
+	return -1
+}
+
 func (l *loggerWithCache) Log(msg *logger.Message) error {
 	// copy the message as the original will be reset once the call to `Log` is complete
 	dup := logger.NewMessage()

+ 11 - 0
daemon/logger/ring.go

@@ -21,6 +21,8 @@ type RingLogger struct {
 	closeFlag int32
 }
 
+var _ SizedLogger = &RingLogger{}
+
 type ringWithReader struct {
 	*RingLogger
 }
@@ -57,6 +59,15 @@ func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
 	return l
 }
 
+// BufSize returns the buffer size of the underlying logger.
+// Returns -1 if the logger doesn't match SizedLogger interface.
+func (r *RingLogger) BufSize() int {
+	if sl, ok := r.l.(SizedLogger); ok {
+		return sl.BufSize()
+	}
+	return -1
+}
+
 // Log queues messages into the ring buffer
 func (r *RingLogger) Log(msg *Message) error {
 	if r.closed() {