diff --git a/daemon/logger/journald/internal/fake/sender.go b/daemon/logger/journald/internal/fake/sender.go index 1dc89e0b0e..050f1b71ad 100644 --- a/daemon/logger/journald/internal/fake/sender.go +++ b/daemon/logger/journald/internal/fake/sender.go @@ -72,6 +72,10 @@ type Sender struct { // https://github.com/systemd/systemd/commit/1eede158519e4e5ed22738c90cb57a91dbecb7f2 // (systemd 255). BootID uuid.UUID + + // When set, Send will act as a test helper and redirect + // systemd-journal-remote command output to the test log. + TB testing.TB } // New constructs a new Sender which will write journal entries to outpath. The @@ -101,6 +105,7 @@ func NewT(t *testing.T, outpath string) *Sender { t.Skip(err) } assert.NilError(t, err) + s.TB = t return s } @@ -109,6 +114,9 @@ var validVarName = regexp.MustCompile("^[A-Z0-9][A-Z0-9_]*$") // Send is a drop-in replacement for // github.com/coreos/go-systemd/v22/journal.Send. func (s *Sender) Send(message string, priority journal.Priority, vars map[string]string) error { + if s.TB != nil { + s.TB.Helper() + } var buf bytes.Buffer // https://systemd.io/JOURNAL_EXPORT_FORMATS/ says "if you are // generating this format you shouldn’t care about these special @@ -152,6 +160,16 @@ func (s *Sender) Send(message string, priority journal.Priority, vars map[string // has been flushed to disk when Send returns. cmd := exec.Command(s.CmdName, "--output", s.OutputPath, "-") cmd.Stdin = &buf + + if s.TB != nil { + out, err := cmd.CombinedOutput() + s.TB.Logf("[systemd-journal-remote] %s", out) + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + s.TB.Logf("systemd-journal-remote exit status: %d", exitErr.ExitCode()) + } + return err + } cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr return cmd.Run() diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index e2140328de..96e0dea68f 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -216,6 +216,9 @@ func (r *reader) wait() (bool, error) { return false, nil case <-r.s.closed: // Container is gone; don't wait indefinitely for journal entries that will never arrive. + if r.maxOrdinal >= atomic.LoadUint64(&r.s.ordinal) { + return false, nil + } if r.drainDeadline.IsZero() { r.drainDeadline = time.Now().Add(closedDrainTimeout) } @@ -241,10 +244,10 @@ func (r *reader) nextWait() (bool, error) { // current read pointer, until the end of the journal or a terminal stopping // condition is reached. // -// It returns false when a terminal stopping condition has been reached: the -// watch consumer is gone, a log entry is read which has a timestamp after until -// (if until is nonzero), or the log driver is closed and the last message -// logged has been sent from the journal. +// It returns false when a terminal stopping condition has been reached: +// - the watch consumer is gone, or +// - (if until is nonzero) a log entry is read which has a timestamp after +// until func (r *reader) drainJournal() (bool, error) { for i := 0; ; i++ { // Read the entry's timestamp. diff --git a/daemon/logger/journald/read_test.go b/daemon/logger/journald/read_test.go index 86fc6b280d..771057671c 100644 --- a/daemon/logger/journald/read_test.go +++ b/daemon/logger/journald/read_test.go @@ -3,6 +3,7 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" import ( + "sync" "testing" "time" @@ -46,32 +47,37 @@ func TestLogRead(t *testing.T) { assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil)) return func(t *testing.T) logger.Logger { + l, err := new(info) + assert.NilError(t, err) + l.journalReadDir = journalDir + sl := &syncLogger{journald: l, waiters: map[uint64]chan<- struct{}{}} + s := make(chan sendit, 100) t.Cleanup(func() { close(s) }) go func() { for m := range s { <-m.after activeJournal.Send(m.message, m.priority, m.vars) - if m.sent != nil { - close(m.sent) + sl.mu.Lock() + sl.sent++ + if notify, ok := sl.waiters[sl.sent]; ok { + delete(sl.waiters, sl.sent) + close(notify) } + sl.mu.Unlock() } }() - l, err := new(info) - assert.NilError(t, err) - l.journalReadDir = journalDir - sl := &syncLogger{journald: l} l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error { - sent := make(chan struct{}) + sl.mu.Lock() + sl.queued++ + sl.mu.Unlock() s <- sendit{ message: message, priority: priority, vars: vars, after: time.After(150 * time.Millisecond), - sent: sent, } - sl.waitOn = sent return nil } l.readSyncTimeout = 3 * time.Second @@ -88,17 +94,31 @@ type sendit struct { priority journal.Priority vars map[string]string after <-chan time.Time - sent chan<- struct{} } type syncLogger struct { *journald - waitOn <-chan struct{} + + mu sync.Mutex + queued, sent uint64 + waiters map[uint64]chan<- struct{} } func (l *syncLogger) Sync() error { - if l.waitOn != nil { - <-l.waitOn + l.mu.Lock() + waitFor := l.queued + if l.sent >= l.queued { + l.mu.Unlock() + return nil } + notify := make(chan struct{}) + l.waiters[waitFor] = notify + l.mu.Unlock() + <-notify return nil } + +func (l *syncLogger) Close() error { + _ = l.Sync() + return l.journald.Close() +} diff --git a/daemon/logger/loggertest/logreader.go b/daemon/logger/loggertest/logreader.go index ddd06b8379..0573a6add4 100644 --- a/daemon/logger/loggertest/logreader.go +++ b/daemon/logger/loggertest/logreader.go @@ -11,6 +11,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "gotest.tools/v3/assert" + is "gotest.tools/v3/assert/cmp" "gotest.tools/v3/assert/opt" "github.com/docker/docker/api/types/backend" @@ -437,7 +438,7 @@ func (tr Reader) TestConcurrent(t *testing.T) { logAll := func(msgs []*logger.Message) { defer wg.Done() for _, m := range msgs { - l.Log(copyLogMessage(m)) + assert.Check(t, l.Log(copyLogMessage(m)), "failed to log message %+v", m) } } @@ -450,6 +451,15 @@ func (tr Reader) TestConcurrent(t *testing.T) { defer l.Close() wg.Wait() }() + defer func() { + // Make sure log gets closed before we return + // so the temporary dir can be deleted + select { + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for logger to close") + case <-closed: + } + }() // Check if the message count, order and content is equal to what was logged for { @@ -473,12 +483,8 @@ func (tr Reader) TestConcurrent(t *testing.T) { *messages = (*messages)[1:] } - assert.Equal(t, len(stdoutMessages), 0) - assert.Equal(t, len(stderrMessages), 0) - - // Make sure log gets closed before we return - // so the temporary dir can be deleted - <-closed + assert.Check(t, is.Len(stdoutMessages, 0), "expected stdout messages were not read") + assert.Check(t, is.Len(stderrMessages, 0), "expected stderr messages were not read") } // logMessages logs messages to l and returns a slice of messages as would be