logger/journald: drop errDrainDone sentinel

errDrainDone is a sentinel error which is never supposed to escape the
package. Consequently, it needs to be filtered out of returns all over
the place, adding boilerplate. Forgetting to filter out these errors
would be a logic bug which the compiler would not help us catch. Replace
it with boolean multi-valued returns as they can't be accidentally
ignored or propagated.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2024-01-03 15:37:44 -05:00
parent d70fe8803c
commit 905477c8ae

View file

@ -4,7 +4,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"context"
"errors"
"runtime"
"strconv"
"sync/atomic"
@ -102,10 +101,6 @@ func getAttrs(d map[string]string) []backend.LogAttr {
return attrs
}
// errDrainDone is the error returned by drainJournal to signal that there are
// no more log entries to send to the log watcher.
var errDrainDone = errors.New("journald drain done")
// The SeekXYZ() methods all move the journal read pointer to a "conceptual"
// position which does not correspond to any journal entry. A subsequent call to
// Next(), Previous() or similar is necessary to resolve the read pointer to a
@ -196,7 +191,7 @@ func (r *reader) initialSeekTail() (bool, error) {
// wait blocks until the journal has new data to read, the reader's drain
// deadline is exceeded, or the log reading consumer is gone.
func (r *reader) wait() (sdjournal.Status, error) {
func (r *reader) wait() (bool, error) {
for {
dur := waitInterval
if !r.drainDeadline.IsZero() {
@ -205,18 +200,20 @@ func (r *reader) wait() (sdjournal.Status, error) {
// Container is gone but we haven't found the end of the
// logs before the deadline. Maybe it was dropped by
// journald, e.g. due to rate-limiting.
return sdjournal.StatusNOP, errDrainDone
return false, nil
} else if dur > waitInterval {
dur = waitInterval
}
}
status, err := r.j.Wait(dur)
if err != nil || status != sdjournal.StatusNOP {
return status, err
if err != nil {
return false, err
} else if status != sdjournal.StatusNOP {
return true, nil
}
select {
case <-r.logWatcher.WatchConsumerGone():
return status, errDrainDone
return false, nil
case <-r.s.closed:
// Container is gone; don't wait indefinitely for journal entries that will never arrive.
if r.drainDeadline.IsZero() {
@ -229,13 +226,13 @@ func (r *reader) wait() (sdjournal.Status, error) {
// nextWait blocks until there is a new journal entry to read, and advances the
// journal read pointer to it.
func (r *reader) nextWait() error {
func (r *reader) nextWait() (bool, error) {
for {
if ok, err := r.j.Next(); err != nil || ok {
return err
return ok, err
}
if _, err := r.wait(); err != nil {
return err
if ok, err := r.wait(); err != nil || !ok {
return false, err
}
}
}
@ -244,25 +241,25 @@ func (r *reader) nextWait() error {
// current read pointer, until the end of the journal or a terminal stopping
// condition is reached.
//
// errDrainDone is returned when a terminal stopping condition has been
// reached: the watch consumer is gone, a log entry is read which has a
// timestamp after until (if until is nonzero), or the log driver is closed and
// the last message logged has been sent from the journal.
func (r *reader) drainJournal() error {
// It returns false when a terminal stopping condition has been reached: the
// watch consumer is gone, a log entry is read which has a timestamp after until
// (if until is nonzero), or the log driver is closed and the last message
// logged has been sent from the journal.
func (r *reader) drainJournal() (bool, error) {
for i := 0; ; i++ {
// Read the entry's timestamp.
timestamp, err := r.j.Realtime()
if err != nil {
return err
return true, err
}
if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
return errDrainDone
return false, nil
}
// Read and send the logged message, if there is one to read.
data, err := r.j.Data()
if err != nil {
return err
return true, err
}
if data[fieldLogEpoch] == r.s.epoch {
@ -297,7 +294,7 @@ func (r *reader) drainJournal() error {
*/
select {
case <-r.logWatcher.WatchConsumerGone():
return errDrainDone
return false, nil
case r.logWatcher.Msg <- msg:
}
}
@ -314,18 +311,15 @@ func (r *reader) drainJournal() error {
}
if ok, err := r.j.Next(); err != nil || !ok {
return err
return true, err
}
}
}
func (r *reader) readJournal() error {
caughtUp := atomic.LoadUint64(&r.s.ordinal)
if err := r.drainJournal(); err != nil {
if err != errDrainDone {
return err
}
return nil
if more, err := r.drainJournal(); err != nil || !more {
return err
}
if !r.config.Follow {
@ -347,17 +341,11 @@ func (r *reader) readJournal() error {
default:
}
if err := r.nextWait(); err != nil {
if err != errDrainDone {
return err
}
return nil
if more, err := r.nextWait(); err != nil || !more {
return err
}
if err := r.drainJournal(); err != nil {
if err != errDrainDone {
return err
}
return nil
if more, err := r.drainJournal(); err != nil || !more {
return err
}
if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
return nil
@ -435,8 +423,8 @@ func (r *reader) readLogs() {
// which case the position will be unaffected by subsequent logging, or
// the read pointer is in the conceptual position corresponding to the
// first journal entry to send once it is logged in the future.
if err := r.nextWait(); err != nil {
if err != errDrainDone {
if more, err := r.nextWait(); err != nil || !more {
if err != nil {
r.logWatcher.Err <- err
}
return