Handle long log messages correctly on SizedLogger

Loggers that implement BufSize() (e.g. awslogs) uses the method to
tell Copier about the maximum log line length. However loggerWithCache
and RingBuffer hide the method by wrapping loggers.

As a result, Copier uses its default 16KB limit which breaks log
lines > 16kB even the destinations can handle that.

This change implements BufSize() on loggerWithCache and RingBuffer to
make sure these logger wrappes don't hide the method on the underlying
loggers.

Fixes #41794.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
(cherry picked from commit bb11365e96)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Kazuyoshi Kato 2021-01-20 13:18:41 -08:00 committed by Sebastiaan van Stijn
parent fae366b323
commit d13e162a63
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
5 changed files with 52 additions and 4 deletions

View file

@ -400,6 +400,7 @@ func (l *logStream) Name() string {
return name return name
} }
// BufSize returns the maximum bytes CloudWatch can handle.
func (l *logStream) BufSize() int { func (l *logStream) BufSize() int {
return maximumBytesPerEvent return maximumBytesPerEvent
} }

View file

@ -54,7 +54,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
bufSize := defaultBufSize bufSize := defaultBufSize
if sizedLogger, ok := c.dst.(SizedLogger); ok { if sizedLogger, ok := c.dst.(SizedLogger); ok {
bufSize = sizedLogger.BufSize() size := sizedLogger.BufSize()
// Loggers that wrap another loggers would have BufSize(), but cannot return the size
// when the wrapped loggers doesn't have BufSize().
if size > 0 {
bufSize = size
}
} }
buf := make([]byte, bufSize) buf := make([]byte, bufSize)

View file

@ -223,10 +223,28 @@ func TestCopierSlow(t *testing.T) {
} }
func TestCopierWithSized(t *testing.T) { func TestCopierWithSized(t *testing.T) {
t.Run("as is", func(t *testing.T) {
testCopierWithSized(t, func(l SizedLogger) SizedLogger {
return l
})
})
t.Run("With RingLogger", func(t *testing.T) {
testCopierWithSized(t, func(l SizedLogger) SizedLogger {
return newRingLogger(l, Info{}, defaultRingMaxSize)
})
})
}
func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) {
var jsonBuf bytes.Buffer var jsonBuf bytes.Buffer
expectedMsgs := 2 expectedMsgs := 2
sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)})
logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
size := sizedLogger.BufSize()
if size < 0 {
size = 100
}
logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs))
c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger) c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
c.Run() c.Run()
@ -234,6 +252,8 @@ func TestCopierWithSized(t *testing.T) {
c.Wait() c.Wait()
c.Close() c.Close()
sizedLogger.Close()
recvdMsgs := 0 recvdMsgs := 0
dec := json.NewDecoder(&jsonBuf) dec := json.NewDecoder(&jsonBuf)
for { for {
@ -253,7 +273,7 @@ func TestCopierWithSized(t *testing.T) {
recvdMsgs++ recvdMsgs++
} }
if recvdMsgs != expectedMsgs { if recvdMsgs != expectedMsgs {
t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs) t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String())
} }
} }

View file

@ -58,6 +58,17 @@ type loggerWithCache struct {
cache logger.Logger cache logger.Logger
} }
var _ logger.SizedLogger = &loggerWithCache{}
// BufSize returns the buffer size of the underlying logger.
// Returns -1 if the logger doesn't match SizedLogger interface.
func (l *loggerWithCache) BufSize() int {
if sl, ok := l.l.(logger.SizedLogger); ok {
return sl.BufSize()
}
return -1
}
func (l *loggerWithCache) Log(msg *logger.Message) error { func (l *loggerWithCache) Log(msg *logger.Message) error {
// copy the message as the original will be reset once the call to `Log` is complete // copy the message as the original will be reset once the call to `Log` is complete
dup := logger.NewMessage() dup := logger.NewMessage()

View file

@ -21,6 +21,8 @@ type RingLogger struct {
closeFlag int32 closeFlag int32
} }
var _ SizedLogger = &RingLogger{}
type ringWithReader struct { type ringWithReader struct {
*RingLogger *RingLogger
} }
@ -57,6 +59,15 @@ func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
return l return l
} }
// BufSize returns the buffer size of the underlying logger.
// Returns -1 if the logger doesn't match SizedLogger interface.
func (r *RingLogger) BufSize() int {
if sl, ok := r.l.(SizedLogger); ok {
return sl.BufSize()
}
return -1
}
// Log queues messages into the ring buffer // Log queues messages into the ring buffer
func (r *RingLogger) Log(msg *Message) error { func (r *RingLogger) Log(msg *Message) error {
if r.closed() { if r.closed() {