|
@@ -3,11 +3,14 @@ package jsonfilelog
|
|
|
import (
|
|
|
"bytes"
|
|
|
"encoding/json"
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"os"
|
|
|
"time"
|
|
|
|
|
|
+ "gopkg.in/fsnotify.v1"
|
|
|
+
|
|
|
"github.com/Sirupsen/logrus"
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
|
"github.com/docker/docker/pkg/filenotify"
|
|
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
|
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
|
defer close(logWatcher.Msg)
|
|
|
|
|
|
+ // lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
|
|
|
+ // This will block writes!!!
|
|
|
+ l.mu.Lock()
|
|
|
+
|
|
|
pth := l.writer.LogPath()
|
|
|
var files []io.ReadSeeker
|
|
|
for i := l.writer.MaxFiles(); i > 1; i-- {
|
|
@@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|
|
latestFile, err := os.Open(pth)
|
|
|
if err != nil {
|
|
|
logWatcher.Err <- err
|
|
|
+ l.mu.Unlock()
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|
|
if err := latestFile.Close(); err != nil {
|
|
|
logrus.Errorf("Error closing file: %v", err)
|
|
|
}
|
|
|
+ l.mu.Unlock()
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|
|
latestFile.Seek(0, os.SEEK_END)
|
|
|
}
|
|
|
|
|
|
- l.mu.Lock()
|
|
|
l.readers[logWatcher] = struct{}{}
|
|
|
l.mu.Unlock()
|
|
|
|
|
@@ -128,92 +136,148 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func watchFile(name string) (filenotify.FileWatcher, error) {
|
|
|
+ fileWatcher, err := filenotify.New()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := fileWatcher.Add(name); err != nil {
|
|
|
+ logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
|
|
|
+ fileWatcher.Close()
|
|
|
+ fileWatcher = filenotify.NewPollingWatcher()
|
|
|
+
|
|
|
+ if err := fileWatcher.Add(name); err != nil {
|
|
|
+ fileWatcher.Close()
|
|
|
+ logrus.Debugf("error watching log file for modifications: %v", err)
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return fileWatcher, nil
|
|
|
+}
|
|
|
+
|
|
|
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
|
|
|
dec := json.NewDecoder(f)
|
|
|
l := &jsonlog.JSONLog{}
|
|
|
|
|
|
- fileWatcher, err := filenotify.New()
|
|
|
+ name := f.Name()
|
|
|
+ fileWatcher, err := watchFile(name)
|
|
|
if err != nil {
|
|
|
logWatcher.Err <- err
|
|
|
+ return
|
|
|
}
|
|
|
defer func() {
|
|
|
f.Close()
|
|
|
fileWatcher.Close()
|
|
|
}()
|
|
|
- name := f.Name()
|
|
|
|
|
|
- if err := fileWatcher.Add(name); err != nil {
|
|
|
- logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
|
|
|
- fileWatcher.Close()
|
|
|
- fileWatcher = filenotify.NewPollingWatcher()
|
|
|
+ var retries int
|
|
|
+ handleRotate := func() error {
|
|
|
+ f.Close()
|
|
|
+ fileWatcher.Remove(name)
|
|
|
|
|
|
+ // retry when the file doesn't exist
|
|
|
+ for retries := 0; retries <= 5; retries++ {
|
|
|
+ f, err = os.Open(name)
|
|
|
+ if err == nil || !os.IsNotExist(err) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
if err := fileWatcher.Add(name); err != nil {
|
|
|
- logrus.Debugf("error watching log file for modifications: %v", err)
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
+ return err
|
|
|
}
|
|
|
+ dec = json.NewDecoder(f)
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
- var retries int
|
|
|
- for {
|
|
|
- msg, err := decodeLogLine(dec, l)
|
|
|
- if err != nil {
|
|
|
- if err != io.EOF {
|
|
|
- // try again because this shouldn't happen
|
|
|
- if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
|
|
- dec = json.NewDecoder(f)
|
|
|
- retries++
|
|
|
- continue
|
|
|
+ errRetry := errors.New("retry")
|
|
|
+ errDone := errors.New("done")
|
|
|
+ waitRead := func() error {
|
|
|
+ select {
|
|
|
+ case e := <-fileWatcher.Events():
|
|
|
+ switch e.Op {
|
|
|
+ case fsnotify.Write:
|
|
|
+ dec = json.NewDecoder(f)
|
|
|
+ return nil
|
|
|
+ case fsnotify.Rename, fsnotify.Remove:
|
|
|
+ select {
|
|
|
+ case <-notifyRotate:
|
|
|
+ case <-logWatcher.WatchClose():
|
|
|
+ fileWatcher.Remove(name)
|
|
|
+ return errDone
|
|
|
}
|
|
|
-
|
|
|
- // io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
|
|
- // remaining data in the parser's buffer while an io.EOF occurs.
|
|
|
- // If the json logger writes a partial json log entry to the disk
|
|
|
- // while at the same time the decoder tries to decode it, the race condition happens.
|
|
|
- if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
|
|
|
- reader := io.MultiReader(dec.Buffered(), f)
|
|
|
- dec = json.NewDecoder(reader)
|
|
|
- retries++
|
|
|
- continue
|
|
|
+ if err := handleRotate(); err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
-
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
-
|
|
|
- select {
|
|
|
- case <-fileWatcher.Events():
|
|
|
- dec = json.NewDecoder(f)
|
|
|
- continue
|
|
|
- case <-fileWatcher.Errors():
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- case <-logWatcher.WatchClose():
|
|
|
- fileWatcher.Remove(name)
|
|
|
- return
|
|
|
- case <-notifyRotate:
|
|
|
- f.Close()
|
|
|
- fileWatcher.Remove(name)
|
|
|
-
|
|
|
- // retry when the file doesn't exist
|
|
|
- for retries := 0; retries <= 5; retries++ {
|
|
|
- f, err = os.Open(name)
|
|
|
- if err == nil || !os.IsNotExist(err) {
|
|
|
- break
|
|
|
- }
|
|
|
+ return errRetry
|
|
|
+ case err := <-fileWatcher.Errors():
|
|
|
+ logrus.Debug("logger got error watching file: %v", err)
|
|
|
+ // Something happened, let's try and stay alive and create a new watcher
|
|
|
+ if retries <= 5 {
|
|
|
+ fileWatcher.Close()
|
|
|
+ fileWatcher, err = watchFile(name)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
+ retries++
|
|
|
+ return errRetry
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ case <-logWatcher.WatchClose():
|
|
|
+ fileWatcher.Remove(name)
|
|
|
+ return errDone
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if err = fileWatcher.Add(name); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
+ handleDecodeErr := func(err error) error {
|
|
|
+ if err == io.EOF {
|
|
|
+ for err := waitRead(); err != nil; {
|
|
|
+ if err == errRetry {
|
|
|
+ // retry the waitRead
|
|
|
+ continue
|
|
|
}
|
|
|
- if err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ // try again because this shouldn't happen
|
|
|
+ if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
|
|
+ dec = json.NewDecoder(f)
|
|
|
+ retries++
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ // io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
|
|
+ // remaining data in the parser's buffer while an io.EOF occurs.
|
|
|
+ // If the json logger writes a partial json log entry to the disk
|
|
|
+ // while at the same time the decoder tries to decode it, the race condition happens.
|
|
|
+ if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
|
|
|
+ reader := io.MultiReader(dec.Buffered(), f)
|
|
|
+ dec = json.NewDecoder(reader)
|
|
|
+ retries++
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // main loop
|
|
|
+ for {
|
|
|
+ msg, err := decodeLogLine(dec, l)
|
|
|
+ if err != nil {
|
|
|
+ if err := handleDecodeErr(err); err != nil {
|
|
|
+ if err == errDone {
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
- dec = json.NewDecoder(f)
|
|
|
- continue
|
|
|
+ // we got an unrecoverable error, so return
|
|
|
+ logWatcher.Err <- err
|
|
|
+ return
|
|
|
}
|
|
|
+ // ready to try again
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
retries = 0 // reset retries since we've succeeded
|