diff --git a/daemon/logger/loggerutils/file_unix.go b/daemon/logger/loggerutils/file_unix.go
index eb39d4b73d..e7b6095296 100644
--- a/daemon/logger/loggerutils/file_unix.go
+++ b/daemon/logger/loggerutils/file_unix.go
@@ -7,3 +7,7 @@ import "os"
 func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
 	return os.OpenFile(name, flag, perm)
 }
+
+func open(name string) (*os.File, error) {
+	return os.Open(name)
+}
diff --git a/daemon/logger/loggerutils/file_windows.go b/daemon/logger/loggerutils/file_windows.go
index 507da6874d..b16bf01d70 100644
--- a/daemon/logger/loggerutils/file_windows.go
+++ b/daemon/logger/loggerutils/file_windows.go
@@ -4,17 +4,19 @@ import (
 	"os"
 	"syscall"
 	"unsafe"
-
-	"github.com/pkg/errors"
 )
 
+func open(name string) (*os.File, error) {
+	return openFile(name, os.O_RDONLY, 0)
+}
+
 func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
 	if name == "" {
 		return nil, &os.PathError{Op: "open", Path: name, Err: syscall.ENOENT}
 	}
 	h, err := syscallOpen(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))
 	if err != nil {
-		return nil, errors.Wrap(err, "error opening file")
+		return nil, &os.PathError{Op: "open", Path: name, Err: err}
 	}
 	return os.NewFile(uintptr(h), name), nil
 }
diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go
index 2b01192102..6b42d9dd30 100644
--- a/daemon/logger/loggerutils/logfile.go
+++ b/daemon/logger/loggerutils/logfile.go
@@ -68,7 +68,7 @@ func (rc *refCounter) Dereference(fileName string) error {
 	if rc.counter[fileName] <= 0 {
 		delete(rc.counter, fileName)
 		err := os.Remove(fileName)
-		if err != nil {
+		if err != nil && !os.IsNotExist(err) {
 			return err
 		}
 	}
@@ -87,7 +87,7 @@ type LogFile struct {
 	compress        bool       // whether old versions of log files are compressed
 	lastTimestamp   time.Time  // timestamp of the last log
 	filesRefCounter refCounter // keep reference-counted of decompressed files
-	notifyRotate    *pubsub.Publisher
+	notifyReaders   *pubsub.Publisher
 	marshal         logger.MarshalFunc
 	createDecoder   MakeDecoderFn
 	getTailReader   GetTailReaderFunc
@@ -141,7 +141,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
 		maxFiles:        maxFiles,
 		compress:        compress,
 		filesRefCounter: refCounter{counter: make(map[string]int)},
-		notifyRotate:    pubsub.NewPublisher(0, 1),
+		notifyReaders:   pubsub.NewPublisher(0, 1),
 		marshal:         marshaller,
 		createDecoder:   decodeFunc,
 		perms:           perms,
@@ -167,7 +167,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
 
 	if err := w.checkCapacityAndRotate(); err != nil {
 		w.mu.Unlock()
-		return err
+		return errors.Wrap(err, "error rotating log file")
 	}
 
 	n, err := w.f.Write(b)
@@ -175,49 +175,77 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
 		w.currentSize += int64(n)
 		w.lastTimestamp = msg.Timestamp
 	}
+
 	w.mu.Unlock()
-	return err
+	return errors.Wrap(err, "error writing log entry")
 }
 
-func (w *LogFile) checkCapacityAndRotate() error {
+func (w *LogFile) checkCapacityAndRotate() (retErr error) {
 	if w.capacity == -1 {
 		return nil
 	}
-
-	if w.currentSize >= w.capacity {
-		w.rotateMu.Lock()
-		fname := w.f.Name()
-		if err := w.f.Close(); err != nil {
-			// if there was an error during a prior rotate, the file could already be closed
-			if !errors.Is(err, os.ErrClosed) {
-				w.rotateMu.Unlock()
-				return errors.Wrap(err, "error closing file")
-			}
-		}
-		if err := rotate(fname, w.maxFiles, w.compress); err != nil {
-			w.rotateMu.Unlock()
-			return err
-		}
-		file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
-		if err != nil {
-			w.rotateMu.Unlock()
-			return err
-		}
-		w.f = file
-		w.currentSize = 0
-		w.notifyRotate.Publish(struct{}{})
-
-		if w.maxFiles <= 1 || !w.compress {
-			w.rotateMu.Unlock()
-			return nil
-		}
-
-		go func() {
-			compressFile(fname+".1", w.lastTimestamp)
-			w.rotateMu.Unlock()
-		}()
+	if w.currentSize < w.capacity {
+		return nil
 	}
 
+	w.rotateMu.Lock()
+	noCompress := w.maxFiles <= 1 || !w.compress
+	defer func() {
+		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
+		// Otherwise the lock will be released in the goroutine that handles compression.
+		if retErr != nil || noCompress {
+			w.rotateMu.Unlock()
+		}
+	}()
+
+	fname := w.f.Name()
+	if err := w.f.Close(); err != nil {
+		// if there was an error during a prior rotate, the file could already be closed
+		if !errors.Is(err, os.ErrClosed) {
+			return errors.Wrap(err, "error closing file")
+		}
+	}
+
+	if err := rotate(fname, w.maxFiles, w.compress); err != nil {
+		logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
+	} else {
+		var renameErr error
+		for i := 0; i < 10; i++ {
+			if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
+				logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
+				w.notifyReaders.Publish(renameErr)
+				time.Sleep(100 * time.Millisecond)
+				continue
+			}
+			break
+		}
+		if renameErr != nil {
+			logrus.WithError(renameErr).Error("Error renaming current log file")
+		}
+	}
+
+	file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
+	if err != nil {
+		return err
+	}
+	w.f = file
+	w.currentSize = 0
+
+	w.notifyReaders.Publish(struct{}{})
+
+	if noCompress {
+		return nil
+	}
+
+	ts := w.lastTimestamp
+
+	go func() {
+		if err := compressFile(fname+".1", ts); err != nil {
+			logrus.WithError(err).Error("Error compressing log file after rotation")
+		}
+		w.rotateMu.Unlock()
+	}()
+
 	return nil
 }
 
@@ -240,41 +268,44 @@ func rotate(name string, maxFiles int, compress bool) error {
 	for i := maxFiles - 1; i > 1; i-- {
 		toPath := name + "." + strconv.Itoa(i) + extension
 		fromPath := name + "." + strconv.Itoa(i-1) + extension
+		logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
 		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
 			return err
 		}
 	}
 
-	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
-		return err
-	}
-
 	return nil
 }
 
-func compressFile(fileName string, lastTimestamp time.Time) {
-	file, err := os.Open(fileName)
+func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
+	file, err := open(fileName)
 	if err != nil {
-		logrus.Errorf("Failed to open log file: %v", err)
-		return
+		if os.IsNotExist(err) {
+			logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
+			return nil
+		}
+		return errors.Wrap(err, "failed to open log file")
 	}
 	defer func() {
 		file.Close()
-		err := os.Remove(fileName)
-		if err != nil {
-			logrus.Errorf("Failed to remove source log file: %v", err)
+		if retErr == nil {
+			err := os.Remove(fileName)
+			if err != nil && !os.IsNotExist(err) {
+				retErr = errors.Wrap(err, "failed to remove source log file")
+			}
 		}
 	}()
 
 	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
 	if err != nil {
-		logrus.Errorf("Failed to open or create gzip log file: %v", err)
-		return
+		return errors.Wrap(err, "failed to open or create gzip log file")
 	}
 	defer func() {
 		outFile.Close()
-		if err != nil {
-			os.Remove(fileName + ".gz")
+		if retErr != nil {
+			if err := os.Remove(fileName + ".gz"); err != nil && !os.IsNotExist(err) {
+				logrus.WithError(err).Error("Error cleaning up after failed log compression")
+			}
 		}
 	}()
 
@@ -292,9 +323,10 @@ func compressFile(fileName string, lastTimestamp time.Time) {
 
 	_, err = pools.Copy(compressWriter, file)
 	if err != nil {
-		logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
-		return
+		return errors.Wrapf(err, "error compressing log file %s", fileName)
 	}
+
+	return nil
 }
 
 // MaxFiles return maximum number of files
@@ -309,7 +341,7 @@ func (w *LogFile) Close() error {
 	if w.closed {
 		return nil
 	}
-	if err := w.f.Close(); err != nil {
+	if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
 		return err
 	}
 	w.closed = true
@@ -322,7 +354,7 @@ func (w *LogFile) Close() error {
 // TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
 func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
 	w.mu.RLock()
-	currentFile, err := os.Open(w.f.Name())
+	currentFile, err := open(w.f.Name())
 	if err != nil {
 		w.mu.RUnlock()
 		watcher.Err <- err
@@ -340,6 +372,12 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 		return
 	}
 
+	notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
+		_, ok := i.(error)
+		return ok
+	}, 1)
+	defer w.notifyReaders.Evict(notifyEvict)
+
 	if config.Tail != 0 {
 		// TODO(@cpuguy83): Instead of opening every file, only get the files which
 		// are needed to tail.
@@ -358,7 +396,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 			if strings.HasSuffix(fileName, tmpLogfileSuffix) {
 				err := w.filesRefCounter.Dereference(fileName)
 				if err != nil {
-					logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
+					logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
 				}
 			}
 		}
@@ -378,9 +416,11 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 			readers = append(readers, currentChunk)
 		}
 
-		tailFiles(readers, watcher, dec, w.getTailReader, config)
+		ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
 		closeFiles()
-
+		if !ok {
+			return
+		}
 		w.mu.RLock()
 	}
 
@@ -390,9 +430,13 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
 	}
 	w.mu.RUnlock()
 
-	notifyRotate := w.notifyRotate.Subscribe()
-	defer w.notifyRotate.Evict(notifyRotate)
-	followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
+	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
+		_, ok := i.(struct{})
+		return ok
+	})
+	defer w.notifyReaders.Evict(notifyRotate)
+
+	followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
 }
 
 func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
@@ -415,7 +459,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
 	}()
 
 	for i := w.maxFiles; i > 1; i-- {
-		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
+		f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
 		if err != nil {
 			if !os.IsNotExist(err) {
 				return nil, errors.Wrap(err, "error opening rotated log file")
@@ -425,7 +469,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
 			decompressedFileName := fileName + tmpLogfileSuffix
 			tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
 				if exists {
-					return os.Open(refFileName)
+					return open(refFileName)
 				}
 				return decompressfile(fileName, refFileName, config.Since)
 			})
@@ -451,7 +495,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
 }
 
 func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
-	cf, err := os.Open(fileName)
+	cf, err := open(fileName)
 	if err != nil {
 		return nil, errors.Wrap(err, "error opening file for decompression")
 	}
@@ -498,16 +542,25 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
 	return io.NewSectionReader(f, 0, size), nil
 }
 
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
 	nLines := config.Tail
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+
+	cont = true
 	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
 	go func() {
 		select {
+		case err := <-notifyEvict:
+			if err != nil {
+				watcher.Err <- err.(error)
+				cont = false
+				cancel()
+			}
 		case <-ctx.Done():
 		case <-watcher.WatchConsumerGone():
+			cont = false
 			cancel()
 		}
 	}()
@@ -555,7 +608,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
 	}
 }
 
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
 	dec.Reset(f)
 
 	name := f.Name()
@@ -566,6 +619,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 	}
 	defer func() {
 		f.Close()
+		dec.Close()
 		fileWatcher.Close()
 	}()
 
@@ -576,7 +630,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 
 		// retry when the file doesn't exist
 		for retries := 0; retries <= 5; retries++ {
-			f, err = os.Open(name)
+			f, err = open(name)
 			if err == nil || !os.IsNotExist(err) {
 				break
 			}
@@ -593,8 +647,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 
 	errRetry := errors.New("retry")
 	errDone := errors.New("done")
+
+	handleMustClose := func(evictErr error) {
+		f.Close()
+		dec.Close()
+		logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors")
+		logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
+	}
+
 	waitRead := func() error {
 		select {
+		case e := <-notifyEvict:
+			if e != nil {
+				err := e.(error)
+				handleMustClose(err)
+			}
+			return errDone
 		case e := <-fileWatcher.Events():
 			switch e.Op {
 			case fsnotify.Write:
@@ -668,6 +736,14 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 
 	// main loop
 	for {
+		select {
+		case err := <-notifyEvict:
+			if err != nil {
+				handleMustClose(err.(error))
+			}
+			return
+		default:
+		}
 		msg, err := dec.Decode()
 		if err != nil {
 			if err := handleDecodeErr(err); err != nil {
@@ -691,6 +767,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 		}
 		// send the message, unless the consumer is gone
 		select {
+		case e := <-notifyEvict:
+			if e != nil {
+				err := e.(error)
+				logrus.WithError(err).Debug("Reader evicted while sending log message")
+				logWatcher.Err <- err
+			}
+			return
 		case logWatcher.Msg <- msg:
 		case <-logWatcher.WatchConsumerGone():
 			return
diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go
index ab4a542b19..af951c75f5 100644
--- a/daemon/logger/loggerutils/logfile_test.go
+++ b/daemon/logger/loggerutils/logfile_test.go
@@ -1,14 +1,18 @@
-package loggerutils
+package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
 
 import (
 	"bufio"
+	"bytes"
 	"context"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync/atomic"
 	"testing"
+	"text/tabwriter"
 	"time"
 
 	"github.com/docker/docker/daemon/logger"
@@ -51,6 +55,7 @@ func TestTailFiles(t *testing.T) {
 	files := []SizeReaderAt{s1, s2, s3}
 	watcher := logger.NewLogWatcher()
+	defer watcher.ConsumerGone()
 
 	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
 		return tailfile.NewTailReader(ctx, r, lines)
 	}
@@ -62,7 +67,7 @@ func TestTailFiles(t *testing.T) {
 		started := make(chan struct{})
 		go func() {
 			close(started)
-			tailFiles(files, watcher, dec, tailReader, config)
+			tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
 		}()
 		<-started
 	})
@@ -72,7 +77,7 @@ func TestTailFiles(t *testing.T) {
 	started := make(chan struct{})
 	go func() {
 		close(started)
-		tailFiles(files, watcher, dec, tailReader, config)
+		tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
 	}()
 	<-started
 
@@ -121,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) {
 	followLogsDone := make(chan struct{})
 	var since, until time.Time
 	go func() {
-		followLogs(f, lw, make(chan interface{}), dec, since, until)
+		followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
 		close(followLogsDone)
 	}()
 
@@ -157,19 +162,20 @@ func (d *dummyWrapper) Decode() (*logger.Message, error) {
 
 func TestFollowLogsProducerGone(t *testing.T) {
 	lw := logger.NewLogWatcher()
+	defer lw.ConsumerGone()
 
 	f, err := ioutil.TempFile("", t.Name())
 	assert.NilError(t, err)
 	defer os.Remove(f.Name())
 
-	var sent, received, closed int
+	var sent, received, closed int32
 	dec := &dummyWrapper{fn: func() error {
-		switch closed {
+		switch atomic.LoadInt32(&closed) {
 		case 0:
-			sent++
+			atomic.AddInt32(&sent, 1)
 			return nil
 		case 1:
-			closed++
+			atomic.AddInt32(&closed, 1)
 			t.Logf("logDecode() closed after sending %d messages\n", sent)
 			return io.EOF
 		default:
@@ -181,7 +187,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
 
 	followLogsDone := make(chan struct{})
 	go func() {
-		followLogs(f, lw, make(chan interface{}), dec, since, until)
+		followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
 		close(followLogsDone)
 	}()
 
@@ -198,7 +204,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
 	}
 
 	// "stop" the "container"
-	closed = 1
+	atomic.StoreInt32(&closed, 1)
 	lw.ProducerGone()
 
 	// should receive all the messages sent
@@ -209,7 +215,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
 		select {
 		case <-lw.Msg:
 			received++
-			if received == sent {
+			if received == atomic.LoadInt32(&sent) {
 				return
 			}
 		case err := <-lw.Err:
@@ -223,7 +229,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
 		t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
 	}
 
-	t.Logf("messages sent: %d, received: %d", sent, received)
+	t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
 
 	// followLogs() should be done by now
 	select {
@@ -248,21 +254,30 @@ func TestCheckCapacityAndRotate(t *testing.T) {
 	assert.NilError(t, err)
 
 	l := &LogFile{
-		f:            f,
-		capacity:     5,
-		maxFiles:     3,
-		compress:     true,
-		notifyRotate: pubsub.NewPublisher(0, 1),
-		perms:        0600,
+		f:               f,
+		capacity:        5,
+		maxFiles:        3,
+		compress:        true,
+		notifyReaders:   pubsub.NewPublisher(0, 1),
+		perms:           0600,
+		filesRefCounter: refCounter{counter: make(map[string]int)},
+		getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
+			return tailfile.NewTailReader(ctx, r, lines)
+		},
+		createDecoder: func(io.Reader) Decoder {
+			return dummyDecoder{}
+		},
 		marshal: func(msg *logger.Message) ([]byte, error) {
 			return msg.Line, nil
 		},
 	}
 	defer l.Close()
 
+	ls := dirStringer{dir}
+
 	assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
 	_, err = os.Stat(f.Name() + ".1")
-	assert.Assert(t, os.IsNotExist(err), dirStringer{dir})
+	assert.Assert(t, os.IsNotExist(err), ls)
 
 	assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
 	poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
@@ -271,11 +286,48 @@ func TestCheckCapacityAndRotate(t *testing.T) {
 	poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
 	poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
 
-	// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
-	// down the line.
-	// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
-	l.f.Close()
-	assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
+	t.Run("closed log file", func(t *testing.T) {
+		// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
+		// down the line.
+		// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
+		l.f.Close()
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
+		assert.NilError(t, os.Remove(f.Name()+".2.gz"))
+	})
+
+	t.Run("with log reader", func(t *testing.T) {
+		// Make sure rotate works with an active reader
+		lw := logger.NewLogWatcher()
+		defer lw.ConsumerGone()
+		go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
+
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
+		// make sure the log reader is primed
+		waitForMsg(t, lw, 30*time.Second)
+
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
+		assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
+		poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
+	})
+}
+
+func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
+	t.Helper()
+
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+
+	select {
+	case <-lw.Msg:
+	case <-lw.WatchProducerGone():
+		t.Fatal("log producer gone before log message arrived")
+	case err := <-lw.Err:
+		assert.NilError(t, err)
+	case <-timer.C:
+		t.Fatal("timeout waiting for log message")
+	}
 }
 
 type dirStringer struct {
@@ -287,13 +339,18 @@ func (d dirStringer) String() string {
 	if err != nil {
 		return ""
 	}
-	var s strings.Builder
-	s.WriteString("\n")
+	buf := bytes.NewBuffer(nil)
+	tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
+	buf.WriteString("\n")
+
+	btw := bufio.NewWriter(tw)
 
 	for _, fi := range ls {
-		s.WriteString(fi.Name() + "\n")
+		btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
 	}
-	return s.String()
+	btw.Flush()
+	tw.Flush()
+	return buf.String()
 }
 
 func checkFileExists(name string) poll.Check {
@@ -305,7 +362,7 @@ func checkFileExists(name string) poll.Check {
 		case os.IsNotExist(err):
 			return poll.Continue("waiting for %s to exist", name)
 		default:
-			t.Logf("%s", dirStringer{filepath.Dir(name)})
+			t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
 			return poll.Error(err)
 		}
 	}