diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 65afdfa5c8..992108b8c3 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -17,7 +17,10 @@ import ( "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" ) -const closedDrainTimeout = 5 * time.Second +const ( + closedDrainTimeout = 5 * time.Second + waitInterval = 250 * time.Millisecond +) // Fields which we know are not user-provided attribute fields. var wellKnownFields = map[string]bool{ @@ -46,13 +49,13 @@ var wellKnownFields = map[string]bool{ } type reader struct { - s *journald - j *sdjournal.Journal - logWatcher *logger.LogWatcher - config logger.ReadConfig - maxOrdinal uint64 - initialized bool - ready chan struct{} + s *journald + j *sdjournal.Journal + logWatcher *logger.LogWatcher + config logger.ReadConfig + maxOrdinal uint64 + ready chan struct{} + drainDeadline time.Time } func getMessage(d map[string]string) (line []byte, ok bool) { @@ -103,89 +106,155 @@ func getAttrs(d map[string]string) []backend.LogAttr { // no more log entries to send to the log watcher. var errDrainDone = errors.New("journald drain done") -// drainJournal reads and sends log messages from the journal. -// -// drainJournal returns errDrainDone when a terminal stopping condition has been -// reached: the watch consumer is gone, a log entry is read which has a -// timestamp after until (if until is nonzero), or the log driver is closed and -// the last message logged has been sent from the journal. If the end of the -// journal is reached without encountering a terminal stopping condition, a nil -// error is returned. -func (r *reader) drainJournal() error { - if !r.initialized { - defer func() { - r.signalReady() - r.initialized = true - }() +// The SeekXYZ() methods all move the journal read pointer to a "conceptual" +// position which does not correspond to any journal entry. A subsequent call to +// Next(), Previous() or similar is necessary to resolve the read pointer to a +// discrete entry. +// https://github.com/systemd/systemd/pull/5930#issuecomment-300878104 +// But that's not all! If there is no discrete entry to resolve the position to, +// the call to Next() or Previous() will just leave the read pointer in a +// conceptual position, or do something even more bizarre. +// https://github.com/systemd/systemd/issues/9934 - var ( - err error - seekedToTail bool - ) - if r.config.Tail >= 0 { - if r.config.Until.IsZero() { - err = r.j.SeekTail() - seekedToTail = true - } else { - err = r.j.SeekRealtime(r.config.Until) - } - } else { - if r.config.Since.IsZero() { - err = r.j.SeekHead() - } else { - err = r.j.SeekRealtime(r.config.Since) - } - } - if err != nil { - return err - } +// initialSeekHead positions the journal read pointer at the earliest journal +// entry with a timestamp of at least r.config.Since. It returns true if there +// is an entry to read at the read pointer. +func (r *reader) initialSeekHead() (bool, error) { + var err error + if r.config.Since.IsZero() { + err = r.j.SeekHead() + } else { + err = r.j.SeekRealtime(r.config.Since) + } + if err != nil { + return false, err + } + return r.j.Next() +} - // SeekTail() followed by Next() behaves incorrectly, so we need - // to work around the bug by ensuring the first discrete - // movement of the read pointer is Previous() or PreviousSkip(). - // PreviousSkip() is called inside the loop when config.Tail > 0 - // so the only special case requiring special handling is - // config.Tail == 0. - // https://github.com/systemd/systemd/issues/9934 - if seekedToTail && r.config.Tail == 0 { - // Resolve the read pointer to the last entry in the - // journal so that the call to Next() inside the loop - // advances past it. - if ok, err := r.j.Previous(); err != nil || !ok { - return err - } - } +// initialSeekTail positions the journal read pointer at a journal entry +// relative to the tail of the journal at the time of the call based on the +// specification in r.config. It returns true if there is an entry to read at +// the read pointer. Otherwise the read pointer is set to a conceptual position +// which will be resolved to the desired entry (once written) by advancing +// forward with r.j.Next() or similar. +func (r *reader) initialSeekTail() (bool, error) { + var err error + if r.config.Until.IsZero() { + err = r.j.SeekTail() + } else { + err = r.j.SeekRealtime(r.config.Until) + } + if err != nil { + return false, err } - for i := 0; ; i++ { - if !r.initialized && i == 0 && r.config.Tail > 0 { - if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 { - return err + var ok bool + if r.config.Tail == 0 { + ok, err = r.j.Previous() + } else { + var n int + n, err = r.j.PreviousSkip(uint(r.config.Tail)) + ok = n > 0 + } + if err != nil { + return ok, err + } + if !ok { + // The (filtered) journal has no entries. The tail is the head: all new + // entries which get written into the journal from this point forward + // should be read from the journal. However the read pointer is + // positioned at a conceptual position which is not condusive to reading + // those entries. The tail of the journal is resolved to the last entry + // in the journal _at the time of the first successful Previous() call_, + // which means that an arbitrary number of journal entries added in the + // interim may be skipped: race condition. While the realtime conceptual + // position is not so racy, it is also unhelpful: it is the timestamp + // past where reading should stop, so all logs that should be followed + // would be skipped over. + // Reset the read pointer position to avoid these problems. + return r.initialSeekHead() + } else if r.config.Tail == 0 { + // The journal read pointer is positioned at the discrete position of + // the journal entry _before_ the entry to send. + return r.j.Next() + } + + // Check if the PreviousSkip went too far back. + timestamp, err := r.j.Realtime() + if err != nil { + return false, err + } + if timestamp.Before(r.config.Since) { + if err := r.j.SeekRealtime(r.config.Since); err != nil { + return false, err + } + return r.j.Next() + } + return true, nil +} + +// wait blocks until the journal has new data to read, the reader's drain +// deadline is exceeded, or the log reading consumer is gone. +func (r *reader) wait() (sdjournal.Status, error) { + for { + dur := waitInterval + if !r.drainDeadline.IsZero() { + dur = time.Until(r.drainDeadline) + if dur < 0 { + // Container is gone but we haven't found the end of the + // logs before the deadline. Maybe it was dropped by + // journald, e.g. due to rate-limiting. + return sdjournal.StatusNOP, errDrainDone + } else if dur > waitInterval { + dur = waitInterval } - } else if ok, err := r.j.Next(); err != nil || !ok { + } + status, err := r.j.Wait(dur) + if err != nil || status != sdjournal.StatusNOP { + return status, err + } + select { + case <-r.logWatcher.WatchConsumerGone(): + return status, errDrainDone + case <-r.s.closed: + // Container is gone; don't wait indefinitely for journal entries that will never arrive. + if r.drainDeadline.IsZero() { + r.drainDeadline = time.Now().Add(closedDrainTimeout) + } + default: + } + } +} + +// nextWait blocks until there is a new journal entry to read, and advances the +// journal read pointer to it. +func (r *reader) nextWait() error { + for { + if ok, err := r.j.Next(); err != nil || ok { return err } - - if !r.initialized && i == 0 { - // The cursor is in a position which will be unaffected - // by subsequent logging. - r.signalReady() + if _, err := r.wait(); err != nil { + return err } + } +} +// drainJournal reads and sends log messages from the journal, starting from the +// current read pointer, until the end of the journal or a terminal stopping +// condition is reached. +// +// errDrainDone is returned when a terminal stopping condition has been +// reached: the watch consumer is gone, a log entry is read which has a +// timestamp after until (if until is nonzero), or the log driver is closed and +// the last message logged has been sent from the journal. +func (r *reader) drainJournal() error { + for i := 0; ; i++ { // Read the entry's timestamp. timestamp, err := r.j.Realtime() if err != nil { return err } - // Check if the PreviousSkip went too far back. Check only the - // initial position as we are comparing wall-clock timestamps, - // which may not be monotonic. We don't want to skip over - // messages sent later in time just because the clock moved - // backwards. - if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) { - r.j.SeekRealtime(r.config.Since) - continue - } if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) { return errDrainDone } @@ -243,6 +312,10 @@ func (r *reader) drainJournal() error { Warn("journald: error processing journal") } } + + if ok, err := r.j.Next(); err != nil || !ok { + return err + } } } @@ -255,28 +328,15 @@ func (r *reader) readJournal() error { return nil } - var drainDeadline time.Time if !r.config.Follow { if r.s.readSyncTimeout == 0 { return nil } - drainDeadline = time.Now().Add(r.s.readSyncTimeout) + r.drainDeadline = time.Now().Add(r.s.readSyncTimeout) } for { - if !drainDeadline.IsZero() && time.Now().After(drainDeadline) { - // Container is gone but we haven't found the end of the - // logs within the timeout. Maybe it was dropped by - // journald, e.g. due to rate-limiting. - return nil - } - status, err := r.j.Wait(250 * time.Millisecond) - if err != nil { - return err - } select { - case <-r.logWatcher.WatchConsumerGone(): - return nil // won't be able to write anything anymore case <-r.s.closed: // container is gone, drain journal lastSeq := atomic.LoadUint64(&r.s.ordinal) @@ -284,17 +344,16 @@ func (r *reader) readJournal() error { // All caught up with the logger! return nil } - if drainDeadline.IsZero() { - drainDeadline = time.Now().Add(closedDrainTimeout) - } default: - if status == sdjournal.StatusNOP { - // no new data -- keep waiting - continue - } } - err = r.drainJournal() - if err != nil { + + if err := r.nextWait(); err != nil { + if err != errDrainDone { + return err + } + return nil + } + if err := r.drainJournal(); err != nil { if err != errDrainDone { return err } @@ -357,6 +416,33 @@ func (r *reader) readLogs() { return } + var ok bool + if r.config.Tail >= 0 { + ok, err = r.initialSeekTail() + } else { + ok, err = r.initialSeekHead() + } + if err != nil { + r.logWatcher.Err <- err + return + } + r.signalReady() + if !ok { + if !r.config.Follow { + return + } + // Either the read pointer is positioned at a discrete journal entry, in + // which case the position will be unaffected by subsequent logging, or + // the read pointer is in the conceptual position corresponding to the + // first journal entry to send once it is logged in the future. + if err := r.nextWait(); err != nil { + if err != errDrainDone { + r.logWatcher.Err <- err + } + return + } + } + if err := r.readJournal(); err != nil { r.logWatcher.Err <- err return