diff --git a/daemon/logger/loggerutils/file_windows.go b/daemon/logger/loggerutils/file_windows.go index c42c08aa72..b16bf01d70 100644 --- a/daemon/logger/loggerutils/file_windows.go +++ b/daemon/logger/loggerutils/file_windows.go @@ -4,8 +4,6 @@ import ( "os" "syscall" "unsafe" - - "github.com/pkg/errors" ) func open(name string) (*os.File, error) { @@ -18,7 +16,7 @@ func openFile(name string, flag int, perm os.FileMode) (*os.File, error) { } h, err := syscallOpen(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm)) if err != nil { - return nil, errors.Wrap(err, "error opening file") + return nil, &os.PathError{Op: "open", Path: name, Err: err} } return os.NewFile(uintptr(h), name), nil } diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index b4212e7a81..6b42d9dd30 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -87,7 +87,7 @@ type LogFile struct { compress bool // whether old versions of log files are compressed lastTimestamp time.Time // timestamp of the last log filesRefCounter refCounter // keep reference-counted of decompressed files - notifyRotate *pubsub.Publisher + notifyReaders *pubsub.Publisher marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc @@ -141,7 +141,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar maxFiles: maxFiles, compress: compress, filesRefCounter: refCounter{counter: make(map[string]int)}, - notifyRotate: pubsub.NewPublisher(0, 1), + notifyReaders: pubsub.NewPublisher(0, 1), marshal: marshaller, createDecoder: decodeFunc, perms: perms, @@ -167,7 +167,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { if err := w.checkCapacityAndRotate(); err != nil { w.mu.Unlock() - return err + return errors.Wrap(err, "error rotating log file") } n, err := w.f.Write(b) @@ -175,22 +175,25 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { w.currentSize += int64(n) w.lastTimestamp = msg.Timestamp } + w.mu.Unlock() - return err + return errors.Wrap(err, "error writing log entry") } func (w *LogFile) checkCapacityAndRotate() (retErr error) { if w.capacity == -1 { return nil } - if w.currentSize < w.capacity { return nil } w.rotateMu.Lock() + noCompress := w.maxFiles <= 1 || !w.compress defer func() { - if retErr != nil || w.maxFiles <= 1 || !w.compress { + // If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function. + // Otherwise the lock will be released in the goroutine that handles compression. + if retErr != nil || noCompress { w.rotateMu.Unlock() } }() @@ -204,17 +207,33 @@ func (w *LogFile) checkCapacityAndRotate() (retErr error) { } if err := rotate(fname, w.maxFiles, w.compress); err != nil { - return err + logrus.WithError(err).Warn("Error rotating log file, log data may have been lost") + } else { + var renameErr error + for i := 0; i < 10; i++ { + if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) { + logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying") + w.notifyReaders.Publish(renameErr) + time.Sleep(100 * time.Millisecond) + continue + } + break + } + if renameErr != nil { + logrus.WithError(renameErr).Error("Error renaming current log file") + } } + file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) if err != nil { return err } w.f = file w.currentSize = 0 - w.notifyRotate.Publish(struct{}{}) - if w.maxFiles <= 1 || !w.compress { + w.notifyReaders.Publish(struct{}{}) + + if noCompress { return nil } @@ -249,15 +268,12 @@ func rotate(name string, maxFiles int, compress bool) error { for i := maxFiles - 1; i > 1; i-- { toPath := name + "." + strconv.Itoa(i) + extension fromPath := name + "." + strconv.Itoa(i-1) + extension + logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file") if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { return err } } - if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { - return err - } - return nil } @@ -272,9 +288,11 @@ func compressFile(fileName string, lastTimestamp time.Time) (retErr error) { } defer func() { file.Close() - err := os.Remove(fileName) - if err != nil && !os.IsNotExist(err) { - retErr = errors.Wrap(err, "failed to remove source log file") + if retErr == nil { + err := os.Remove(fileName) + if err != nil && !os.IsNotExist(err) { + retErr = errors.Wrap(err, "failed to remove source log file") + } } }() @@ -354,6 +372,12 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) return } + notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool { + _, ok := i.(error) + return ok + }, 1) + defer w.notifyReaders.Evict(notifyEvict) + if config.Tail != 0 { // TODO(@cpuguy83): Instead of opening every file, only get the files which // are needed to tail. @@ -392,9 +416,11 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) readers = append(readers, currentChunk) } - tailFiles(readers, watcher, dec, w.getTailReader, config) + ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict) closeFiles() - + if !ok { + return + } w.mu.RLock() } @@ -404,9 +430,13 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) } w.mu.RUnlock() - notifyRotate := w.notifyRotate.Subscribe() - defer w.notifyRotate.Evict(notifyRotate) - followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until) + notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool { + _, ok := i.(struct{}) + return ok + }) + defer w.notifyReaders.Evict(notifyRotate) + + followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until) } func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) { @@ -512,16 +542,25 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) { return io.NewSectionReader(f, 0, size), nil } -func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) { +func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) { nLines := config.Tail ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + cont = true // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. go func() { select { + case err := <-notifyEvict: + if err != nil { + watcher.Err <- err.(error) + cont = false + cancel() + } case <-ctx.Done(): case <-watcher.WatchConsumerGone(): + cont = false cancel() } }() @@ -569,7 +608,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) { +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { dec.Reset(f) name := f.Name() @@ -580,6 +619,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int } defer func() { f.Close() + dec.Close() fileWatcher.Close() }() @@ -607,8 +647,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int errRetry := errors.New("retry") errDone := errors.New("done") + + handleMustClose := func(evictErr error) { + f.Close() + dec.Close() + logWatcher.Err <- errors.Wrap(err, "log reader evicted due to errors") + logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.") + } + waitRead := func() error { select { + case e := <-notifyEvict: + if e != nil { + err := e.(error) + handleMustClose(err) + } + return errDone case e := <-fileWatcher.Events(): switch e.Op { case fsnotify.Write: @@ -682,6 +736,14 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int // main loop for { + select { + case err := <-notifyEvict: + if err != nil { + handleMustClose(err.(error)) + } + return + default: + } msg, err := dec.Decode() if err != nil { if err := handleDecodeErr(err); err != nil { @@ -705,6 +767,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int } // send the message, unless the consumer is gone select { + case e := <-notifyEvict: + if e != nil { + err := e.(error) + logrus.WithError(err).Debug("Reader evicted while sending log message") + logWatcher.Err <- err + } + return case logWatcher.Msg <- msg: case <-logWatcher.WatchConsumerGone(): return diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index fc83f9c0c5..af951c75f5 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -1,8 +1,10 @@ -package loggerutils +package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" import ( "bufio" + "bytes" "context" + "fmt" "io" "io/ioutil" "os" @@ -10,6 +12,7 @@ import ( "strings" "sync/atomic" "testing" + "text/tabwriter" "time" "github.com/docker/docker/daemon/logger" @@ -64,7 +67,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config, make(chan interface{})) }() <-started }) @@ -74,7 +77,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config, make(chan interface{})) }() <-started @@ -123,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) { followLogsDone := make(chan struct{}) var since, until time.Time go func() { - followLogs(f, lw, make(chan interface{}), dec, since, until) + followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until) close(followLogsDone) }() @@ -184,7 +187,7 @@ func TestFollowLogsProducerGone(t *testing.T) { followLogsDone := make(chan struct{}) go func() { - followLogs(f, lw, make(chan interface{}), dec, since, until) + followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until) close(followLogsDone) }() @@ -251,21 +254,30 @@ func TestCheckCapacityAndRotate(t *testing.T) { assert.NilError(t, err) l := &LogFile{ - f: f, - capacity: 5, - maxFiles: 3, - compress: true, - notifyRotate: pubsub.NewPublisher(0, 1), - perms: 0600, + f: f, + capacity: 5, + maxFiles: 3, + compress: true, + notifyReaders: pubsub.NewPublisher(0, 1), + perms: 0600, + filesRefCounter: refCounter{counter: make(map[string]int)}, + getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, lines) + }, + createDecoder: func(io.Reader) Decoder { + return dummyDecoder{} + }, marshal: func(msg *logger.Message) ([]byte, error) { return msg.Line, nil }, } defer l.Close() + ls := dirStringer{dir} + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) _, err = os.Stat(f.Name() + ".1") - assert.Assert(t, os.IsNotExist(err), dirStringer{dir}) + assert.Assert(t, os.IsNotExist(err), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) @@ -274,11 +286,48 @@ func TestCheckCapacityAndRotate(t *testing.T) { poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) - // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere - // down the line. - // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation. - l.f.Close() - assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + t.Run("closed log file", func(t *testing.T) { + // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere + // down the line. + // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation. + l.f.Close() + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) + assert.NilError(t, os.Remove(f.Name()+".2.gz")) + }) + + t.Run("with log reader", func(t *testing.T) { + // Make sure rotate works with an active reader + lw := logger.NewLogWatcher() + defer lw.ConsumerGone() + go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw) + + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls) + // make sure the log reader is primed + waitForMsg(t, lw, 30*time.Second) + + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls) + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls) + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls) + assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls) + poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + }) +} + +func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) { + t.Helper() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-lw.Msg: + case <-lw.WatchProducerGone(): + t.Fatal("log producer gone before log message arrived") + case err := <-lw.Err: + assert.NilError(t, err) + case <-timer.C: + t.Fatal("timeout waiting for log message") + } } type dirStringer struct { @@ -290,13 +339,18 @@ func (d dirStringer) String() string { if err != nil { return "" } - var s strings.Builder - s.WriteString("\n") + buf := bytes.NewBuffer(nil) + tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0) + buf.WriteString("\n") + + btw := bufio.NewWriter(tw) for _, fi := range ls { - s.WriteString(fi.Name() + "\n") + btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime())) } - return s.String() + btw.Flush() + tw.Flush() + return buf.String() } func checkFileExists(name string) poll.Check {