Procházet zdrojové kódy

Merge pull request #47256 from corhere/journald/quit-when-youre-ahead

daemon/logger/journald: quit waiting when the logger closes
Brian Goff před 1 rokem
rodič
revize
7c4828f8fb

+ 18 - 0
daemon/logger/journald/internal/fake/sender.go

@@ -72,6 +72,10 @@ type Sender struct {
 	// https://github.com/systemd/systemd/commit/1eede158519e4e5ed22738c90cb57a91dbecb7f2
 	// (systemd 255).
 	BootID uuid.UUID
+
+	// When set, Send will act as a test helper and redirect
+	// systemd-journal-remote command output to the test log.
+	TB testing.TB
 }
 
 // New constructs a new Sender which will write journal entries to outpath. The
@@ -101,6 +105,7 @@ func NewT(t *testing.T, outpath string) *Sender {
 		t.Skip(err)
 	}
 	assert.NilError(t, err)
+	s.TB = t
 	return s
 }
 
@@ -109,6 +114,9 @@ var validVarName = regexp.MustCompile("^[A-Z0-9][A-Z0-9_]*$")
 // Send is a drop-in replacement for
 // github.com/coreos/go-systemd/v22/journal.Send.
 func (s *Sender) Send(message string, priority journal.Priority, vars map[string]string) error {
+	if s.TB != nil {
+		s.TB.Helper()
+	}
 	var buf bytes.Buffer
 	// https://systemd.io/JOURNAL_EXPORT_FORMATS/ says "if you are
 	// generating this format you shouldn’t care about these special
@@ -152,6 +160,16 @@ func (s *Sender) Send(message string, priority journal.Priority, vars map[string
 	// has been flushed to disk when Send returns.
 	cmd := exec.Command(s.CmdName, "--output", s.OutputPath, "-")
 	cmd.Stdin = &buf
+
+	if s.TB != nil {
+		out, err := cmd.CombinedOutput()
+		s.TB.Logf("[systemd-journal-remote] %s", out)
+		var exitErr *exec.ExitError
+		if errors.As(err, &exitErr) {
+			s.TB.Logf("systemd-journal-remote exit status: %d", exitErr.ExitCode())
+		}
+		return err
+	}
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	return cmd.Run()

+ 7 - 4
daemon/logger/journald/read.go

@@ -216,6 +216,9 @@ func (r *reader) wait() (bool, error) {
 			return false, nil
 		case <-r.s.closed:
 			// Container is gone; don't wait indefinitely for journal entries that will never arrive.
+			if r.maxOrdinal >= atomic.LoadUint64(&r.s.ordinal) {
+				return false, nil
+			}
 			if r.drainDeadline.IsZero() {
 				r.drainDeadline = time.Now().Add(closedDrainTimeout)
 			}
@@ -241,10 +244,10 @@ func (r *reader) nextWait() (bool, error) {
 // current read pointer, until the end of the journal or a terminal stopping
 // condition is reached.
 //
-// It returns false when a terminal stopping condition has been reached: the
-// watch consumer is gone, a log entry is read which has a timestamp after until
-// (if until is nonzero), or the log driver is closed and the last message
-// logged has been sent from the journal.
+// It returns false when a terminal stopping condition has been reached:
+//   - the watch consumer is gone, or
+//   - (if until is nonzero) a log entry is read which has a timestamp after
+//     until
 func (r *reader) drainJournal() (bool, error) {
 	for i := 0; ; i++ {
 		// Read the entry's timestamp.

+ 33 - 13
daemon/logger/journald/read_test.go

@@ -3,6 +3,7 @@
 package journald // import "github.com/docker/docker/daemon/logger/journald"
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -46,32 +47,37 @@ func TestLogRead(t *testing.T) {
 			assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil))
 
 			return func(t *testing.T) logger.Logger {
+				l, err := new(info)
+				assert.NilError(t, err)
+				l.journalReadDir = journalDir
+				sl := &syncLogger{journald: l, waiters: map[uint64]chan<- struct{}{}}
+
 				s := make(chan sendit, 100)
 				t.Cleanup(func() { close(s) })
 				go func() {
 					for m := range s {
 						<-m.after
 						activeJournal.Send(m.message, m.priority, m.vars)
-						if m.sent != nil {
-							close(m.sent)
+						sl.mu.Lock()
+						sl.sent++
+						if notify, ok := sl.waiters[sl.sent]; ok {
+							delete(sl.waiters, sl.sent)
+							close(notify)
 						}
+						sl.mu.Unlock()
 					}
 				}()
-				l, err := new(info)
-				assert.NilError(t, err)
-				l.journalReadDir = journalDir
 
-				sl := &syncLogger{journald: l}
 				l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error {
-					sent := make(chan struct{})
+					sl.mu.Lock()
+					sl.queued++
+					sl.mu.Unlock()
 					s <- sendit{
 						message:  message,
 						priority: priority,
 						vars:     vars,
 						after:    time.After(150 * time.Millisecond),
-						sent:     sent,
 					}
-					sl.waitOn = sent
 					return nil
 				}
 				l.readSyncTimeout = 3 * time.Second
@@ -88,17 +94,31 @@ type sendit struct {
 	priority journal.Priority
 	vars     map[string]string
 	after    <-chan time.Time
-	sent     chan<- struct{}
 }
 
 type syncLogger struct {
 	*journald
-	waitOn <-chan struct{}
+
+	mu           sync.Mutex
+	queued, sent uint64
+	waiters      map[uint64]chan<- struct{}
 }
 
 func (l *syncLogger) Sync() error {
-	if l.waitOn != nil {
-		<-l.waitOn
+	l.mu.Lock()
+	waitFor := l.queued
+	if l.sent >= l.queued {
+		l.mu.Unlock()
+		return nil
 	}
+	notify := make(chan struct{})
+	l.waiters[waitFor] = notify
+	l.mu.Unlock()
+	<-notify
 	return nil
 }
+
+func (l *syncLogger) Close() error {
+	_ = l.Sync()
+	return l.journald.Close()
+}

+ 13 - 7
daemon/logger/loggertest/logreader.go

@@ -11,6 +11,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"gotest.tools/v3/assert"
+	is "gotest.tools/v3/assert/cmp"
 	"gotest.tools/v3/assert/opt"
 
 	"github.com/docker/docker/api/types/backend"
@@ -437,7 +438,7 @@ func (tr Reader) TestConcurrent(t *testing.T) {
 	logAll := func(msgs []*logger.Message) {
 		defer wg.Done()
 		for _, m := range msgs {
-			l.Log(copyLogMessage(m))
+			assert.Check(t, l.Log(copyLogMessage(m)), "failed to log message %+v", m)
 		}
 	}
 
@@ -450,6 +451,15 @@ func (tr Reader) TestConcurrent(t *testing.T) {
 		defer l.Close()
 		wg.Wait()
 	}()
+	defer func() {
+		// Make sure log gets closed before we return
+		// so the temporary dir can be deleted
+		select {
+		case <-time.After(10 * time.Second):
+			t.Fatal("timed out waiting for logger to close")
+		case <-closed:
+		}
+	}()
 
 	// Check if the message count, order and content is equal to what was logged
 	for {
@@ -473,12 +483,8 @@ func (tr Reader) TestConcurrent(t *testing.T) {
 		*messages = (*messages)[1:]
 	}
 
-	assert.Equal(t, len(stdoutMessages), 0)
-	assert.Equal(t, len(stderrMessages), 0)
-
-	// Make sure log gets closed before we return
-	// so the temporary dir can be deleted
-	<-closed
+	assert.Check(t, is.Len(stdoutMessages, 0), "expected stdout messages were not read")
+	assert.Check(t, is.Len(stderrMessages, 0), "expected stderr messages were not read")
 }
 
 // logMessages logs messages to l and returns a slice of messages as would be