diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 177c070394..7aa92f3d37 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -7,7 +7,6 @@ import ( "bytes" "encoding/json" "fmt" - "io" "strconv" "sync" @@ -24,12 +23,9 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - extra []byte // json-encoded extra attributes - - mu sync.RWMutex - buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()` + mu sync.Mutex closed bool - writer *loggerutils.RotateFileWriter + writer *loggerutils.LogFile readers map[*logger.LogWatcher]struct{} // stores the active log followers } @@ -65,11 +61,6 @@ func New(info logger.Info) (logger.Logger, error) { } } - writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles) - if err != nil { - return nil, err - } - var extra []byte attrs, err := info.ExtraAttributes(nil) if err != nil { @@ -83,33 +74,35 @@ func New(info logger.Info) (logger.Logger, error) { } } + buf := bytes.NewBuffer(nil) + marshalFunc := func(msg *logger.Message) ([]byte, error) { + if err := marshalMessage(msg, extra, buf); err != nil { + return nil, err + } + b := buf.Bytes() + buf.Reset() + return b, nil + } + + writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc) + if err != nil { + return nil, err + } + return &JSONFileLogger{ - buf: bytes.NewBuffer(nil), writer: writer, readers: make(map[*logger.LogWatcher]struct{}), - extra: extra, }, nil } // Log converts logger.Message to jsonlog.JSONLog and serializes it to file. func (l *JSONFileLogger) Log(msg *logger.Message) error { l.mu.Lock() - err := writeMessageBuf(l.writer, msg, l.extra, l.buf) - l.buf.Reset() + err := l.writer.WriteLogEntry(msg) l.mu.Unlock() return err } -func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { - if err := marshalMessage(m, extra, buf); err != nil { - logger.PutMessage(m) - return err - } - logger.PutMessage(m) - _, err := w.Write(buf.Bytes()) - return errors.Wrap(err, "error writing log entry") -} - func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { logLine := msg.Line if !msg.Partial { diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go index 2b2b2b5229..893c054669 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog_test.go +++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go @@ -82,7 +82,7 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) { } buf := bytes.NewBuffer(nil) - require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf)) + require.NoError(b, marshalMessage(msg, nil, buf)) b.SetBytes(int64(buf.Len())) b.ResetTimer() diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 09eaaf00de..f190e01a56 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -1,33 +1,45 @@ package jsonfilelog import ( - "bytes" "encoding/json" - "fmt" "io" - "os" - "time" - - "github.com/fsnotify/fsnotify" - "golang.org/x/net/context" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" - "github.com/docker/docker/daemon/logger/jsonfilelog/multireader" - "github.com/docker/docker/pkg/filenotify" - "github.com/docker/docker/pkg/tailfile" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) const maxJSONDecodeRetry = 20000 +// ReadLogs implements the logger's LogReader interface for the logs +// created by this driver. +func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + logWatcher := logger.NewLogWatcher() + + go l.readLogs(logWatcher, config) + return logWatcher +} + +func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { + defer close(watcher.Msg) + + l.mu.Lock() + l.readers[watcher] = struct{}{} + l.mu.Unlock() + + l.writer.ReadLogs(config, watcher) + + l.mu.Lock() + delete(l.readers, watcher) + l.mu.Unlock() +} + func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { l.Reset() if err := dec.Decode(l); err != nil { return nil, err } + var attrs []backend.LogAttr if len(l.Attrs) != 0 { attrs = make([]backend.LogAttr, 0, len(l.Attrs)) @@ -44,318 +56,34 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro return msg, nil } -// ReadLogs implements the logger's LogReader interface for the logs -// created by this driver. -func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go l.readLogs(logWatcher, config) - return logWatcher -} - -func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(logWatcher.Msg) - - // lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files - // This will block writes!!! - l.mu.RLock() - - // TODO it would be nice to move a lot of this reader implementation to the rotate logger object - pth := l.writer.LogPath() - var files []io.ReadSeeker - for i := l.writer.MaxFiles(); i > 1; i-- { - f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1)) - if err != nil { - if !os.IsNotExist(err) { - logWatcher.Err <- err - l.mu.RUnlock() - return - } - continue - } - defer f.Close() - files = append(files, f) - } - - latestFile, err := os.Open(pth) - if err != nil { - logWatcher.Err <- errors.Wrap(err, "error opening latest log file") - l.mu.RUnlock() - return - } - defer latestFile.Close() - - latestChunk, err := newSectionReader(latestFile) - - // Now we have the reader sectioned, all fd's opened, we can unlock. - // New writes/rotates will not affect seeking through these files - l.mu.RUnlock() - - if err != nil { - logWatcher.Err <- err - return - } - - if config.Tail != 0 { - tailer := multireader.MultiReadSeeker(append(files, latestChunk)...) - tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until) - } - - // close all the rotated files - for _, f := range files { - if err := f.(io.Closer).Close(); err != nil { - logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err) - } - } - - if !config.Follow || l.closed { - return - } - - notifyRotate := l.writer.NotifyRotate() - defer l.writer.NotifyRotateEvict(notifyRotate) - - l.mu.Lock() - l.readers[logWatcher] = struct{}{} - l.mu.Unlock() - - followLogs(latestFile, logWatcher, notifyRotate, config) - - l.mu.Lock() - delete(l.readers, logWatcher) - l.mu.Unlock() -} - -func newSectionReader(f *os.File) (*io.SectionReader, error) { - // seek to the end to get the size - // we'll leave this at the end of the file since section reader does not advance the reader - size, err := f.Seek(0, os.SEEK_END) - if err != nil { - return nil, errors.Wrap(err, "error getting current file size") - } - return io.NewSectionReader(f, 0, size), nil -} - -func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) { - rdr := io.Reader(f) - if tail > 0 { - ls, err := tailfile.TailFile(f, tail) - if err != nil { - logWatcher.Err <- err - return - } - rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) - } - dec := json.NewDecoder(rdr) - for { - msg, err := decodeLogLine(dec, &jsonlog.JSONLog{}) - if err != nil { - if err != io.EOF { - logWatcher.Err <- err - } - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - select { - case <-logWatcher.WatchClose(): - return - case logWatcher.Msg <- msg: - } - } -} - -func watchFile(name string) (filenotify.FileWatcher, error) { - fileWatcher, err := filenotify.New() - if err != nil { - return nil, err - } - - if err := fileWatcher.Add(name); err != nil { - logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err) - fileWatcher.Close() - fileWatcher = filenotify.NewPollingWatcher() - - if err := fileWatcher.Add(name); err != nil { - fileWatcher.Close() - logrus.Debugf("error watching log file for modifications: %v", err) - return nil, err - } - } - return fileWatcher, nil -} - -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) { - dec := json.NewDecoder(f) +// decodeFunc is used to create a decoder for the log file reader +func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { l := &jsonlog.JSONLog{} - - name := f.Name() - fileWatcher, err := watchFile(name) - if err != nil { - logWatcher.Err <- err - return - } - defer func() { - f.Close() - fileWatcher.Remove(name) - fileWatcher.Close() - }() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - select { - case <-logWatcher.WatchClose(): - fileWatcher.Remove(name) - cancel() - case <-ctx.Done(): - return - } - }() - - var retries int - handleRotate := func() error { - f.Close() - fileWatcher.Remove(name) - - // retry when the file doesn't exist - for retries := 0; retries <= 5; retries++ { - f, err = os.Open(name) - if err == nil || !os.IsNotExist(err) { + dec := json.NewDecoder(rdr) + return func() (msg *logger.Message, err error) { + for retries := 0; retries < maxJSONDecodeRetry; retries++ { + msg, err = decodeLogLine(dec, l) + if err == nil { break } - } - if err != nil { - return err - } - if err := fileWatcher.Add(name); err != nil { - return err - } - dec = json.NewDecoder(f) - return nil - } - errRetry := errors.New("retry") - errDone := errors.New("done") - waitRead := func() error { - select { - case e := <-fileWatcher.Events(): - switch e.Op { - case fsnotify.Write: - dec = json.NewDecoder(f) - return nil - case fsnotify.Rename, fsnotify.Remove: - select { - case <-notifyRotate: - case <-ctx.Done(): - return errDone - } - if err := handleRotate(); err != nil { - return err - } - return nil - } - return errRetry - case err := <-fileWatcher.Errors(): - logrus.Debug("logger got error watching file: %v", err) - // Something happened, let's try and stay alive and create a new watcher - if retries <= 5 { - fileWatcher.Close() - fileWatcher, err = watchFile(name) - if err != nil { - return err - } + // try again, could be due to a an incomplete json object as we read + if _, ok := err.(*json.SyntaxError); ok { + dec = json.NewDecoder(rdr) retries++ - return errRetry + continue } - return err - case <-ctx.Done(): - return errDone - } - } - handleDecodeErr := func(err error) error { - if err == io.EOF { - for { - err := waitRead() - if err == nil { - break - } - if err == errRetry { - continue - } - return err - } - return nil - } - // try again because this shouldn't happen - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { - dec = json.NewDecoder(f) - retries++ - return nil - } - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race condition happens. - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { - reader := io.MultiReader(dec.Buffered(), f) - dec = json.NewDecoder(reader) - retries++ - return nil - } - return err - } - - // main loop - for { - msg, err := decodeLogLine(dec, l) - if err != nil { - if err := handleDecodeErr(err); err != nil { - if err == errDone { - return - } - // we got an unrecoverable error, so return - logWatcher.Err <- err - return - } - // ready to try again - continue - } - - since := config.Since - until := config.Until - - retries = 0 // reset retries since we've succeeded - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - select { - case logWatcher.Msg <- msg: - case <-ctx.Done(): - logWatcher.Msg <- msg - // This for loop is used when the logger is closed (ie, container - // stopped) but the consumer is still waiting for logs. - for { - msg, err := decodeLogLine(dec, l) - if err != nil { - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - logWatcher.Msg <- msg + // io.ErrUnexpectedEOF is returned from json.Decoder when there is + // remaining data in the parser's buffer while an io.EOF occurs. + // If the json logger writes a partial json log entry to the disk + // while at the same time the decoder tries to decode it, the race condition happens. + if err == io.ErrUnexpectedEOF { + reader := io.MultiReader(dec.Buffered(), rdr) + dec = json.NewDecoder(reader) + retries++ } } + return msg, err } } diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index 01a05c4b78..599fdf9336 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -35,7 +35,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { } buf := bytes.NewBuffer(nil) - require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf)) + require.NoError(b, marshalMessage(msg, nil, buf)) b.SetBytes(int64(buf.Len())) b.ResetTimer() diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 6edbf734c1..dc25bebfc7 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -140,3 +140,6 @@ type Capability struct { // Determines if a log driver can read back logs ReadLogs bool } + +// MarshalFunc is a func that marshals a message into an arbitrary format +type MarshalFunc func(*Message) ([]byte, error) diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go new file mode 100644 index 0000000000..6a7a68909e --- /dev/null +++ b/daemon/logger/loggerutils/logfile.go @@ -0,0 +1,454 @@ +package loggerutils + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "strconv" + "sync" + "time" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/loggerutils/multireader" + "github.com/docker/docker/pkg/filenotify" + "github.com/docker/docker/pkg/pubsub" + "github.com/docker/docker/pkg/tailfile" + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// LogFile is Logger implementation for default Docker logging. +type LogFile struct { + f *os.File // store for closing + closed bool + mu sync.RWMutex + capacity int64 //maximum size of each file + currentSize int64 // current size of the latest file + maxFiles int //maximum number of files + notifyRotate *pubsub.Publisher + marshal logger.MarshalFunc + createDecoder makeDecoderFunc +} + +type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) + +//NewLogFile creates new LogFile +func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) { + log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) + if err != nil { + return nil, err + } + + size, err := log.Seek(0, os.SEEK_END) + if err != nil { + return nil, err + } + + return &LogFile{ + f: log, + capacity: capacity, + currentSize: size, + maxFiles: maxFiles, + notifyRotate: pubsub.NewPublisher(0, 1), + marshal: marshaller, + createDecoder: decodeFunc, + }, nil +} + +// WriteLogEntry writes the provided log message to the current log file. +// This may trigger a rotation event if the max file/capacity limits are hit. +func (w *LogFile) WriteLogEntry(msg *logger.Message) error { + b, err := w.marshal(msg) + if err != nil { + return errors.Wrap(err, "error marshalling log message") + } + + logger.PutMessage(msg) + + w.mu.Lock() + if w.closed { + w.mu.Unlock() + return errors.New("cannot write because the output file was closed") + } + + if err := w.checkCapacityAndRotate(); err != nil { + w.mu.Unlock() + return err + } + + n, err := w.f.Write(b) + if err == nil { + w.currentSize += int64(n) + } + w.mu.Unlock() + return err +} + +func (w *LogFile) checkCapacityAndRotate() error { + if w.capacity == -1 { + return nil + } + + if w.currentSize >= w.capacity { + name := w.f.Name() + if err := w.f.Close(); err != nil { + return errors.Wrap(err, "error closing file") + } + if err := rotate(name, w.maxFiles); err != nil { + return err + } + file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) + if err != nil { + return err + } + w.f = file + w.currentSize = 0 + w.notifyRotate.Publish(struct{}{}) + } + + return nil +} + +func rotate(name string, maxFiles int) error { + if maxFiles < 2 { + return nil + } + for i := maxFiles - 1; i > 1; i-- { + toPath := name + "." + strconv.Itoa(i) + fromPath := name + "." + strconv.Itoa(i-1) + if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "error rotating old log entries") + } + } + + if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "error rotating current log") + } + return nil +} + +// LogPath returns the location the given writer logs to. +func (w *LogFile) LogPath() string { + w.mu.Lock() + defer w.mu.Unlock() + return w.f.Name() +} + +// MaxFiles return maximum number of files +func (w *LogFile) MaxFiles() int { + return w.maxFiles +} + +// Close closes underlying file and signals all readers to stop. +func (w *LogFile) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.closed { + return nil + } + if err := w.f.Close(); err != nil { + return err + } + w.closed = true + return nil +} + +// ReadLogs decodes entries from log files and sends them the passed in watcher +func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { + w.mu.RLock() + files, err := w.openRotatedFiles() + if err != nil { + w.mu.RUnlock() + watcher.Err <- err + return + } + defer func() { + for _, f := range files { + f.Close() + } + }() + + currentFile, err := os.Open(w.f.Name()) + if err != nil { + w.mu.RUnlock() + watcher.Err <- err + return + } + defer currentFile.Close() + + currentChunk, err := newSectionReader(currentFile) + w.mu.RUnlock() + + if err != nil { + watcher.Err <- err + return + } + + if config.Tail != 0 { + seekers := make([]io.ReadSeeker, 0, len(files)+1) + for _, f := range files { + seekers = append(seekers, f) + } + seekers = append(seekers, currentChunk) + tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) + } + + w.mu.RLock() + if !config.Follow || w.closed { + w.mu.RUnlock() + return + } + w.mu.RUnlock() + + notifyRotate := w.notifyRotate.Subscribe() + defer w.notifyRotate.Evict(notifyRotate) + followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) +} + +func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { + defer func() { + if err == nil { + return + } + for _, f := range files { + f.Close() + } + }() + + for i := w.maxFiles; i > 1; i-- { + f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1)) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + continue + } + files = append(files, f) + } + + return files, nil +} + +func newSectionReader(f *os.File) (*io.SectionReader, error) { + // seek to the end to get the size + // we'll leave this at the end of the file since section reader does not advance the reader + size, err := f.Seek(0, os.SEEK_END) + if err != nil { + return nil, errors.Wrap(err, "error getting current file size") + } + return io.NewSectionReader(f, 0, size), nil +} + +type decodeFunc func() (*logger.Message, error) + +func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) { + var rdr io.Reader = f + if config.Tail > 0 { + ls, err := tailfile.TailFile(f, config.Tail) + if err != nil { + watcher.Err <- err + return + } + rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) + } + + decodeLogLine := createDecoder(rdr) + for { + msg, err := decodeLogLine() + if err != nil { + if err != io.EOF { + watcher.Err <- err + } + return + } + if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { + continue + } + if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { + return + } + select { + case <-watcher.WatchClose(): + return + case watcher.Msg <- msg: + } + } +} + +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) { + decodeLogLine := createDecoder(f) + + name := f.Name() + fileWatcher, err := watchFile(name) + if err != nil { + logWatcher.Err <- err + return + } + defer func() { + f.Close() + fileWatcher.Remove(name) + fileWatcher.Close() + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-logWatcher.WatchClose(): + fileWatcher.Remove(name) + cancel() + case <-ctx.Done(): + return + } + }() + + var retries int + handleRotate := func() error { + f.Close() + fileWatcher.Remove(name) + + // retry when the file doesn't exist + for retries := 0; retries <= 5; retries++ { + f, err = os.Open(name) + if err == nil || !os.IsNotExist(err) { + break + } + } + if err != nil { + return err + } + if err := fileWatcher.Add(name); err != nil { + return err + } + decodeLogLine = createDecoder(f) + return nil + } + + errRetry := errors.New("retry") + errDone := errors.New("done") + waitRead := func() error { + select { + case e := <-fileWatcher.Events(): + switch e.Op { + case fsnotify.Write: + decodeLogLine = createDecoder(f) + return nil + case fsnotify.Rename, fsnotify.Remove: + select { + case <-notifyRotate: + case <-ctx.Done(): + return errDone + } + if err := handleRotate(); err != nil { + return err + } + return nil + } + return errRetry + case err := <-fileWatcher.Errors(): + logrus.Debug("logger got error watching file: %v", err) + // Something happened, let's try and stay alive and create a new watcher + if retries <= 5 { + fileWatcher.Close() + fileWatcher, err = watchFile(name) + if err != nil { + return err + } + retries++ + return errRetry + } + return err + case <-ctx.Done(): + return errDone + } + } + + handleDecodeErr := func(err error) error { + if err != io.EOF { + return err + } + + for { + err := waitRead() + if err == nil { + break + } + if err == errRetry { + continue + } + return err + } + return nil + } + + // main loop + for { + msg, err := decodeLogLine() + if err != nil { + if err := handleDecodeErr(err); err != nil { + if err == errDone { + return + } + // we got an unrecoverable error, so return + logWatcher.Err <- err + return + } + // ready to try again + continue + } + + retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } + select { + case logWatcher.Msg <- msg: + case <-ctx.Done(): + logWatcher.Msg <- msg + for { + msg, err := decodeLogLine() + if err != nil { + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } + logWatcher.Msg <- msg + } + } + } +} + +func watchFile(name string) (filenotify.FileWatcher, error) { + fileWatcher, err := filenotify.New() + if err != nil { + return nil, err + } + + logger := logrus.WithFields(logrus.Fields{ + "module": "logger", + "fille": name, + }) + + if err := fileWatcher.Add(name); err != nil { + logger.WithError(err).Warnf("falling back to file poller") + fileWatcher.Close() + fileWatcher = filenotify.NewPollingWatcher() + + if err := fileWatcher.Add(name); err != nil { + fileWatcher.Close() + logger.WithError(err).Debugf("error watching log file for modifications") + return nil, err + } + } + return fileWatcher, nil +} diff --git a/daemon/logger/jsonfilelog/multireader/multireader.go b/daemon/logger/loggerutils/multireader/multireader.go similarity index 100% rename from daemon/logger/jsonfilelog/multireader/multireader.go rename to daemon/logger/loggerutils/multireader/multireader.go diff --git a/daemon/logger/jsonfilelog/multireader/multireader_test.go b/daemon/logger/loggerutils/multireader/multireader_test.go similarity index 100% rename from daemon/logger/jsonfilelog/multireader/multireader_test.go rename to daemon/logger/loggerutils/multireader/multireader_test.go diff --git a/daemon/logger/loggerutils/rotatefilewriter.go b/daemon/logger/loggerutils/rotatefilewriter.go deleted file mode 100644 index 457a39b5a3..0000000000 --- a/daemon/logger/loggerutils/rotatefilewriter.go +++ /dev/null @@ -1,141 +0,0 @@ -package loggerutils - -import ( - "errors" - "os" - "strconv" - "sync" - - "github.com/docker/docker/pkg/pubsub" -) - -// RotateFileWriter is Logger implementation for default Docker logging. -type RotateFileWriter struct { - f *os.File // store for closing - closed bool - mu sync.Mutex - capacity int64 //maximum size of each file - currentSize int64 // current size of the latest file - maxFiles int //maximum number of files - notifyRotate *pubsub.Publisher -} - -//NewRotateFileWriter creates new RotateFileWriter -func NewRotateFileWriter(logPath string, capacity int64, maxFiles int) (*RotateFileWriter, error) { - log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) - if err != nil { - return nil, err - } - - size, err := log.Seek(0, os.SEEK_END) - if err != nil { - return nil, err - } - - return &RotateFileWriter{ - f: log, - capacity: capacity, - currentSize: size, - maxFiles: maxFiles, - notifyRotate: pubsub.NewPublisher(0, 1), - }, nil -} - -//WriteLog write log message to File -func (w *RotateFileWriter) Write(message []byte) (int, error) { - w.mu.Lock() - if w.closed { - w.mu.Unlock() - return -1, errors.New("cannot write because the output file was closed") - } - if err := w.checkCapacityAndRotate(); err != nil { - w.mu.Unlock() - return -1, err - } - - n, err := w.f.Write(message) - if err == nil { - w.currentSize += int64(n) - } - w.mu.Unlock() - return n, err -} - -func (w *RotateFileWriter) checkCapacityAndRotate() error { - if w.capacity == -1 { - return nil - } - - if w.currentSize >= w.capacity { - name := w.f.Name() - if err := w.f.Close(); err != nil { - return err - } - if err := rotate(name, w.maxFiles); err != nil { - return err - } - file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) - if err != nil { - return err - } - w.f = file - w.currentSize = 0 - w.notifyRotate.Publish(struct{}{}) - } - - return nil -} - -func rotate(name string, maxFiles int) error { - if maxFiles < 2 { - return nil - } - for i := maxFiles - 1; i > 1; i-- { - toPath := name + "." + strconv.Itoa(i) - fromPath := name + "." + strconv.Itoa(i-1) - if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { - return err - } - } - - if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { - return err - } - return nil -} - -// LogPath returns the location the given writer logs to. -func (w *RotateFileWriter) LogPath() string { - w.mu.Lock() - defer w.mu.Unlock() - return w.f.Name() -} - -// MaxFiles return maximum number of files -func (w *RotateFileWriter) MaxFiles() int { - return w.maxFiles -} - -//NotifyRotate returns the new subscriber -func (w *RotateFileWriter) NotifyRotate() chan interface{} { - return w.notifyRotate.Subscribe() -} - -//NotifyRotateEvict removes the specified subscriber from receiving any more messages. -func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) { - w.notifyRotate.Evict(sub) -} - -// Close closes underlying file and signals all readers to stop. -func (w *RotateFileWriter) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - if w.closed { - return nil - } - if err := w.f.Close(); err != nil { - return err - } - w.closed = true - return nil -} diff --git a/daemon/logs.go b/daemon/logs.go index babf07e36d..131360b7b4 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -123,7 +123,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c } return case <-ctx.Done(): - lg.Debug("logs: end stream, ctx is done: %v", ctx.Err()) + lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err()) return case msg, ok := <-logs.Msg: // there is some kind of pool or ring buffer in the logger that