Merge pull request #43650 from vvoland/fix-logs-regression

daemon/logger: Share buffers by sync.Pool
This commit is contained in:
Sebastiaan van Stijn 2022-05-28 14:21:15 +02:00 committed by GitHub
commit 467c275b58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 85 deletions

View file

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
@ -21,8 +22,10 @@ const Name = "json-file"
// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
extra json.RawMessage
buffersPool sync.Pool
}
func init() {
@ -86,7 +89,7 @@ func New(info logger.Info) (logger.Logger, error) {
attrs["tag"] = tag
}
var extra []byte
var extra json.RawMessage
if len(attrs) > 0 {
var err error
extra, err = json.Marshal(attrs)
@ -95,30 +98,40 @@ func New(info logger.Info) (logger.Logger, error) {
}
}
buf := bytes.NewBuffer(nil)
marshalFunc := func(msg *logger.Message) ([]byte, error) {
if err := marshalMessage(msg, extra, buf); err != nil {
return nil, err
}
b := buf.Bytes()
buf.Reset()
return b, nil
}
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader)
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
return &JSONFileLogger{
writer: writer,
tag: tag,
writer: writer,
tag: tag,
extra: extra,
buffersPool: makePool(),
}, nil
}
func makePool() sync.Pool {
// Every buffer will have to store the same constant json structure and the message
// len(`{"log":"","stream:"stdout","time":"2000-01-01T00:00:00.000000000Z"}\n`) = 68
// So let's start with a buffer bigger than this
const initialBufSize = 128
return sync.Pool{New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, initialBufSize)) }}
}
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
return l.writer.WriteLogEntry(msg)
defer logger.PutMessage(msg)
buf := l.buffersPool.Get().(*bytes.Buffer)
buf.Reset()
defer l.buffersPool.Put(buf)
if err := marshalMessage(msg, l.extra, buf); err != nil {
return err
}
return l.writer.WriteLogEntry(msg.Timestamp, buf.Bytes())
}
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {

View file

@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"strconv"
"sync"
"time"
"github.com/docker/docker/api/types/backend"
@ -55,7 +56,8 @@ func init() {
}
type driver struct {
logfile *loggerutils.LogFile
logfile *loggerutils.LogFile
buffersPool sync.Pool
}
// New creates a new local logger
@ -92,42 +94,34 @@ func New(info logger.Info) (logger.Logger, error) {
return newDriver(info.LogPath, cfg)
}
func makeMarshaller() func(m *logger.Message) ([]byte, error) {
buf := make([]byte, initialBufSize)
func marshal(m *logger.Message, buffer *[]byte) error {
proto := logdriver.LogEntry{}
md := logdriver.PartialLogEntryMetadata{}
// allocate the partial log entry separately, which allows for easier re-use
proto := &logdriver.LogEntry{}
md := &logdriver.PartialLogEntryMetadata{}
resetProto(&proto)
return func(m *logger.Message) ([]byte, error) {
resetProto(proto)
messageToProto(m, &proto, &md)
protoSize := proto.Size()
writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter)
messageToProto(m, proto, md)
protoSize := proto.Size()
writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter)
if writeLen > len(buf) {
buf = make([]byte, writeLen)
} else {
// shrink the buffer back down
if writeLen <= initialBufSize {
buf = buf[:initialBufSize]
} else {
buf = buf[:writeLen]
}
}
binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
if err != nil {
return nil, errors.Wrap(err, "error marshaling log entry")
}
if n+(encodeBinaryLen*2) != writeLen {
return nil, io.ErrShortWrite
}
binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
return buf[:writeLen], nil
buf := *buffer
if writeLen > cap(buf) {
buf = make([]byte, writeLen)
} else {
buf = buf[:writeLen]
}
*buffer = buf
binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
if err != nil {
return errors.Wrap(err, "error marshaling log entry")
}
if n+(encodeBinaryLen*2) != writeLen {
return io.ErrShortWrite
}
binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
return nil
}
func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
@ -135,12 +129,16 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
return nil, errdefs.InvalidParameter(err)
}
lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader)
lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
return &driver{
logfile: lf,
buffersPool: sync.Pool{New: func() interface{} {
b := make([]byte, initialBufSize)
return &b
}},
}, nil
}
@ -149,7 +147,15 @@ func (d *driver) Name() string {
}
func (d *driver) Log(msg *logger.Message) error {
return d.logfile.WriteLogEntry(msg)
defer logger.PutMessage(msg)
buf := d.buffersPool.Get().(*[]byte)
defer d.buffersPool.Put(buf)
err := marshal(msg, buf)
if err != nil {
return errors.Wrap(err, "error marshalling logger.Message")
}
return d.logfile.WriteLogEntry(msg.Timestamp, *buf)
}
func (d *driver) Close() error {

View file

@ -11,9 +11,9 @@ import (
)
func TestDecode(t *testing.T) {
marshal := makeMarshaller()
buf := make([]byte, 0)
buf, err := marshal(&logger.Message{Line: []byte("hello")})
err := marshal(&logger.Message{Line: []byte("hello")}, &buf)
assert.NilError(t, err)
for i := 0; i < len(buf); i++ {

View file

@ -131,6 +131,3 @@ type Capability struct {
// Determines if a log driver can read back logs
ReadLogs bool
}
// MarshalFunc is a func that marshals a message into an arbitrary format
type MarshalFunc func(*Message) ([]byte, error)

View file

@ -44,7 +44,6 @@ type LogFile struct {
// Log file codec
marshal logger.MarshalFunc
createDecoder MakeDecoderFn
getTailReader GetTailReaderFunc
@ -120,7 +119,7 @@ type readAtCloser interface {
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
// NewLogFile creates new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
@ -149,7 +148,6 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
maxFiles: maxFiles,
compress: compress,
decompress: newSharedTempFileConverter(decompress),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
getTailReader: getTailReader,
@ -158,16 +156,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
b, err := w.marshal(msg)
if err != nil {
return errors.Wrap(err, "error marshalling log message")
}
ts := msg.Timestamp
logger.PutMessage(msg)
msg = nil // Turn use-after-put bugs into panics.
func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error {
select {
case <-w.closed:
return errors.New("cannot write because the output file was closed")
@ -183,12 +172,12 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
}
}
n, err := w.f.Write(b)
n, err := w.f.Write(marshalled)
if err != nil {
return errors.Wrap(err, "error writing log entry")
}
w.pos.size += int64(n)
w.lastTimestamp = ts
w.lastTimestamp = timestamp
// Notify any waiting readers that there is a new log entry to read.
st := <-w.read

View file

@ -120,15 +120,11 @@ func TestCheckCapacityAndRotate(t *testing.T) {
createDecoder := func(io.Reader) Decoder {
return dummyDecoder{}
}
marshal := func(msg *logger.Message) ([]byte, error) {
return msg.Line, nil
}
l, err := NewLogFile(
logPath,
5, // capacity
3, // maxFiles
true, // compress
marshal,
createDecoder,
0600, // perms
getTailReader,
@ -138,14 +134,16 @@ func TestCheckCapacityAndRotate(t *testing.T) {
ls := dirStringer{dir}
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
timestamp := time.Time{}
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
_, err = os.Stat(logPath + ".1")
assert.Assert(t, os.IsNotExist(err), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
@ -154,7 +152,7 @@ func TestCheckCapacityAndRotate(t *testing.T) {
// down the line.
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
l.f.Close()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
assert.NilError(t, os.Remove(logPath+".2.gz"))
})
@ -163,14 +161,14 @@ func TestCheckCapacityAndRotate(t *testing.T) {
lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
defer lw.ConsumerGone()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
// make sure the log reader is primed
waitForMsg(t, lw, 30*time.Second)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls)
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
})
}