diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go new file mode 100644 index 0000000000..755a483d7a --- /dev/null +++ b/daemon/logger/loggerutils/follow.go @@ -0,0 +1,211 @@ +package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" + +import ( + "io" + "os" + "time" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/filenotify" + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var errRetry = errors.New("retry") +var errDone = errors.New("done") + +type follow struct { + file *os.File + dec Decoder + fileWatcher filenotify.FileWatcher + logWatcher *logger.LogWatcher + notifyRotate, notifyEvict chan interface{} + oldSize int64 + retries int +} + +func (fl *follow) handleRotate() error { + name := fl.file.Name() + + fl.file.Close() + fl.fileWatcher.Remove(name) + + // retry when the file doesn't exist + var err error + for retries := 0; retries <= 5; retries++ { + f, err := open(name) + if err == nil || !os.IsNotExist(err) { + fl.file = f + break + } + } + if err != nil { + return err + } + if err := fl.fileWatcher.Add(name); err != nil { + return err + } + fl.dec.Reset(fl.file) + return nil +} + +func (fl *follow) handleMustClose(evictErr error) { + fl.file.Close() + fl.dec.Close() + fl.logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors") + logrus.WithField("file", fl.file.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.") +} + +func (fl *follow) waitRead() error { + select { + case e := <-fl.notifyEvict: + if e != nil { + err := e.(error) + fl.handleMustClose(err) + } + return errDone + case e := <-fl.fileWatcher.Events(): + switch e.Op { + case fsnotify.Write: + fl.dec.Reset(fl.file) + return nil + case fsnotify.Rename, fsnotify.Remove: + select { + case <-fl.notifyRotate: + case <-fl.logWatcher.WatchProducerGone(): + return errDone + case <-fl.logWatcher.WatchConsumerGone(): + return errDone + } + if err := fl.handleRotate(); err != nil { + return err + } + return nil + } + return errRetry + case err := <-fl.fileWatcher.Errors(): + logrus.Debugf("logger got error watching file: %v", err) + // Something happened, let's try and stay alive and create a new watcher + if fl.retries <= 5 { + fl.fileWatcher.Close() + fl.fileWatcher, err = watchFile(fl.file.Name()) + if err != nil { + return err + } + fl.retries++ + return errRetry + } + return err + case <-fl.logWatcher.WatchProducerGone(): + return errDone + case <-fl.logWatcher.WatchConsumerGone(): + return errDone + } +} + +func (fl *follow) handleDecodeErr(err error) error { + if !errors.Is(err, io.EOF) { + return err + } + + // Handle special case (#39235): max-file=1 and file was truncated + st, stErr := fl.file.Stat() + if stErr == nil { + size := st.Size() + defer func() { fl.oldSize = size }() + if size < fl.oldSize { // truncated + fl.file.Seek(0, 0) + fl.dec.Reset(fl.file) + return nil + } + } else { + logrus.WithError(stErr).Warn("logger: stat error") + } + + for { + err := fl.waitRead() + if err == nil { + break + } + if err == errRetry { + continue + } + return err + } + return nil +} + +func (fl *follow) mainLoop(since, until time.Time) { + for { + select { + case err := <-fl.notifyEvict: + if err != nil { + fl.handleMustClose(err.(error)) + } + return + default: + } + msg, err := fl.dec.Decode() + if err != nil { + if err := fl.handleDecodeErr(err); err != nil { + if err == errDone { + return + } + // we got an unrecoverable error, so return + fl.logWatcher.Err <- err + return + } + // ready to try again + continue + } + + fl.retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } + // send the message, unless the consumer is gone + select { + case e := <-fl.notifyEvict: + if e != nil { + err := e.(error) + logrus.WithError(err).Debug("Reader evicted while sending log message") + fl.logWatcher.Err <- err + } + return + case fl.logWatcher.Msg <- msg: + case <-fl.logWatcher.WatchConsumerGone(): + return + } + } +} + +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { + dec.Reset(f) + + name := f.Name() + fileWatcher, err := watchFile(name) + if err != nil { + logWatcher.Err <- err + return + } + defer func() { + f.Close() + dec.Close() + fileWatcher.Close() + }() + + fl := &follow{ + file: f, + oldSize: -1, + logWatcher: logWatcher, + fileWatcher: fileWatcher, + notifyRotate: notifyRotate, + notifyEvict: notifyEvict, + dec: dec, + } + fl.mainLoop(since, until) +} diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 8817934946..d4f3f5e149 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -17,7 +17,6 @@ import ( "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pubsub" - "github.com/fsnotify/fsnotify" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -608,180 +607,6 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { - dec.Reset(f) - - name := f.Name() - fileWatcher, err := watchFile(name) - if err != nil { - logWatcher.Err <- err - return - } - defer func() { - f.Close() - dec.Close() - fileWatcher.Close() - }() - - var retries int - handleRotate := func() error { - f.Close() - fileWatcher.Remove(name) - - // retry when the file doesn't exist - for retries := 0; retries <= 5; retries++ { - f, err = open(name) - if err == nil || !os.IsNotExist(err) { - break - } - } - if err != nil { - return err - } - if err := fileWatcher.Add(name); err != nil { - return err - } - dec.Reset(f) - return nil - } - - errRetry := errors.New("retry") - errDone := errors.New("done") - - handleMustClose := func(evictErr error) { - f.Close() - dec.Close() - logWatcher.Err <- errors.Wrap(err, "log reader evicted due to errors") - logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.") - } - - waitRead := func() error { - select { - case e := <-notifyEvict: - if e != nil { - err := e.(error) - handleMustClose(err) - } - return errDone - case e := <-fileWatcher.Events(): - switch e.Op { - case fsnotify.Write: - dec.Reset(f) - return nil - case fsnotify.Rename, fsnotify.Remove: - select { - case <-notifyRotate: - case <-logWatcher.WatchProducerGone(): - return errDone - case <-logWatcher.WatchConsumerGone(): - return errDone - } - if err := handleRotate(); err != nil { - return err - } - return nil - } - return errRetry - case err := <-fileWatcher.Errors(): - logrus.Debugf("logger got error watching file: %v", err) - // Something happened, let's try and stay alive and create a new watcher - if retries <= 5 { - fileWatcher.Close() - fileWatcher, err = watchFile(name) - if err != nil { - return err - } - retries++ - return errRetry - } - return err - case <-logWatcher.WatchProducerGone(): - return errDone - case <-logWatcher.WatchConsumerGone(): - return errDone - } - } - - oldSize := int64(-1) - handleDecodeErr := func(err error) error { - if !errors.Is(err, io.EOF) { - return err - } - - // Handle special case (#39235): max-file=1 and file was truncated - st, stErr := f.Stat() - if stErr == nil { - size := st.Size() - defer func() { oldSize = size }() - if size < oldSize { // truncated - f.Seek(0, 0) - dec.Reset(f) - return nil - } - } else { - logrus.WithError(stErr).Warn("logger: stat error") - } - - for { - err := waitRead() - if err == nil { - break - } - if err == errRetry { - continue - } - return err - } - return nil - } - - // main loop - for { - select { - case err := <-notifyEvict: - if err != nil { - handleMustClose(err.(error)) - } - return - default: - } - msg, err := dec.Decode() - if err != nil { - if err := handleDecodeErr(err); err != nil { - if err == errDone { - return - } - // we got an unrecoverable error, so return - logWatcher.Err <- err - return - } - // ready to try again - continue - } - - retries = 0 // reset retries since we've succeeded - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - // send the message, unless the consumer is gone - select { - case e := <-notifyEvict: - if e != nil { - err := e.(error) - logrus.WithError(err).Debug("Reader evicted while sending log message") - logWatcher.Err <- err - } - return - case logWatcher.Msg <- msg: - case <-logWatcher.WatchConsumerGone(): - return - } - } -} - func watchFile(name string) (filenotify.FileWatcher, error) { var fileWatcher filenotify.FileWatcher