diff --git a/api/types/backend/backend.go b/api/types/backend/backend.go index 9880c632bd..29d92026c4 100644 --- a/api/types/backend/backend.go +++ b/api/types/backend/backend.go @@ -38,8 +38,6 @@ type PartialLogMetaData struct { // LogMessage is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. -// changes to this struct need to be reflect in the reset method in -// daemon/logger/logger.go type LogMessage struct { Line []byte Source string diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 6308270b20..15899e07ed 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -49,20 +49,12 @@ func PutMessage(msg *Message) { // Message is subtyped from backend.LogMessage because there is a lot of // internal complexity around the Message type that should not be exposed // to any package not explicitly importing the logger type. -// -// Any changes made to this struct must also be updated in the `reset` function type Message backend.LogMessage // reset sets the message back to default values // This is used when putting a message back into the message pool. -// Any changes to the `Message` struct should be reflected here. func (m *Message) reset() { - m.Line = m.Line[:0] - m.Source = "" - m.Attrs = nil - m.PLogMetaData = nil - - m.Err = nil + *m = Message{Line: m.Line[:0]} } // AsLogMessage returns a pointer to the message as a pointer to diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index d4f3f5e149..9319ce5818 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -156,7 +156,9 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { return errors.Wrap(err, "error marshalling log message") } + ts := msg.Timestamp logger.PutMessage(msg) + msg = nil // Turn use-after-put bugs into panics. w.mu.Lock() if w.closed { @@ -172,7 +174,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { n, err := w.f.Write(b) if err == nil { w.currentSize += int64(n) - w.lastTimestamp = msg.Timestamp + w.lastTimestamp = ts } w.mu.Unlock() diff --git a/daemon/logger/loggerutils/logfile_race_test.go b/daemon/logger/loggerutils/logfile_race_test.go new file mode 100644 index 0000000000..f8dc3ad63b --- /dev/null +++ b/daemon/logger/loggerutils/logfile_race_test.go @@ -0,0 +1,84 @@ +//go:build race +// +build race + +package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" + +import ( + "context" + "fmt" + "io" + "path/filepath" + "testing" + "time" + + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/tailfile" + "golang.org/x/sync/errgroup" + "gotest.tools/v3/assert" +) + +func TestConcurrentLogging(t *testing.T) { + const ( + containers = 5 + loggers = 3 // loggers per container + messages = 50 // messages per logger + + capacity = 256 + maxFiles = 3 + compress = true + ) + getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, lines) + } + createDecoder := func(io.Reader) Decoder { + return dummyDecoder{} + } + marshal := func(msg *logger.Message) ([]byte, error) { + return []byte(fmt.Sprintf( + "Line=%q Source=%q Timestamp=%v Attrs=%v PLogMetaData=%#v Err=%v", + msg.Line, msg.Source, msg.Timestamp, msg.Attrs, msg.PLogMetaData, msg.Err, + )), nil + } + g, ctx := errgroup.WithContext(context.Background()) + for ct := 0; ct < containers; ct++ { + ct := ct + dir := t.TempDir() + g.Go(func() (err error) { + logfile, err := NewLogFile(filepath.Join(dir, "log.log"), capacity, maxFiles, compress, marshal, createDecoder, 0644, getTailReader) + if err != nil { + return err + } + defer func() { + if cErr := logfile.Close(); cErr != nil && err == nil { + err = cErr + } + }() + lg, ctx := errgroup.WithContext(ctx) + for ln := 0; ln < loggers; ln++ { + ln := ln + lg.Go(func() error { + for m := 0; m < messages; m++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + msg := logger.NewMessage() + msg.Line = append(msg.Line, fmt.Sprintf("container=%v logger=%v msg=%v", ct, ln, m)...) + msg.Source = "stdout" + msg.Timestamp = time.Now() + msg.Attrs = append(msg.Attrs, backend.LogAttr{Key: "foo", Value: "bar"}) + msg.PLogMetaData = &backend.PartialLogMetaData{ID: fmt.Sprintf("%v %v %v", ct, ln, m), Ordinal: 1, Last: true} + if err := logfile.WriteLogEntry(msg); err != nil { + return err + } + } + return nil + }) + } + return lg.Wait() + }) + } + assert.NilError(t, g.Wait()) +}