diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index f15c4ebc7a..13ee3d3b34 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -100,7 +100,8 @@ var errDrainDone = errors.New("journald drain done") // journal is reached without encountering a terminal stopping condition, // err == nil is returned. func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, initial chan struct{}) (int, error) { - if initial != nil { + isInitial := initial != nil + if isInitial { defer func() { if initial != nil { close(initial) @@ -148,7 +149,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour var sent int for i := 0; ; i++ { - if initial != nil && i == 0 && config.Tail > 0 { + if isInitial && i == 0 && config.Tail > 0 { if n, err := j.PreviousSkip(uint(config.Tail)); err != nil || n == 0 { return sent, err } @@ -156,8 +157,9 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour return sent, err } - if initial != nil && i == 0 { - // The cursor is in position. Signal that the watcher is + if isInitial && i == 0 { + // The cursor is in a position which will be unaffected + // by subsequent logging. Signal that the watcher is // initialized. close(initial) initial = nil // Prevent double-closing. @@ -168,11 +170,13 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour if err != nil { return sent, err } - if timestamp.Before(config.Since) { - if initial != nil && i == 0 && config.Tail > 0 { - // PreviousSkip went too far back. Seek forwards. - j.SeekRealtime(config.Since) - } + // Check if the PreviousSkip went too far back. Check only the + // initial position as we are comparing wall-clock timestamps, + // which may not be monotonic. We don't want to skip over + // messages sent later in time just because the clock moved + // backwards. + if isInitial && i == 0 && config.Tail > 0 && timestamp.Before(config.Since) { + j.SeekRealtime(config.Since) continue } if !config.Until.IsZero() && config.Until.Before(timestamp) { diff --git a/daemon/logger/loggertest/logreader.go b/daemon/logger/loggertest/logreader.go index df5cf5ed62..f559bf222f 100644 --- a/daemon/logger/loggertest/logreader.go +++ b/daemon/logger/loggertest/logreader.go @@ -51,6 +51,7 @@ func makeTestMessages() []*logger.Message { {Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}}, {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))}, {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}, + {Source: "stdout", Timestamp: time.Now().Add(-1 * 90 * time.Minute), Line: []byte("someone adjusted the clock")}, } } @@ -523,13 +524,12 @@ func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message { select { case err, open := <-lw.Err: t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open) - return nil default: } + return nil } - if msg != nil { - t.Logf("loggertest: ReadMessage [%v %v] %s", msg.Source, msg.Timestamp, msg.Line) - } + assert.Assert(t, msg != nil) + t.Logf("[%v] %s: %s", msg.Timestamp, msg.Source, msg.Line) return msg } } diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index b7f93a9aff..483e032d2c 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "os" - "time" "github.com/docker/docker/daemon/logger" "github.com/pkg/errors" @@ -12,10 +11,10 @@ import ( ) type follow struct { - LogFile *LogFile - Watcher *logger.LogWatcher - Decoder Decoder - Since, Until time.Time + LogFile *LogFile + Watcher *logger.LogWatcher + Decoder Decoder + Forwarder *forwarder log *logrus.Entry c chan logPos @@ -49,7 +48,7 @@ func (fl *follow) Do(f *os.File, read logPos) { fl.Watcher.Err <- err return } - if fl.decode(f) { + if !fl.forward(f) { return } @@ -91,7 +90,7 @@ func (fl *follow) Do(f *os.File, read logPos) { read.size = 0 } - if fl.decode(io.NewSectionReader(f, read.size, wrote.size-read.size)) { + if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) { return } read = wrote @@ -132,34 +131,10 @@ func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { return next, true } -// decode decodes log messages from r and sends messages with timestamps between -// Since and Until to the log watcher. +// forward decodes log messages from r and forwards them to the log watcher. // -// The return value, done, signals whether following should end due to a -// condition encountered during decode. -func (fl *follow) decode(r io.Reader) (done bool) { +// The return value, cont, signals whether following should continue. +func (fl *follow) forward(r io.Reader) (cont bool) { fl.Decoder.Reset(r) - for { - msg, err := fl.Decoder.Decode() - if err != nil { - if errors.Is(err, io.EOF) { - return false - } - fl.Watcher.Err <- err - return true - } - - if !fl.Since.IsZero() && msg.Timestamp.Before(fl.Since) { - continue - } - if !fl.Until.IsZero() && msg.Timestamp.After(fl.Until) { - return true - } - // send the message, unless the consumer is gone - select { - case fl.Watcher.Msg <- msg: - case <-fl.Watcher.WatchConsumerGone(): - return true - } - } + return fl.Forwarder.Do(fl.Watcher, fl.Decoder) } diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index decd2dfb34..b37e93f875 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -411,6 +411,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa defer dec.Close() currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size) + fwd := newForwarder(config) if config.Tail != 0 { // TODO(@cpuguy83): Instead of opening every file, only get the files which @@ -449,7 +450,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa readers = append(readers, currentChunk) } - ok := tailFiles(readers, watcher, dec, w.getTailReader, config) + ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd) closeFiles() if !ok { return @@ -463,11 +464,10 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa } (&follow{ - LogFile: w, - Watcher: watcher, - Decoder: dec, - Since: config.Since, - Until: config.Until, + LogFile: w, + Watcher: watcher, + Decoder: dec, + Forwarder: fwd, }).Do(currentFile, currentPos) } @@ -573,7 +573,7 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error { return rc.Close() } -func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) (cont bool) { +func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -583,20 +583,18 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge select { case <-ctx.Done(): case <-watcher.WatchConsumerGone(): - cont = false cancel() } }() readers := make([]io.Reader, 0, len(files)) - if config.Tail > 0 { - nLines := config.Tail + if nLines > 0 { for i := len(files) - 1; i >= 0 && nLines > 0; i-- { tail, n, err := getTailReader(ctx, files[i], nLines) if err != nil { watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing") - return + return false } nLines -= n readers = append([]io.Reader{tail}, readers...) @@ -609,24 +607,47 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge rdr := io.MultiReader(readers...) dec.Reset(rdr) + return fwd.Do(watcher, dec) +} +type forwarder struct { + since, until time.Time +} + +func newForwarder(config logger.ReadConfig) *forwarder { + return &forwarder{since: config.Since, until: config.Until} +} + +// Do reads log messages from dec and sends the messages matching the filter +// conditions to watcher. Do returns cont=true iff it has read all messages from +// dec without encountering a message with a timestamp which is after the +// configured until time. +func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) { for { msg, err := dec.Decode() if err != nil { - if !errors.Is(err, io.EOF) { - watcher.Err <- err + if errors.Is(err, io.EOF) { + return true } - return + watcher.Err <- err + return false } - if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { - continue + if !fwd.since.IsZero() { + if msg.Timestamp.Before(fwd.since) { + continue + } + // We've found our first message with a timestamp >= since. As message + // timestamps might not be monotonic, we need to skip the since check for all + // subsequent messages so we do not filter out later messages which happen to + // have timestamps before since. + fwd.since = time.Time{} } - if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { - return + if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) { + return false } select { - case <-ctx.Done(): - return + case <-watcher.WatchConsumerGone(): + return false case watcher.Msg <- msg: } } diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index f6edb38692..20a115c7a9 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -64,19 +64,21 @@ func TestTailFiles(t *testing.T) { for desc, config := range map[string]logger.ReadConfig{} { t.Run(desc, func(t *testing.T) { started := make(chan struct{}) + fwd := newForwarder(config) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config.Tail, fwd) }() <-started }) } config := logger.ReadConfig{Tail: 2} + fwd := newForwarder(config) started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config.Tail, fwd) }() <-started