From 905477c8ae1d43b5ea654f7a430091a903149c36 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Wed, 3 Jan 2024 15:37:44 -0500 Subject: [PATCH] logger/journald: drop errDrainDone sentinel errDrainDone is a sentinel error which is never supposed to escape the package. Consequently, it needs to be filtered out of returns all over the place, adding boilerplate. Forgetting to filter out these errors would be a logic bug which the compiler would not help us catch. Replace it with boolean multi-valued returns as they can't be accidentally ignored or propagated. Signed-off-by: Cory Snider --- daemon/logger/journald/read.go | 70 ++++++++++++++-------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 992108b8c3..e2140328de 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -4,7 +4,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" import ( "context" - "errors" "runtime" "strconv" "sync/atomic" @@ -102,10 +101,6 @@ func getAttrs(d map[string]string) []backend.LogAttr { return attrs } -// errDrainDone is the error returned by drainJournal to signal that there are -// no more log entries to send to the log watcher. -var errDrainDone = errors.New("journald drain done") - // The SeekXYZ() methods all move the journal read pointer to a "conceptual" // position which does not correspond to any journal entry. A subsequent call to // Next(), Previous() or similar is necessary to resolve the read pointer to a @@ -196,7 +191,7 @@ func (r *reader) initialSeekTail() (bool, error) { // wait blocks until the journal has new data to read, the reader's drain // deadline is exceeded, or the log reading consumer is gone. -func (r *reader) wait() (sdjournal.Status, error) { +func (r *reader) wait() (bool, error) { for { dur := waitInterval if !r.drainDeadline.IsZero() { @@ -205,18 +200,20 @@ func (r *reader) wait() (sdjournal.Status, error) { // Container is gone but we haven't found the end of the // logs before the deadline. Maybe it was dropped by // journald, e.g. due to rate-limiting. - return sdjournal.StatusNOP, errDrainDone + return false, nil } else if dur > waitInterval { dur = waitInterval } } status, err := r.j.Wait(dur) - if err != nil || status != sdjournal.StatusNOP { - return status, err + if err != nil { + return false, err + } else if status != sdjournal.StatusNOP { + return true, nil } select { case <-r.logWatcher.WatchConsumerGone(): - return status, errDrainDone + return false, nil case <-r.s.closed: // Container is gone; don't wait indefinitely for journal entries that will never arrive. if r.drainDeadline.IsZero() { @@ -229,13 +226,13 @@ func (r *reader) wait() (sdjournal.Status, error) { // nextWait blocks until there is a new journal entry to read, and advances the // journal read pointer to it. -func (r *reader) nextWait() error { +func (r *reader) nextWait() (bool, error) { for { if ok, err := r.j.Next(); err != nil || ok { - return err + return ok, err } - if _, err := r.wait(); err != nil { - return err + if ok, err := r.wait(); err != nil || !ok { + return false, err } } } @@ -244,25 +241,25 @@ func (r *reader) nextWait() error { // current read pointer, until the end of the journal or a terminal stopping // condition is reached. // -// errDrainDone is returned when a terminal stopping condition has been -// reached: the watch consumer is gone, a log entry is read which has a -// timestamp after until (if until is nonzero), or the log driver is closed and -// the last message logged has been sent from the journal. -func (r *reader) drainJournal() error { +// It returns false when a terminal stopping condition has been reached: the +// watch consumer is gone, a log entry is read which has a timestamp after until +// (if until is nonzero), or the log driver is closed and the last message +// logged has been sent from the journal. +func (r *reader) drainJournal() (bool, error) { for i := 0; ; i++ { // Read the entry's timestamp. timestamp, err := r.j.Realtime() if err != nil { - return err + return true, err } if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) { - return errDrainDone + return false, nil } // Read and send the logged message, if there is one to read. data, err := r.j.Data() if err != nil { - return err + return true, err } if data[fieldLogEpoch] == r.s.epoch { @@ -297,7 +294,7 @@ func (r *reader) drainJournal() error { */ select { case <-r.logWatcher.WatchConsumerGone(): - return errDrainDone + return false, nil case r.logWatcher.Msg <- msg: } } @@ -314,18 +311,15 @@ func (r *reader) drainJournal() error { } if ok, err := r.j.Next(); err != nil || !ok { - return err + return true, err } } } func (r *reader) readJournal() error { caughtUp := atomic.LoadUint64(&r.s.ordinal) - if err := r.drainJournal(); err != nil { - if err != errDrainDone { - return err - } - return nil + if more, err := r.drainJournal(); err != nil || !more { + return err } if !r.config.Follow { @@ -347,17 +341,11 @@ func (r *reader) readJournal() error { default: } - if err := r.nextWait(); err != nil { - if err != errDrainDone { - return err - } - return nil + if more, err := r.nextWait(); err != nil || !more { + return err } - if err := r.drainJournal(); err != nil { - if err != errDrainDone { - return err - } - return nil + if more, err := r.drainJournal(); err != nil || !more { + return err } if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp { return nil @@ -435,8 +423,8 @@ func (r *reader) readLogs() { // which case the position will be unaffected by subsequent logging, or // the read pointer is in the conceptual position corresponding to the // first journal entry to send once it is logged in the future. - if err := r.nextWait(); err != nil { - if err != errDrainDone { + if more, err := r.nextWait(); err != nil || !more { + if err != nil { r.logWatcher.Err <- err } return