logger/journald: wait no longer than the deadline

While it doesn't really matter if the reader waits for an extra
arbitrary period beyond an arbitrary hardcoded timeout, honoring the
deadline exactly is trivial and cheap to implement, and nice to have.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2024-01-02 15:35:33 -05:00
parent e94ec8068d
commit d70fe8803c

View file

@ -17,7 +17,10 @@ import (
"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
)
const closedDrainTimeout = 5 * time.Second
const (
// closedDrainTimeout is how much longer the reader keeps waiting for
// journal entries after it observes that the log driver has been closed.
// It is used to arm the reader's drain deadline.
closedDrainTimeout = 5 * time.Second
// waitInterval is the longest single wait on the journal. Waiting in
// bounded slices lets the reader periodically check whether the log
// consumer is gone or the log driver has been closed.
waitInterval = 250 * time.Millisecond
)
// Fields which we know are not user-provided attribute fields.
var wellKnownFields = map[string]bool{
@ -46,13 +49,13 @@ var wellKnownFields = map[string]bool{
}
// reader holds the state of a single log-reading session against the journal.
type reader struct {
	// s is the log driver being read from; its closed channel, ordinal
	// counter and readSyncTimeout are consulted while reading.
	s *journald
	// j is the journal handle whose read pointer is seeked and advanced.
	j *sdjournal.Journal
	// logWatcher is the consumer side: errors are reported on its Err
	// channel and WatchConsumerGone signals that nobody is listening.
	logWatcher *logger.LogWatcher
	// config carries the read request (Tail, Since, Until, Follow).
	config     logger.ReadConfig
	maxOrdinal uint64
	// NOTE(review): initialized is only referenced by the pre-refactor
	// drainJournal code; it is kept here so those references keep
	// resolving. Confirm whether it can be dropped.
	initialized bool
	// ready is presumably closed/signalled by signalReady once the initial
	// read position has been established; verify against signalReady.
	ready chan struct{}
	// drainDeadline, when nonzero, is the point in time after which the
	// reader stops waiting for further journal entries.
	drainDeadline time.Time
}
func getMessage(d map[string]string) (line []byte, ok bool) {
@ -103,89 +106,155 @@ func getAttrs(d map[string]string) []backend.LogAttr {
// no more log entries to send to the log watcher.
var errDrainDone = errors.New("journald drain done")
// drainJournal reads and sends log messages from the journal.
//
// drainJournal returns errDrainDone when a terminal stopping condition has been
// reached: the watch consumer is gone, a log entry is read which has a
// timestamp after until (if until is nonzero), or the log driver is closed and
// the last message logged has been sent from the journal. If the end of the
// journal is reached without encountering a terminal stopping condition, a nil
// error is returned.
func (r *reader) drainJournal() error {
if !r.initialized {
defer func() {
r.signalReady()
r.initialized = true
}()
// The SeekXYZ() methods all move the journal read pointer to a "conceptual"
// position which does not correspond to any journal entry. A subsequent call to
// Next(), Previous() or similar is necessary to resolve the read pointer to a
// discrete entry.
// https://github.com/systemd/systemd/pull/5930#issuecomment-300878104
// But that's not all! If there is no discrete entry to resolve the position to,
// the call to Next() or Previous() will just leave the read pointer in a
// conceptual position, or do something even more bizarre.
// https://github.com/systemd/systemd/issues/9934
var (
err error
seekedToTail bool
)
if r.config.Tail >= 0 {
if r.config.Until.IsZero() {
err = r.j.SeekTail()
seekedToTail = true
} else {
err = r.j.SeekRealtime(r.config.Until)
}
} else {
if r.config.Since.IsZero() {
err = r.j.SeekHead()
} else {
err = r.j.SeekRealtime(r.config.Since)
}
}
if err != nil {
return err
}
// initialSeekHead moves the journal read pointer to the first entry whose
// timestamp is not earlier than r.config.Since, or to the very first entry of
// the journal when no Since bound is configured. The boolean result reports
// whether the read pointer ended up on an entry that can be read.
func (r *reader) initialSeekHead() (bool, error) {
	if since := r.config.Since; since.IsZero() {
		if err := r.j.SeekHead(); err != nil {
			return false, err
		}
	} else if err := r.j.SeekRealtime(since); err != nil {
		return false, err
	}
	// The seek only establishes a conceptual position; advancing resolves it
	// to a discrete entry, if one exists.
	return r.j.Next()
}
// SeekTail() followed by Next() behaves incorrectly, so we need
// to work around the bug by ensuring the first discrete
// movement of the read pointer is Previous() or PreviousSkip().
// PreviousSkip() is called inside the loop when config.Tail > 0
// so the only special case requiring special handling is
// config.Tail == 0.
// https://github.com/systemd/systemd/issues/9934
if seekedToTail && r.config.Tail == 0 {
// Resolve the read pointer to the last entry in the
// journal so that the call to Next() inside the loop
// advances past it.
if ok, err := r.j.Previous(); err != nil || !ok {
return err
}
}
// initialSeekTail positions the journal read pointer at a journal entry
// relative to the tail of the journal at the time of the call based on the
// specification in r.config. It returns true if there is an entry to read at
// the read pointer. Otherwise the read pointer is set to a conceptual position
// which will be resolved to the desired entry (once written) by advancing
// forward with r.j.Next() or similar.
func (r *reader) initialSeekTail() (bool, error) {
var err error
if r.config.Until.IsZero() {
err = r.j.SeekTail()
} else {
err = r.j.SeekRealtime(r.config.Until)
}
if err != nil {
return false, err
}
for i := 0; ; i++ {
if !r.initialized && i == 0 && r.config.Tail > 0 {
if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
return err
var ok bool
if r.config.Tail == 0 {
ok, err = r.j.Previous()
} else {
var n int
n, err = r.j.PreviousSkip(uint(r.config.Tail))
ok = n > 0
}
if err != nil {
return ok, err
}
if !ok {
// The (filtered) journal has no entries. The tail is the head: all new
// entries which get written into the journal from this point forward
// should be read from the journal. However the read pointer is
// positioned at a conceptual position which is not condusive to reading
// those entries. The tail of the journal is resolved to the last entry
// in the journal _at the time of the first successful Previous() call_,
// which means that an arbitrary number of journal entries added in the
// interim may be skipped: race condition. While the realtime conceptual
// position is not so racy, it is also unhelpful: it is the timestamp
// past where reading should stop, so all logs that should be followed
// would be skipped over.
// Reset the read pointer position to avoid these problems.
return r.initialSeekHead()
} else if r.config.Tail == 0 {
// The journal read pointer is positioned at the discrete position of
// the journal entry _before_ the entry to send.
return r.j.Next()
}
// Check if the PreviousSkip went too far back.
timestamp, err := r.j.Realtime()
if err != nil {
return false, err
}
if timestamp.Before(r.config.Since) {
if err := r.j.SeekRealtime(r.config.Since); err != nil {
return false, err
}
return r.j.Next()
}
return true, nil
}
// wait blocks until the journal has new data to read, the reader's drain
// deadline is exceeded, or the log reading consumer is gone.
//
// It returns the status reported by r.j.Wait together with any error from it.
// errDrainDone is returned when the drain deadline has passed or the watch
// consumer is gone. wait never moves the journal read pointer; advancing to
// the next entry is the caller's job.
func (r *reader) wait() (sdjournal.Status, error) {
	for {
		// Never wait longer than waitInterval in one go, and never past the
		// drain deadline once one has been set.
		dur := waitInterval
		if !r.drainDeadline.IsZero() {
			dur = time.Until(r.drainDeadline)
			if dur < 0 {
				// Container is gone but we haven't found the end of the
				// logs before the deadline. Maybe it was dropped by
				// journald, e.g. due to rate-limiting.
				return sdjournal.StatusNOP, errDrainDone
			}
			if dur > waitInterval {
				dur = waitInterval
			}
		}

		status, err := r.j.Wait(dur)
		if err != nil || status != sdjournal.StatusNOP {
			return status, err
		}

		// Nothing new in the journal: check whether there is any point in
		// continuing to wait.
		select {
		case <-r.logWatcher.WatchConsumerGone():
			return status, errDrainDone
		case <-r.s.closed:
			// Container is gone; don't wait indefinitely for journal entries that will never arrive.
			if r.drainDeadline.IsZero() {
				r.drainDeadline = time.Now().Add(closedDrainTimeout)
			}
		default:
		}
	}
}
// nextWait blocks until there is a new journal entry to read, and advances the
// journal read pointer to it.
func (r *reader) nextWait() error {
for {
if ok, err := r.j.Next(); err != nil || ok {
return err
}
if !r.initialized && i == 0 {
// The cursor is in a position which will be unaffected
// by subsequent logging.
r.signalReady()
if _, err := r.wait(); err != nil {
return err
}
}
}
// drainJournal reads and sends log messages from the journal, starting from the
// current read pointer, until the end of the journal or a terminal stopping
// condition is reached.
//
// errDrainDone is returned when a terminal stopping condition has been
// reached: the watch consumer is gone, a log entry is read which has a
// timestamp after until (if until is nonzero), or the log driver is closed and
// the last message logged has been sent from the journal.
func (r *reader) drainJournal() error {
for i := 0; ; i++ {
// Read the entry's timestamp.
timestamp, err := r.j.Realtime()
if err != nil {
return err
}
// Check if the PreviousSkip went too far back. Check only the
// initial position as we are comparing wall-clock timestamps,
// which may not be monotonic. We don't want to skip over
// messages sent later in time just because the clock moved
// backwards.
if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
r.j.SeekRealtime(r.config.Since)
continue
}
if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
return errDrainDone
}
@ -243,6 +312,10 @@ func (r *reader) drainJournal() error {
Warn("journald: error processing journal")
}
}
if ok, err := r.j.Next(); err != nil || !ok {
return err
}
}
}
@ -255,28 +328,15 @@ func (r *reader) readJournal() error {
return nil
}
var drainDeadline time.Time
if !r.config.Follow {
if r.s.readSyncTimeout == 0 {
return nil
}
drainDeadline = time.Now().Add(r.s.readSyncTimeout)
r.drainDeadline = time.Now().Add(r.s.readSyncTimeout)
}
for {
if !drainDeadline.IsZero() && time.Now().After(drainDeadline) {
// Container is gone but we haven't found the end of the
// logs within the timeout. Maybe it was dropped by
// journald, e.g. due to rate-limiting.
return nil
}
status, err := r.j.Wait(250 * time.Millisecond)
if err != nil {
return err
}
select {
case <-r.logWatcher.WatchConsumerGone():
return nil // won't be able to write anything anymore
case <-r.s.closed:
// container is gone, drain journal
lastSeq := atomic.LoadUint64(&r.s.ordinal)
@ -284,17 +344,16 @@ func (r *reader) readJournal() error {
// All caught up with the logger!
return nil
}
if drainDeadline.IsZero() {
drainDeadline = time.Now().Add(closedDrainTimeout)
}
default:
if status == sdjournal.StatusNOP {
// no new data -- keep waiting
continue
}
}
err = r.drainJournal()
if err != nil {
if err := r.nextWait(); err != nil {
if err != errDrainDone {
return err
}
return nil
}
if err := r.drainJournal(); err != nil {
if err != errDrainDone {
return err
}
@ -357,6 +416,33 @@ func (r *reader) readLogs() {
return
}
var ok bool
if r.config.Tail >= 0 {
ok, err = r.initialSeekTail()
} else {
ok, err = r.initialSeekHead()
}
if err != nil {
r.logWatcher.Err <- err
return
}
r.signalReady()
if !ok {
if !r.config.Follow {
return
}
// Either the read pointer is positioned at a discrete journal entry, in
// which case the position will be unaffected by subsequent logging, or
// the read pointer is in the conceptual position corresponding to the
// first journal entry to send once it is logged in the future.
if err := r.nextWait(); err != nil {
if err != errDrainDone {
r.logWatcher.Err <- err
}
return
}
}
if err := r.readJournal(); err != nil {
r.logWatcher.Err <- err
return