diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 8ef82bbc33..ebd65f4526 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" @@ -21,8 +22,10 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - writer *loggerutils.LogFile - tag string // tag values requested by the user to log + writer *loggerutils.LogFile + tag string // tag values requested by the user to log + extra json.RawMessage + buffersPool sync.Pool } func init() { @@ -86,7 +89,7 @@ func New(info logger.Info) (logger.Logger, error) { attrs["tag"] = tag } - var extra []byte + var extra json.RawMessage if len(attrs) > 0 { var err error extra, err = json.Marshal(attrs) @@ -95,30 +98,40 @@ func New(info logger.Info) (logger.Logger, error) { } } - buf := bytes.NewBuffer(nil) - marshalFunc := func(msg *logger.Message) ([]byte, error) { - if err := marshalMessage(msg, extra, buf); err != nil { - return nil, err - } - b := buf.Bytes() - buf.Reset() - return b, nil - } - - writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader) + writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, decodeFunc, 0640, getTailReader) if err != nil { return nil, err } return &JSONFileLogger{ - writer: writer, - tag: tag, + writer: writer, + tag: tag, + extra: extra, + buffersPool: makePool(), }, nil } +func makePool() sync.Pool { + // Every buffer will have to store the same constant json structure and the message + // len(`{"log":"","stream:"stdout","time":"2000-01-01T00:00:00.000000000Z"}\n`) = 68 + // So let's start with a buffer bigger than this + const initialBufSize = 128 + + return sync.Pool{New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, initialBufSize)) }} +} + // Log converts logger.Message to jsonlog.JSONLog and serializes it to file. func (l *JSONFileLogger) Log(msg *logger.Message) error { - return l.writer.WriteLogEntry(msg) + defer logger.PutMessage(msg) + buf := l.buffersPool.Get().(*bytes.Buffer) + buf.Reset() + defer l.buffersPool.Put(buf) + + if err := marshalMessage(msg, l.extra, buf); err != nil { + return err + } + + return l.writer.WriteLogEntry(msg.Timestamp, buf.Bytes()) } func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { diff --git a/daemon/logger/local/local.go b/daemon/logger/local/local.go index fe1ef083f7..01c9f35a56 100644 --- a/daemon/logger/local/local.go +++ b/daemon/logger/local/local.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "io" "strconv" + "sync" "time" "github.com/docker/docker/api/types/backend" @@ -55,7 +56,8 @@ func init() { } type driver struct { - logfile *loggerutils.LogFile + logfile *loggerutils.LogFile + buffersPool sync.Pool } // New creates a new local logger @@ -92,42 +94,34 @@ func New(info logger.Info) (logger.Logger, error) { return newDriver(info.LogPath, cfg) } -func makeMarshaller() func(m *logger.Message) ([]byte, error) { - buf := make([]byte, initialBufSize) +func marshal(m *logger.Message, buffer *[]byte) error { + proto := logdriver.LogEntry{} + md := logdriver.PartialLogEntryMetadata{} - // allocate the partial log entry separately, which allows for easier re-use - proto := &logdriver.LogEntry{} - md := &logdriver.PartialLogEntryMetadata{} + resetProto(&proto) - return func(m *logger.Message) ([]byte, error) { - resetProto(proto) + messageToProto(m, &proto, &md) + protoSize := proto.Size() + writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter) - messageToProto(m, proto, md) - protoSize := proto.Size() - writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter) - - if writeLen > len(buf) { - buf = make([]byte, writeLen) - } else { - // shrink the buffer back down - if writeLen <= initialBufSize { - buf = buf[:initialBufSize] - } else { - buf = buf[:writeLen] - } - } - - binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize)) - n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen]) - if err != nil { - return nil, errors.Wrap(err, "error marshaling log entry") - } - if n+(encodeBinaryLen*2) != writeLen { - return nil, io.ErrShortWrite - } - binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize)) - return buf[:writeLen], nil + buf := *buffer + if writeLen > cap(buf) { + buf = make([]byte, writeLen) + } else { + buf = buf[:writeLen] } + *buffer = buf + + binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize)) + n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen]) + if err != nil { + return errors.Wrap(err, "error marshaling log entry") + } + if n+(encodeBinaryLen*2) != writeLen { + return io.ErrShortWrite + } + binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize)) + return nil } func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { @@ -135,12 +129,16 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { return nil, errdefs.InvalidParameter(err) } - lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader) + lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, decodeFunc, 0640, getTailReader) if err != nil { return nil, err } return &driver{ logfile: lf, + buffersPool: sync.Pool{New: func() interface{} { + b := make([]byte, initialBufSize) + return &b + }}, }, nil } @@ -149,7 +147,15 @@ func (d *driver) Name() string { } func (d *driver) Log(msg *logger.Message) error { - return d.logfile.WriteLogEntry(msg) + defer logger.PutMessage(msg) + buf := d.buffersPool.Get().(*[]byte) + defer d.buffersPool.Put(buf) + + err := marshal(msg, buf) + if err != nil { + return errors.Wrap(err, "error marshalling logger.Message") + } + return d.logfile.WriteLogEntry(msg.Timestamp, *buf) } func (d *driver) Close() error { diff --git a/daemon/logger/local/read_test.go b/daemon/logger/local/read_test.go index 4dd5f2043a..23dac2ac68 100644 --- a/daemon/logger/local/read_test.go +++ b/daemon/logger/local/read_test.go @@ -11,9 +11,9 @@ import ( ) func TestDecode(t *testing.T) { - marshal := makeMarshaller() + buf := make([]byte, 0) - buf, err := marshal(&logger.Message{Line: []byte("hello")}) + err := marshal(&logger.Message{Line: []byte("hello")}, &buf) assert.NilError(t, err) for i := 0; i < len(buf); i++ { diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 05ea1b3af0..480932cd03 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -131,6 +131,3 @@ type Capability struct { // Determines if a log driver can read back logs ReadLogs bool } - -// MarshalFunc is a func that marshals a message into an arbitrary format -type MarshalFunc func(*Message) ([]byte, error) diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 516b1dacec..decd2dfb34 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -44,7 +44,6 @@ type LogFile struct { // Log file codec - marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc @@ -120,7 +119,7 @@ type readAtCloser interface { type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error) // NewLogFile creates new LogFile -func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { +func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) if err != nil { return nil, err @@ -149,7 +148,6 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar maxFiles: maxFiles, compress: compress, decompress: newSharedTempFileConverter(decompress), - marshal: marshaller, createDecoder: decodeFunc, perms: perms, getTailReader: getTailReader, @@ -158,16 +156,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar // WriteLogEntry writes the provided log message to the current log file. // This may trigger a rotation event if the max file/capacity limits are hit. -func (w *LogFile) WriteLogEntry(msg *logger.Message) error { - b, err := w.marshal(msg) - if err != nil { - return errors.Wrap(err, "error marshalling log message") - } - - ts := msg.Timestamp - logger.PutMessage(msg) - msg = nil // Turn use-after-put bugs into panics. - +func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error { select { case <-w.closed: return errors.New("cannot write because the output file was closed") @@ -183,12 +172,12 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { } } - n, err := w.f.Write(b) + n, err := w.f.Write(marshalled) if err != nil { return errors.Wrap(err, "error writing log entry") } w.pos.size += int64(n) - w.lastTimestamp = ts + w.lastTimestamp = timestamp // Notify any waiting readers that there is a new log entry to read. st := <-w.read diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index df3d0a1226..f6edb38692 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -120,15 +120,11 @@ func TestCheckCapacityAndRotate(t *testing.T) { createDecoder := func(io.Reader) Decoder { return dummyDecoder{} } - marshal := func(msg *logger.Message) ([]byte, error) { - return msg.Line, nil - } l, err := NewLogFile( logPath, 5, // capacity 3, // maxFiles true, // compress - marshal, createDecoder, 0600, // perms getTailReader, @@ -138,14 +134,16 @@ func TestCheckCapacityAndRotate(t *testing.T) { ls := dirStringer{dir} - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + timestamp := time.Time{} + + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!"))) _, err = os.Stat(logPath + ".1") assert.Assert(t, os.IsNotExist(err), ls) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!"))) poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!"))) poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) @@ -154,7 +152,7 @@ func TestCheckCapacityAndRotate(t *testing.T) { // down the line. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation. l.f.Close() - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!"))) assert.NilError(t, os.Remove(logPath+".2.gz")) }) @@ -163,14 +161,14 @@ func TestCheckCapacityAndRotate(t *testing.T) { lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}) defer lw.ConsumerGone() - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls) // make sure the log reader is primed waitForMsg(t, lw, 30*time.Second) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls) - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls) + assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls) poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) }) }