Merge pull request #34020 from cpuguy83/17.03_fix_log_reader
[17.03] Fix log readers can block writes indefinitely
This commit is contained in:
commit
c1edccdf5d
2 changed files with 66 additions and 36 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
"github.com/docker/docker/daemon/logger/loggerutils"
|
||||
"github.com/docker/docker/pkg/jsonlog"
|
||||
"github.com/docker/go-units"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Name is the name of the file that the jsonlogger logs to.
|
||||
|
@ -22,11 +24,13 @@ const Name = "json-file"
|
|||
|
||||
// JSONFileLogger is Logger implementation for default Docker logging.
|
||||
type JSONFileLogger struct {
|
||||
buf *bytes.Buffer
|
||||
extra []byte // json-encoded extra attributes
|
||||
|
||||
mu sync.RWMutex
|
||||
buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
|
||||
closed bool
|
||||
writer *loggerutils.RotateFileWriter
|
||||
mu sync.Mutex
|
||||
readers map[*logger.LogWatcher]struct{} // stores the active log followers
|
||||
extra []byte // json-encoded extra attributes
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -85,32 +89,43 @@ func New(ctx logger.Context) (logger.Logger, error) {
|
|||
|
||||
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
|
||||
func (l *JSONFileLogger) Log(msg *logger.Message) error {
|
||||
l.mu.Lock()
|
||||
err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
|
||||
l.buf.Reset()
|
||||
l.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
|
||||
if err := marshalMessage(m, extra, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(buf.Bytes()); err != nil {
|
||||
return errors.Wrap(err, "error writing log entry")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
|
||||
timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.mu.Lock()
|
||||
logline := msg.Line
|
||||
logLine := msg.Line
|
||||
if !msg.Partial {
|
||||
logline = append(msg.Line, '\n')
|
||||
logLine = append(msg.Line, '\n')
|
||||
}
|
||||
err = (&jsonlog.JSONLogs{
|
||||
Log: logline,
|
||||
Log: logLine,
|
||||
Stream: msg.Source,
|
||||
Created: timestamp,
|
||||
RawAttrs: l.extra,
|
||||
}).MarshalJSONBuf(l.buf)
|
||||
RawAttrs: extra,
|
||||
}).MarshalJSONBuf(buf)
|
||||
if err != nil {
|
||||
l.mu.Unlock()
|
||||
return err
|
||||
return errors.Wrap(err, "error writing log message to buffer")
|
||||
}
|
||||
|
||||
l.buf.WriteByte('\n')
|
||||
_, err = l.writer.Write(l.buf.Bytes())
|
||||
l.buf.Reset()
|
||||
l.mu.Unlock()
|
||||
|
||||
return err
|
||||
err = buf.WriteByte('\n')
|
||||
return errors.Wrap(err, "error finalizing log buffer")
|
||||
}
|
||||
|
||||
// ValidateLogOpt looks for json specific log options max-file & max-size.
|
||||
|
|
|
@ -3,7 +3,6 @@ package jsonfilelog
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
@ -18,6 +17,7 @@ import (
|
|||
"github.com/docker/docker/pkg/ioutils"
|
||||
"github.com/docker/docker/pkg/jsonlog"
|
||||
"github.com/docker/docker/pkg/tailfile"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const maxJSONDecodeRetry = 20000
|
||||
|
@ -48,10 +48,11 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|||
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||
defer close(logWatcher.Msg)
|
||||
|
||||
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
|
||||
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
|
||||
// This will block writes!!!
|
||||
l.mu.Lock()
|
||||
l.mu.RLock()
|
||||
|
||||
// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
|
||||
pth := l.writer.LogPath()
|
||||
var files []io.ReadSeeker
|
||||
for i := l.writer.MaxFiles(); i > 1; i-- {
|
||||
|
@ -59,25 +60,36 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
logWatcher.Err <- err
|
||||
break
|
||||
l.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
latestFile, err := os.Open(pth)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
l.mu.Unlock()
|
||||
logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
|
||||
l.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
defer latestFile.Close()
|
||||
|
||||
latestChunk, err := newSectionReader(latestFile)
|
||||
|
||||
// Now we have the reader sectioned, all fd's opened, we can unlock.
|
||||
// New writes/rotates will not affect seeking through these files
|
||||
l.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
|
||||
if config.Tail != 0 {
|
||||
tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
|
||||
tailer := ioutils.MultiReadSeeker(append(files, latestChunk)...)
|
||||
tailFile(tailer, logWatcher, config.Tail, config.Since)
|
||||
}
|
||||
|
||||
|
@ -88,29 +100,32 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|||
}
|
||||
}
|
||||
|
||||
if !config.Follow {
|
||||
if err := latestFile.Close(); err != nil {
|
||||
logrus.Errorf("Error closing file: %v", err)
|
||||
}
|
||||
l.mu.Unlock()
|
||||
if !config.Follow || l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if config.Tail >= 0 {
|
||||
latestFile.Seek(0, os.SEEK_END)
|
||||
}
|
||||
notifyRotate := l.writer.NotifyRotate()
|
||||
defer l.writer.NotifyRotateEvict(notifyRotate)
|
||||
|
||||
l.mu.Lock()
|
||||
l.readers[logWatcher] = struct{}{}
|
||||
l.mu.Unlock()
|
||||
|
||||
notifyRotate := l.writer.NotifyRotate()
|
||||
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
|
||||
|
||||
l.mu.Lock()
|
||||
delete(l.readers, logWatcher)
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
l.writer.NotifyRotateEvict(notifyRotate)
|
||||
func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
||||
// seek to the end to get the size
|
||||
// we'll leave this at the end of the file since section reader does not advance the reader
|
||||
size, err := f.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error getting current file size")
|
||||
}
|
||||
return io.NewSectionReader(f, 0, size), nil
|
||||
}
|
||||
|
||||
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
|
||||
|
|
Loading…
Reference in a new issue