logger/*: fix reading w/ non-monotonic timestamps

Fix journald and logfile-powered (jsonfile, local) log readers
incorrectly filtering out messages with timestamps < Since which were
preceded by a message with a timestamp >= Since.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Author: Cory Snider <csnider@mirantis.com>
Date:   2022-02-10 14:47:24 -05:00
Parent: 342b44bf20
Commit: e278d3f185

5 changed files with 72 additions and 70 deletions
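
To make the failure mode concrete, here is a minimal, self-contained Go sketch (illustrative only, not part of the patch; all names are invented). With a naive per-message Since filter, a message whose wall-clock timestamp jumps backwards is dropped even though it was logged after a message that already passed the check; disarming the filter after the first match, as the forwarder added below does, keeps it. The same reasoning drives the journald change: only the initial PreviousSkip position is compared against Since, so messages logged later with earlier wall-clock timestamps are not skipped.

package main

import (
	"fmt"
	"time"
)

type msg struct {
	ts   time.Time
	line string
}

// naiveFilter drops every message with a timestamp before since, even when it
// follows a message that has already passed the check.
func naiveFilter(msgs []msg, since time.Time) []string {
	var out []string
	for _, m := range msgs {
		if m.ts.Before(since) {
			continue
		}
		out = append(out, m.line)
	}
	return out
}

// fixedFilter stops checking since after the first match, mirroring the
// forwarder introduced by this commit.
func fixedFilter(msgs []msg, since time.Time) []string {
	var out []string
	for _, m := range msgs {
		if !since.IsZero() {
			if m.ts.Before(since) {
				continue
			}
			since = time.Time{} // first in-window message seen; stop filtering
		}
		out = append(out, m.line)
	}
	return out
}

func main() {
	base := time.Date(2022, 2, 10, 10, 0, 0, 0, time.UTC)
	msgs := []msg{
		{base, "before the window"},
		{base.Add(5 * time.Minute), "first message in the window"},
		{base.Add(-90 * time.Minute), "someone adjusted the clock"},
		{base.Add(6 * time.Minute), "last message"},
	}
	since := base.Add(3 * time.Minute)

	fmt.Println(naiveFilter(msgs, since)) // clock-adjusted line is lost
	fmt.Println(fixedFilter(msgs, since)) // all three in-window lines survive
}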


@@ -100,7 +100,8 @@ var errDrainDone = errors.New("journald drain done")
 // journal is reached without encountering a terminal stopping condition,
 // err == nil is returned.
 func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, initial chan struct{}) (int, error) {
-	if initial != nil {
+	isInitial := initial != nil
+	if isInitial {
 		defer func() {
 			if initial != nil {
 				close(initial)
@@ -148,7 +149,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 	var sent int
 	for i := 0; ; i++ {
-		if initial != nil && i == 0 && config.Tail > 0 {
+		if isInitial && i == 0 && config.Tail > 0 {
 			if n, err := j.PreviousSkip(uint(config.Tail)); err != nil || n == 0 {
 				return sent, err
 			}
@@ -156,8 +157,9 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 			return sent, err
 		}
-		if initial != nil && i == 0 {
-			// The cursor is in position. Signal that the watcher is
+		if isInitial && i == 0 {
+			// The cursor is in a position which will be unaffected
+			// by subsequent logging. Signal that the watcher is
 			// initialized.
 			close(initial)
 			initial = nil // Prevent double-closing.
@@ -168,11 +170,13 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 		if err != nil {
 			return sent, err
 		}
-		if timestamp.Before(config.Since) {
-			if initial != nil && i == 0 && config.Tail > 0 {
-				// PreviousSkip went too far back. Seek forwards.
-				j.SeekRealtime(config.Since)
-			}
+		// Check if the PreviousSkip went too far back. Check only the
+		// initial position as we are comparing wall-clock timestamps,
+		// which may not be monotonic. We don't want to skip over
+		// messages sent later in time just because the clock moved
+		// backwards.
+		if isInitial && i == 0 && config.Tail > 0 && timestamp.Before(config.Since) {
+			j.SeekRealtime(config.Since)
 			continue
 		}
 		if !config.Until.IsZero() && config.Until.Before(timestamp) {


@@ -51,6 +51,7 @@ func makeTestMessages() []*logger.Message {
 		{Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}},
 		{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))},
 		{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")},
+		{Source: "stdout", Timestamp: time.Now().Add(-1 * 90 * time.Minute), Line: []byte("someone adjusted the clock")},
 	}
 }
@@ -523,13 +524,12 @@ func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message {
			select {
			case err, open := <-lw.Err:
				t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open)
-				return nil
			default:
			}
+			return nil
		}
-		if msg != nil {
-			t.Logf("loggertest: ReadMessage [%v %v] %s", msg.Source, msg.Timestamp, msg.Line)
-		}
+		assert.Assert(t, msg != nil)
+		t.Logf("[%v] %s: %s", msg.Timestamp, msg.Source, msg.Line)
		return msg
	}
}


@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"time"
 
 	"github.com/docker/docker/daemon/logger"
 	"github.com/pkg/errors"
@@ -12,10 +11,10 @@
 )
 
 type follow struct {
-	LogFile      *LogFile
-	Watcher      *logger.LogWatcher
-	Decoder      Decoder
-	Since, Until time.Time
+	LogFile   *LogFile
+	Watcher   *logger.LogWatcher
+	Decoder   Decoder
+	Forwarder *forwarder
 
 	log *logrus.Entry
 	c   chan logPos
@@ -49,7 +48,7 @@ func (fl *follow) Do(f *os.File, read logPos) {
 		fl.Watcher.Err <- err
 		return
 	}
-	if fl.decode(f) {
+	if !fl.forward(f) {
 		return
 	}
 
@@ -91,7 +90,7 @@ func (fl *follow) Do(f *os.File, read logPos) {
 		read.size = 0
 	}
 
-	if fl.decode(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
+	if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
 		return
 	}
 	read = wrote
@@ -132,34 +131,10 @@ func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
 	return next, true
 }
 
-// decode decodes log messages from r and sends messages with timestamps between
-// Since and Until to the log watcher.
+// forward decodes log messages from r and forwards them to the log watcher.
 //
-// The return value, done, signals whether following should end due to a
-// condition encountered during decode.
-func (fl *follow) decode(r io.Reader) (done bool) {
+// The return value, cont, signals whether following should continue.
+func (fl *follow) forward(r io.Reader) (cont bool) {
 	fl.Decoder.Reset(r)
-	for {
-		msg, err := fl.Decoder.Decode()
-		if err != nil {
-			if errors.Is(err, io.EOF) {
-				return false
-			}
-			fl.Watcher.Err <- err
-			return true
-		}
-		if !fl.Since.IsZero() && msg.Timestamp.Before(fl.Since) {
-			continue
-		}
-		if !fl.Until.IsZero() && msg.Timestamp.After(fl.Until) {
-			return true
-		}
-		// send the message, unless the consumer is gone
-		select {
-		case fl.Watcher.Msg <- msg:
-		case <-fl.Watcher.WatchConsumerGone():
-			return true
-		}
-	}
+	return fl.Forwarder.Do(fl.Watcher, fl.Decoder)
 }


@@ -411,6 +411,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa
 	defer dec.Close()
 
 	currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
+	fwd := newForwarder(config)
 
 	if config.Tail != 0 {
 		// TODO(@cpuguy83): Instead of opening every file, only get the files which
@@ -449,7 +450,7 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa
 			readers = append(readers, currentChunk)
 		}
 
-		ok := tailFiles(readers, watcher, dec, w.getTailReader, config)
+		ok := tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd)
 		closeFiles()
 		if !ok {
 			return
@@ -463,11 +464,10 @@ func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, wa
 	}
 
 	(&follow{
-		LogFile: w,
-		Watcher: watcher,
-		Decoder: dec,
-		Since:   config.Since,
-		Until:   config.Until,
+		LogFile:   w,
+		Watcher:   watcher,
+		Decoder:   dec,
+		Forwarder: fwd,
 	}).Do(currentFile, currentPos)
 }
@@ -573,7 +573,7 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
 	return rc.Close()
 }
 
-func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) (cont bool) {
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, nLines int, fwd *forwarder) (cont bool) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -583,20 +583,18 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
 		select {
 		case <-ctx.Done():
 		case <-watcher.WatchConsumerGone():
-			cont = false
 			cancel()
 		}
 	}()
 
 	readers := make([]io.Reader, 0, len(files))
 
-	if config.Tail > 0 {
-		nLines := config.Tail
+	if nLines > 0 {
 		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
 			tail, n, err := getTailReader(ctx, files[i], nLines)
 			if err != nil {
 				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
-				return
+				return false
 			}
 			nLines -= n
 			readers = append([]io.Reader{tail}, readers...)
@@ -609,24 +607,47 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
 	rdr := io.MultiReader(readers...)
 	dec.Reset(rdr)
 
+	return fwd.Do(watcher, dec)
+}
+
+type forwarder struct {
+	since, until time.Time
+}
+
+func newForwarder(config logger.ReadConfig) *forwarder {
+	return &forwarder{since: config.Since, until: config.Until}
+}
+
+// Do reads log messages from dec and sends the messages matching the filter
+// conditions to watcher. Do returns cont=true iff it has read all messages from
+// dec without encountering a message with a timestamp which is after the
+// configured until time.
+func (fwd *forwarder) Do(watcher *logger.LogWatcher, dec Decoder) (cont bool) {
 	for {
 		msg, err := dec.Decode()
 		if err != nil {
-			if !errors.Is(err, io.EOF) {
-				watcher.Err <- err
+			if errors.Is(err, io.EOF) {
+				return true
 			}
-			return
+			watcher.Err <- err
+			return false
 		}
-		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
-			continue
+		if !fwd.since.IsZero() {
+			if msg.Timestamp.Before(fwd.since) {
+				continue
+			}
+			// We've found our first message with a timestamp >= since. As message
+			// timestamps might not be monotonic, we need to skip the since check for all
+			// subsequent messages so we do not filter out later messages which happen to
+			// have timestamps before since.
+			fwd.since = time.Time{}
 		}
-		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
-			return
+		if !fwd.until.IsZero() && msg.Timestamp.After(fwd.until) {
+			return false
 		}
 		select {
-		case <-ctx.Done():
-			return
 		case <-watcher.WatchConsumerGone():
 			return false
 		case watcher.Msg <- msg:
 		}
 	}
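
For orientation, the readLogsLocked and forwarder hunks above fit together roughly as in this condensed sketch (not the verbatim function body; identifiers are the ones visible in the hunks, and the Follow field of logger.ReadConfig is assumed). The design point is that a single *forwarder instance is shared between the tail phase and the follow phase, so once the first message at or after Since has been forwarded while tailing old files, the check stays disabled while following the live file:

	fwd := newForwarder(config)

	if config.Tail != 0 {
		// Tail back through the rotated files plus the current chunk.
		if !tailFiles(readers, watcher, dec, w.getTailReader, config.Tail, fwd) {
			return
		}
	}
	if !config.Follow {
		return
	}
	// The live file is followed with the same forwarder, so the since state
	// cleared during tailing carries over and a backwards clock jump cannot
	// re-arm the filter.
	(&follow{
		LogFile:   w,
		Watcher:   watcher,
		Decoder:   dec,
		Forwarder: fwd,
	}).Do(currentFile, currentPos)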


@@ -64,19 +64,21 @@ func TestTailFiles(t *testing.T) {
 	for desc, config := range map[string]logger.ReadConfig{} {
 		t.Run(desc, func(t *testing.T) {
 			started := make(chan struct{})
+			fwd := newForwarder(config)
 			go func() {
 				close(started)
-				tailFiles(files, watcher, dec, tailReader, config)
+				tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
 			}()
 			<-started
 		})
 	}
 
 	config := logger.ReadConfig{Tail: 2}
+	fwd := newForwarder(config)
 	started := make(chan struct{})
 	go func() {
 		close(started)
-		tailFiles(files, watcher, dec, tailReader, config)
+		tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
 	}()
 	<-started