logger/journald: drop errDrainDone sentinel

errDrainDone is a sentinel error which is never supposed to escape the
package. Consequently, it needs to be filtered out of returns all over
the place, adding boilerplate. Forgetting to filter out these errors
would be a logic bug which the compiler would not help us catch. Replace
it with boolean multi-valued returns as they can't be accidentally
ignored or propagated.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit 905477c8ae)
Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2024-01-03 15:37:44 -05:00
parent 6ac38cdbeb
commit f2d0d87c46

View file

@ -4,7 +4,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
import ( import (
"context" "context"
"errors"
"runtime" "runtime"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
@ -102,10 +101,6 @@ func getAttrs(d map[string]string) []backend.LogAttr {
return attrs return attrs
} }
// errDrainDone is the error returned by drainJournal to signal that there are
// no more log entries to send to the log watcher.
var errDrainDone = errors.New("journald drain done")
// The SeekXYZ() methods all move the journal read pointer to a "conceptual" // The SeekXYZ() methods all move the journal read pointer to a "conceptual"
// position which does not correspond to any journal entry. A subsequent call to // position which does not correspond to any journal entry. A subsequent call to
// Next(), Previous() or similar is necessary to resolve the read pointer to a // Next(), Previous() or similar is necessary to resolve the read pointer to a
@ -196,7 +191,7 @@ func (r *reader) initialSeekTail() (bool, error) {
// wait blocks until the journal has new data to read, the reader's drain // wait blocks until the journal has new data to read, the reader's drain
// deadline is exceeded, or the log reading consumer is gone. // deadline is exceeded, or the log reading consumer is gone.
func (r *reader) wait() (sdjournal.Status, error) { func (r *reader) wait() (bool, error) {
for { for {
dur := waitInterval dur := waitInterval
if !r.drainDeadline.IsZero() { if !r.drainDeadline.IsZero() {
@ -205,18 +200,20 @@ func (r *reader) wait() (sdjournal.Status, error) {
// Container is gone but we haven't found the end of the // Container is gone but we haven't found the end of the
// logs before the deadline. Maybe it was dropped by // logs before the deadline. Maybe it was dropped by
// journald, e.g. due to rate-limiting. // journald, e.g. due to rate-limiting.
return sdjournal.StatusNOP, errDrainDone return false, nil
} else if dur > waitInterval { } else if dur > waitInterval {
dur = waitInterval dur = waitInterval
} }
} }
status, err := r.j.Wait(dur) status, err := r.j.Wait(dur)
if err != nil || status != sdjournal.StatusNOP { if err != nil {
return status, err return false, err
} else if status != sdjournal.StatusNOP {
return true, nil
} }
select { select {
case <-r.logWatcher.WatchConsumerGone(): case <-r.logWatcher.WatchConsumerGone():
return status, errDrainDone return false, nil
case <-r.s.closed: case <-r.s.closed:
// Container is gone; don't wait indefinitely for journal entries that will never arrive. // Container is gone; don't wait indefinitely for journal entries that will never arrive.
if r.drainDeadline.IsZero() { if r.drainDeadline.IsZero() {
@ -229,13 +226,13 @@ func (r *reader) wait() (sdjournal.Status, error) {
// nextWait blocks until there is a new journal entry to read, and advances the // nextWait blocks until there is a new journal entry to read, and advances the
// journal read pointer to it. // journal read pointer to it.
func (r *reader) nextWait() error { func (r *reader) nextWait() (bool, error) {
for { for {
if ok, err := r.j.Next(); err != nil || ok { if ok, err := r.j.Next(); err != nil || ok {
return err return ok, err
} }
if _, err := r.wait(); err != nil { if ok, err := r.wait(); err != nil || !ok {
return err return false, err
} }
} }
} }
@ -244,25 +241,25 @@ func (r *reader) nextWait() error {
// current read pointer, until the end of the journal or a terminal stopping // current read pointer, until the end of the journal or a terminal stopping
// condition is reached. // condition is reached.
// //
// errDrainDone is returned when a terminal stopping condition has been // It returns false when a terminal stopping condition has been reached: the
// reached: the watch consumer is gone, a log entry is read which has a // watch consumer is gone, a log entry is read which has a timestamp after until
// timestamp after until (if until is nonzero), or the log driver is closed and // (if until is nonzero), or the log driver is closed and the last message
// the last message logged has been sent from the journal. // logged has been sent from the journal.
func (r *reader) drainJournal() error { func (r *reader) drainJournal() (bool, error) {
for i := 0; ; i++ { for i := 0; ; i++ {
// Read the entry's timestamp. // Read the entry's timestamp.
timestamp, err := r.j.Realtime() timestamp, err := r.j.Realtime()
if err != nil { if err != nil {
return err return true, err
} }
if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) { if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
return errDrainDone return false, nil
} }
// Read and send the logged message, if there is one to read. // Read and send the logged message, if there is one to read.
data, err := r.j.Data() data, err := r.j.Data()
if err != nil { if err != nil {
return err return true, err
} }
if data[fieldLogEpoch] == r.s.epoch { if data[fieldLogEpoch] == r.s.epoch {
@ -297,7 +294,7 @@ func (r *reader) drainJournal() error {
*/ */
select { select {
case <-r.logWatcher.WatchConsumerGone(): case <-r.logWatcher.WatchConsumerGone():
return errDrainDone return false, nil
case r.logWatcher.Msg <- msg: case r.logWatcher.Msg <- msg:
} }
} }
@ -314,18 +311,15 @@ func (r *reader) drainJournal() error {
} }
if ok, err := r.j.Next(); err != nil || !ok { if ok, err := r.j.Next(); err != nil || !ok {
return err return true, err
} }
} }
} }
func (r *reader) readJournal() error { func (r *reader) readJournal() error {
caughtUp := atomic.LoadUint64(&r.s.ordinal) caughtUp := atomic.LoadUint64(&r.s.ordinal)
if err := r.drainJournal(); err != nil { if more, err := r.drainJournal(); err != nil || !more {
if err != errDrainDone { return err
return err
}
return nil
} }
if !r.config.Follow { if !r.config.Follow {
@ -347,17 +341,11 @@ func (r *reader) readJournal() error {
default: default:
} }
if err := r.nextWait(); err != nil { if more, err := r.nextWait(); err != nil || !more {
if err != errDrainDone { return err
return err
}
return nil
} }
if err := r.drainJournal(); err != nil { if more, err := r.drainJournal(); err != nil || !more {
if err != errDrainDone { return err
return err
}
return nil
} }
if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp { if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
return nil return nil
@ -435,8 +423,8 @@ func (r *reader) readLogs() {
// which case the position will be unaffected by subsequent logging, or // which case the position will be unaffected by subsequent logging, or
// the read pointer is in the conceptual position corresponding to the // the read pointer is in the conceptual position corresponding to the
// first journal entry to send once it is logged in the future. // first journal entry to send once it is logged in the future.
if err := r.nextWait(); err != nil { if more, err := r.nextWait(); err != nil || !more {
if err != errDrainDone { if err != nil {
r.logWatcher.Err <- err r.logWatcher.Err <- err
} }
return return