Merge pull request #43306 from corhere/logfile-data-race
daemon/logger: fix data race in LogFile
This commit is contained in:
commit
7025029b98
4 changed files with 88 additions and 12 deletions
|
@ -38,8 +38,6 @@ type PartialLogMetaData struct {
|
|||
// LogMessage is datastructure that represents piece of output produced by some
|
||||
// container. The Line member is a slice of an array whose contents can be
|
||||
// changed after a log driver's Log() method returns.
|
||||
// changes to this struct need to be reflect in the reset method in
|
||||
// daemon/logger/logger.go
|
||||
type LogMessage struct {
|
||||
Line []byte
|
||||
Source string
|
||||
|
|
|
@ -49,20 +49,12 @@ func PutMessage(msg *Message) {
|
|||
// Message is subtyped from backend.LogMessage because there is a lot of
|
||||
// internal complexity around the Message type that should not be exposed
|
||||
// to any package not explicitly importing the logger type.
|
||||
//
|
||||
// Any changes made to this struct must also be updated in the `reset` function
|
||||
type Message backend.LogMessage
|
||||
|
||||
// reset sets the message back to default values
|
||||
// This is used when putting a message back into the message pool.
|
||||
// Any changes to the `Message` struct should be reflected here.
|
||||
func (m *Message) reset() {
|
||||
m.Line = m.Line[:0]
|
||||
m.Source = ""
|
||||
m.Attrs = nil
|
||||
m.PLogMetaData = nil
|
||||
|
||||
m.Err = nil
|
||||
*m = Message{Line: m.Line[:0]}
|
||||
}
|
||||
|
||||
// AsLogMessage returns a pointer to the message as a pointer to
|
||||
|
|
|
@ -156,7 +156,9 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
|||
return errors.Wrap(err, "error marshalling log message")
|
||||
}
|
||||
|
||||
ts := msg.Timestamp
|
||||
logger.PutMessage(msg)
|
||||
msg = nil // Turn use-after-put bugs into panics.
|
||||
|
||||
w.mu.Lock()
|
||||
if w.closed {
|
||||
|
@ -172,7 +174,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
|||
n, err := w.f.Write(b)
|
||||
if err == nil {
|
||||
w.currentSize += int64(n)
|
||||
w.lastTimestamp = msg.Timestamp
|
||||
w.lastTimestamp = ts
|
||||
}
|
||||
|
||||
w.mu.Unlock()
|
||||
|
|
84
daemon/logger/loggerutils/logfile_race_test.go
Normal file
84
daemon/logger/loggerutils/logfile_race_test.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
//go:build race
|
||||
// +build race
|
||||
|
||||
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/pkg/tailfile"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
func TestConcurrentLogging(t *testing.T) {
|
||||
const (
|
||||
containers = 5
|
||||
loggers = 3 // loggers per container
|
||||
messages = 50 // messages per logger
|
||||
|
||||
capacity = 256
|
||||
maxFiles = 3
|
||||
compress = true
|
||||
)
|
||||
getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
||||
return tailfile.NewTailReader(ctx, r, lines)
|
||||
}
|
||||
createDecoder := func(io.Reader) Decoder {
|
||||
return dummyDecoder{}
|
||||
}
|
||||
marshal := func(msg *logger.Message) ([]byte, error) {
|
||||
return []byte(fmt.Sprintf(
|
||||
"Line=%q Source=%q Timestamp=%v Attrs=%v PLogMetaData=%#v Err=%v",
|
||||
msg.Line, msg.Source, msg.Timestamp, msg.Attrs, msg.PLogMetaData, msg.Err,
|
||||
)), nil
|
||||
}
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
for ct := 0; ct < containers; ct++ {
|
||||
ct := ct
|
||||
dir := t.TempDir()
|
||||
g.Go(func() (err error) {
|
||||
logfile, err := NewLogFile(filepath.Join(dir, "log.log"), capacity, maxFiles, compress, marshal, createDecoder, 0644, getTailReader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if cErr := logfile.Close(); cErr != nil && err == nil {
|
||||
err = cErr
|
||||
}
|
||||
}()
|
||||
lg, ctx := errgroup.WithContext(ctx)
|
||||
for ln := 0; ln < loggers; ln++ {
|
||||
ln := ln
|
||||
lg.Go(func() error {
|
||||
for m := 0; m < messages; m++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
msg := logger.NewMessage()
|
||||
msg.Line = append(msg.Line, fmt.Sprintf("container=%v logger=%v msg=%v", ct, ln, m)...)
|
||||
msg.Source = "stdout"
|
||||
msg.Timestamp = time.Now()
|
||||
msg.Attrs = append(msg.Attrs, backend.LogAttr{Key: "foo", Value: "bar"})
|
||||
msg.PLogMetaData = &backend.PartialLogMetaData{ID: fmt.Sprintf("%v %v %v", ct, ln, m), Ordinal: 1, Last: true}
|
||||
if err := logfile.WriteLogEntry(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return lg.Wait()
|
||||
})
|
||||
}
|
||||
assert.NilError(t, g.Wait())
|
||||
}
|
Loading…
Add table
Reference in a new issue