
Merge pull request #33897 from cpuguy83/jsonfile_log_blocking

Fix log readers can block writes indefinitely
Sebastiaan van Stijn, 8 years ago
commit db8c2651b8
2 changed files with 66 additions and 34 deletions:
  1. daemon/logger/jsonfilelog/jsonfilelog.go (+35 -20)
  2. daemon/logger/jsonfilelog/read.go (+31 -14)
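
Before this patch, readLogs() took the same sync.Mutex that Log() uses and held it while opening the rotated files and replaying the tail; since each tailed line is sent to the watcher's channel, a slow log consumer could hold that lock, and therefore block every container write, indefinitely. A minimal sketch of the failure mode (the demoLogger type and timings are made up for illustration, this is not the daemon's code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// demoLogger shares one mutex between the write path and the read path,
// like the pre-patch JSONFileLogger.
type demoLogger struct {
	mu sync.Mutex
}

func (l *demoLogger) Log(line string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	_ = line // the real logger marshals and writes the entry here
}

func (l *demoLogger) ReadLogs() {
	l.mu.Lock()
	defer l.mu.Unlock()
	time.Sleep(2 * time.Second) // a slow consumer stalls while holding the lock
}

func main() {
	l := &demoLogger{}
	go l.ReadLogs()
	time.Sleep(100 * time.Millisecond) // let the reader grab the lock first
	start := time.Now()
	l.Log("hello")
	fmt.Println("Log() blocked for", time.Since(start)) // ~2s: reads block writes
}

The diff below fixes this by shrinking the writer's critical section, letting readers take only a read lock, and releasing that lock before any log data is replayed to the client.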

daemon/logger/jsonfilelog/jsonfilelog.go (+35 -20)

@@ -7,6 +7,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"io"
 	"strconv"
 	"sync"
 
@@ -15,6 +16,7 @@ import (
 	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/go-units"
+	"github.com/pkg/errors"
 )
 
 // Name is the name of the file that the jsonlogger logs to.
@@ -22,12 +24,13 @@ const Name = "json-file"
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	buf     *bytes.Buffer
+	extra []byte // json-encoded extra attributes
+
+	mu      sync.RWMutex
+	buf     *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
+	closed  bool
 	writer  *loggerutils.RotateFileWriter
-	mu      sync.Mutex
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
-	extra   []byte                          // json-encoded extra attributes
-	closed  bool
 }
 
 func init() {
@@ -90,33 +93,45 @@ func New(info logger.Info) (logger.Logger, error) {
 
 // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
 func (l *JSONFileLogger) Log(msg *logger.Message) error {
+	l.mu.Lock()
+	err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
+	l.buf.Reset()
+	l.mu.Unlock()
+	return err
+}
+
+func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
+	if err := marshalMessage(m, extra, buf); err != nil {
+		logger.PutMessage(m)
+		return err
+	}
+	logger.PutMessage(m)
+	if _, err := w.Write(buf.Bytes()); err != nil {
+		return errors.Wrap(err, "error writing log entry")
+	}
+	return nil
+}
+
+func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
 	timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
 	if err != nil {
 		return err
 	}
-	l.mu.Lock()
-	logline := msg.Line
+	logLine := msg.Line
 	if !msg.Partial {
-		logline = append(msg.Line, '\n')
+		logLine = append(msg.Line, '\n')
 	}
 	err = (&jsonlog.JSONLogs{
-		Log:      logline,
+		Log:      logLine,
 		Stream:   msg.Source,
 		Created:  timestamp,
-		RawAttrs: l.extra,
-	}).MarshalJSONBuf(l.buf)
-	logger.PutMessage(msg)
+		RawAttrs: extra,
+	}).MarshalJSONBuf(buf)
 	if err != nil {
-		l.mu.Unlock()
-		return err
+		return errors.Wrap(err, "error writing log message to buffer")
 	}
-
-	l.buf.WriteByte('\n')
-	_, err = l.writer.Write(l.buf.Bytes())
-	l.buf.Reset()
-	l.mu.Unlock()
-
-	return err
+	err = buf.WriteByte('\n')
+	return errors.Wrap(err, "error finalizing log buffer")
}
 
 // ValidateLogOpt looks for json specific log options max-file & max-size.

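The restructured write path splits the work into marshalMessage() (serialize into a buffer) and writeMessageBuf() (flush the buffer to the writer), while Log() keeps a single reused bytes.Buffer that is only valid to touch under the mutex. A standalone sketch of that buffer-reuse pattern (the bufferedLineWriter type is hypothetical, not the daemon's API):

package main

import (
	"bytes"
	"io"
	"os"
	"sync"
)

// bufferedLineWriter reuses one scratch buffer across calls: fill it under
// the lock, write it out, then Reset() so the next call starts from empty.
type bufferedLineWriter struct {
	mu  sync.Mutex
	buf bytes.Buffer // reused to avoid an allocation per call
	w   io.Writer
}

func (b *bufferedLineWriter) WriteLine(line string) error {
	b.mu.Lock()
	defer func() {
		b.buf.Reset() // reset even on error, as the diff does after writeMessageBuf
		b.mu.Unlock()
	}()
	b.buf.WriteString(line)
	b.buf.WriteByte('\n')
	_, err := b.w.Write(b.buf.Bytes())
	return err
}

func main() {
	w := &bufferedLineWriter{w: os.Stdout}
	_ = w.WriteLine(`{"log":"hello\n","stream":"stdout"}`)
}

Keeping the Reset() unconditional means a marshalling error on one call can never leave stale bytes in the buffer for the next one.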
daemon/logger/jsonfilelog/read.go (+31 -14)

@@ -3,7 +3,6 @@ package jsonfilelog
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -18,6 +17,7 @@ import (
 	"github.com/docker/docker/pkg/filenotify"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/tailfile"
+	"github.com/pkg/errors"
 )
 
 const maxJSONDecodeRetry = 20000
@@ -48,10 +48,11 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 
-	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
 	// This will block writes!!!
-	l.mu.Lock()
+	l.mu.RLock()
 
+	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
 	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -59,25 +60,36 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 		if err != nil {
 			if !os.IsNotExist(err) {
 				logWatcher.Err <- err
-				break
+				l.mu.RUnlock()
+				return
 			}
 			continue
 		}
 		defer f.Close()
-
 		files = append(files, f)
 	}
 
 	latestFile, err := os.Open(pth)
 	if err != nil {
-		logWatcher.Err <- err
-		l.mu.Unlock()
+		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
+		l.mu.RUnlock()
 		return
 	}
 	defer latestFile.Close()
 
+	latestChunk, err := newSectionReader(latestFile)
+
+	// Now we have the reader sectioned, all fd's opened, we can unlock.
+	// New writes/rotates will not affect seeking through these files
+	l.mu.RUnlock()
+
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+
 	if config.Tail != 0 {
-		tailer := multireader.MultiReadSeeker(append(files, latestFile)...)
+		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
 		tailFile(tailer, logWatcher, config.Tail, config.Since)
 	}
 
@@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	}
 
 	if !config.Follow || l.closed {
-		l.mu.Unlock()
 		return
 	}
 
-	if config.Tail >= 0 {
-		latestFile.Seek(0, os.SEEK_END)
-	}
-
 	notifyRotate := l.writer.NotifyRotate()
 	defer l.writer.NotifyRotateEvict(notifyRotate)
 
+	l.mu.Lock()
 	l.readers[logWatcher] = struct{}{}
-
 	l.mu.Unlock()
 
 	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
@@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	l.mu.Unlock()
 }
 
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
+	// seek to the end to get the size
+	// we'll leave this at the end of the file since section reader does not advance the reader
+	size, err := f.Seek(0, os.SEEK_END)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting current file size")
+	}
+	return io.NewSectionReader(f, 0, size), nil
+}
+
 func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
 	var rdr io.Reader
 	rdr = f
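
The key read-side technique is newSectionReader() above: while the read lock is still held, the current size of the live log file is captured and the open descriptor is wrapped in an io.SectionReader bounded to [0, size). The lock can then be dropped before any data is replayed, because a writer appending past that size, or a rotation, can no longer disturb seeking within the section. A standalone sketch of the same idea (the demo.log path is invented for illustration):

package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("demo.log") // hypothetical log file another goroutine appends to
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Snapshot the current size (in the patch this happens under the read
	// lock); the section reader is bounded to it, so appends that land after
	// this point are invisible to the tailer.
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		panic(err)
	}
	section := io.NewSectionReader(f, 0, size)

	// section covers a fixed byte range: seeking and reading through it is
	// safe even while the writer keeps appending to f.
	buf := make([]byte, 64)
	n, _ := section.ReadAt(buf, 0)
	fmt.Printf("first %d bytes of the snapshot: %q\n", n, buf[:n])
}

io.SectionReader also satisfies io.ReadSeeker, which is why latestChunk can be handed straight to multireader.MultiReadSeeker in place of the raw *os.File.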