소스 검색

logger/journald: implement --follow correctly

Implement --follow entirely correctly for the journald log reader, such
that it exits immediately upon reading back the last log message written
to the journal before the logger was closed. The impossibility of doing
so has been slightly exaggerated.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider 3 년 전
부모
커밋
ef5b279887
4개의 변경된 파일296개의 추가작업 그리고 97개의 파일을 삭제
  1. 34 3
      daemon/logger/journald/journald.go
  2. 201 92
      daemon/logger/journald/read.go
  3. 47 2
      daemon/logger/journald/read_test.go
  4. 14 0
      daemon/logger/loggertest/logreader.go

+ 34 - 3
daemon/logger/journald/journald.go

@@ -6,12 +6,15 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
 import (
 	"fmt"
 	"strconv"
+	"sync/atomic"
 	"time"
 	"unicode"
 
 	"github.com/coreos/go-systemd/v22/journal"
+
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/loggerutils"
+	"github.com/docker/docker/pkg/stringid"
 )
 
 const name = "journald"
@@ -37,17 +40,31 @@ const (
 	fieldPLogOrdinal    = "CONTAINER_PARTIAL_ORDINAL"
 	fieldPLogLast       = "CONTAINER_PARTIAL_LAST"
 	fieldPartialMessage = "CONTAINER_PARTIAL_MESSAGE"
+
+	fieldLogEpoch   = "CONTAINER_LOG_EPOCH"
+	fieldLogOrdinal = "CONTAINER_LOG_ORDINAL"
 )
 
+var waitUntilFlushed func(*journald) error
+
 type journald struct {
+	// Sequence number of the most recent message sent by this instance of
+	// the log driver, starting from 1. Corollary: ordinal == 0 implies no
+	// messages have been sent by this instance.
+	ordinal uint64 // Placed first in struct to ensure 8-byte alignment for atomic ops.
+	// Epoch identifier to distinguish sequence numbers from this instance
+	// vs. other instances.
+	epoch string
+
 	vars map[string]string // additional variables and values to send to the journal along with the log message
 
 	closed chan struct{}
 
 	// Overrides for unit tests.
 
-	sendToJournal  func(message string, priority journal.Priority, vars map[string]string) error
-	journalReadDir string //nolint:structcheck,unused // Referenced in read.go, which has more restrictive build constraints.
+	sendToJournal   func(message string, priority journal.Priority, vars map[string]string) error
+	journalReadDir  string //nolint:structcheck,unused // Referenced in read.go, which has more restrictive build constraints.
+	readSyncTimeout time.Duration
 }
 
 func init() {
@@ -96,6 +113,8 @@ func new(info logger.Info) (*journald, error) {
 		return nil, err
 	}
 
+	epoch := stringid.GenerateRandomID()
+
 	vars := map[string]string{
 		fieldContainerID:      info.ContainerID[:12],
 		fieldContainerIDFull:  info.ContainerID,
@@ -103,6 +122,7 @@ func new(info logger.Info) (*journald, error) {
 		fieldContainerTag:     tag,
 		fieldImageName:        info.ImageName(),
 		fieldSyslogIdentifier: tag,
+		fieldLogEpoch:         epoch,
 	}
 	extraAttrs, err := info.ExtraAttributes(sanitizeKeyMod)
 	if err != nil {
@@ -111,7 +131,12 @@ func new(info logger.Info) (*journald, error) {
 	for k, v := range extraAttrs {
 		vars[k] = v
 	}
-	return &journald{vars: vars, closed: make(chan struct{}), sendToJournal: journal.Send}, nil
+	return &journald{
+		epoch:         epoch,
+		vars:          vars,
+		closed:        make(chan struct{}),
+		sendToJournal: journal.Send,
+	}, nil
 }
 
 // We don't actually accept any options, but we have to supply a callback for
@@ -152,6 +177,9 @@ func (s *journald) Log(msg *logger.Message) error {
 	source := msg.Source
 	logger.PutMessage(msg)
 
+	seq := atomic.AddUint64(&s.ordinal, 1)
+	vars[fieldLogOrdinal] = strconv.FormatUint(seq, 10)
+
 	if source == "stderr" {
 		return s.sendToJournal(line, journal.PriErr, vars)
 	}
@@ -164,5 +192,8 @@ func (s *journald) Name() string {
 
 func (s *journald) Close() error {
 	close(s.closed)
+	if waitUntilFlushed != nil {
+		return waitUntilFlushed(s)
+	}
 	return nil
 }

+ 201 - 92
daemon/logger/journald/read.go

@@ -7,6 +7,7 @@ import (
 	"errors"
 	"runtime"
 	"strconv"
+	"sync/atomic"
 	"time"
 
 	"github.com/coreos/go-systemd/v22/journal"
@@ -17,6 +18,8 @@ import (
 	"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
 )
 
+const closedDrainTimeout = 5 * time.Second
+
 // Fields which we know are not user-provided attribute fields.
 var wellKnownFields = map[string]bool{
 	"MESSAGE":             true,
@@ -39,6 +42,18 @@ var wellKnownFields = map[string]bool{
 	fieldPLogOrdinal:      true,
 	fieldPLogLast:         true,
 	fieldPartialMessage:   true,
+	fieldLogEpoch:         true,
+	fieldLogOrdinal:       true,
+}
+
+type reader struct {
+	s           *journald
+	j           *sdjournal.Journal
+	logWatcher  *logger.LogWatcher
+	config      logger.ReadConfig
+	maxOrdinal  uint64
+	initialized bool
+	ready       chan struct{}
 }
 
 func getMessage(d map[string]string) (line []byte, ok bool) {
@@ -89,45 +104,41 @@ func getAttrs(d map[string]string) []backend.LogAttr {
 // no more log entries to send to the log watcher.
 var errDrainDone = errors.New("journald drain done")
 
-// drainJournal reads and sends log messages from the journal. It returns the
-// number of log messages sent and any error encountered. When initial != nil
-// it initializes the journal read position to the position specified by config
-// before reading. Otherwise it continues to read from the current position.
+// drainJournal reads and sends log messages from the journal.
 //
-// drainJournal returns err == errDrainDone when a terminal stopping condition
-// has been reached: either the watch consumer is gone or a log entry is read
-// which has a timestamp after until (if until is nonzero). If the end of the
-// journal is reached without encountering a terminal stopping condition,
-// err == nil is returned.
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, initial chan struct{}) (int, error) {
-	isInitial := initial != nil
-	if isInitial {
+// drainJournal returns errDrainDone when a terminal stopping condition has been
+// reached: the watch consumer is gone, a log entry is read which has a
+// timestamp after until (if until is nonzero), or the log driver is closed and
+// the last message logged has been sent from the journal. If the end of the
+// journal is reached without encountering a terminal stopping condition, a nil
+// error is returned.
+func (r *reader) drainJournal() error {
+	if !r.initialized {
 		defer func() {
-			if initial != nil {
-				close(initial)
-			}
+			r.signalReady()
+			r.initialized = true
 		}()
 
 		var (
 			err          error
 			seekedToTail bool
 		)
-		if config.Tail >= 0 {
-			if config.Until.IsZero() {
-				err = j.SeekTail()
+		if r.config.Tail >= 0 {
+			if r.config.Until.IsZero() {
+				err = r.j.SeekTail()
 				seekedToTail = true
 			} else {
-				err = j.SeekRealtime(config.Until)
+				err = r.j.SeekRealtime(r.config.Until)
 			}
 		} else {
-			if config.Since.IsZero() {
-				err = j.SeekHead()
+			if r.config.Since.IsZero() {
+				err = r.j.SeekHead()
 			} else {
-				err = j.SeekRealtime(config.Since)
+				err = r.j.SeekRealtime(r.config.Since)
 			}
 		}
 		if err != nil {
-			return 0, err
+			return err
 		}
 
 		// SeekTail() followed by Next() behaves incorrectly, so we need
@@ -137,57 +148,62 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 		// so the only special case requiring special handling is
 		// config.Tail == 0.
 		// https://github.com/systemd/systemd/issues/9934
-		if seekedToTail && config.Tail == 0 {
+		if seekedToTail && r.config.Tail == 0 {
 			// Resolve the read pointer to the last entry in the
 			// journal so that the call to Next() inside the loop
 			// advances past it.
-			if ok, err := j.Previous(); err != nil || !ok {
-				return 0, err
+			if ok, err := r.j.Previous(); err != nil || !ok {
+				return err
 			}
 		}
 	}
 
-	var sent int
 	for i := 0; ; i++ {
-		if isInitial && i == 0 && config.Tail > 0 {
-			if n, err := j.PreviousSkip(uint(config.Tail)); err != nil || n == 0 {
-				return sent, err
+		if !r.initialized && i == 0 && r.config.Tail > 0 {
+			if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
+				return err
 			}
-		} else if ok, err := j.Next(); err != nil || !ok {
-			return sent, err
+		} else if ok, err := r.j.Next(); err != nil || !ok {
+			return err
 		}
 
-		if isInitial && i == 0 {
+		if !r.initialized && i == 0 {
 			// The cursor is in a position which will be unaffected
-			// by subsequent logging. Signal that the watcher is
-			// initialized.
-			close(initial)
-			initial = nil // Prevent double-closing.
+			// by subsequent logging.
+			r.signalReady()
 		}
 
 		// Read the entry's timestamp.
-		timestamp, err := j.Realtime()
+		timestamp, err := r.j.Realtime()
 		if err != nil {
-			return sent, err
+			return err
 		}
 		// Check if the PreviousSkip went too far back. Check only the
 		// initial position as we are comparing wall-clock timestamps,
 		// which may not be monotonic. We don't want to skip over
 		// messages sent later in time just because the clock moved
 		// backwards.
-		if isInitial && i == 0 && config.Tail > 0 && timestamp.Before(config.Since) {
-			j.SeekRealtime(config.Since)
+		if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
+			r.j.SeekRealtime(r.config.Since)
 			continue
 		}
-		if !config.Until.IsZero() && config.Until.Before(timestamp) {
-			return sent, errDrainDone
+		if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
+			return errDrainDone
 		}
 
 		// Read and send the logged message, if there is one to read.
-		data, err := j.Data()
+		data, err := r.j.Data()
 		if err != nil {
-			return sent, err
+			return err
 		}
+
+		if data[fieldLogEpoch] == r.s.epoch {
+			seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64)
+			if err == nil && seq > r.maxOrdinal {
+				r.maxOrdinal = seq
+			}
+		}
+
 		if line, ok := getMessage(data); ok {
 			// Send the log message, unless the consumer is gone
 			msg := &logger.Message{
@@ -212,19 +228,18 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 				}
 			*/
 			select {
-			case <-logWatcher.WatchConsumerGone():
-				return sent, errDrainDone
-			case logWatcher.Msg <- msg:
-				sent++
+			case <-r.logWatcher.WatchConsumerGone():
+				return errDrainDone
+			case r.logWatcher.Msg <- msg:
 			}
 		}
 
 		// Call sd_journal_process() periodically during the processing loop
 		// to close any opened file descriptors for rotated (deleted) journal files.
 		if i != 0 && i%1024 == 0 {
-			if _, err := j.Process(); err != nil {
+			if _, err := r.j.Process(); err != nil {
 				// log a warning but ignore it for now
-				logrus.WithField("container", s.vars[fieldContainerIDFull]).
+				logrus.WithField("container", r.s.vars[fieldContainerIDFull]).
 					WithField("error", err).
 					Warn("journald: error processing journal")
 			}
@@ -232,27 +247,50 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Jour
 	}
 }
 
-func (s *journald) readJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, ready chan struct{}) error {
-	if _, err := s.drainJournal(logWatcher, j, config, ready /* initial */); err != nil {
+func (r *reader) readJournal() error {
+	caughtUp := atomic.LoadUint64(&r.s.ordinal)
+	if err := r.drainJournal(); err != nil {
 		if err != errDrainDone {
 			return err
 		}
 		return nil
 	}
-	if !config.Follow {
-		return nil
+
+	var drainTimeout <-chan time.Time
+	if !r.config.Follow {
+		if r.s.readSyncTimeout == 0 {
+			return nil
+		}
+		tmr := time.NewTimer(r.s.readSyncTimeout)
+		defer tmr.Stop()
+		drainTimeout = tmr.C
 	}
 
 	for {
-		status, err := j.Wait(250 * time.Millisecond)
+		status, err := r.j.Wait(250 * time.Millisecond)
 		if err != nil {
 			return err
 		}
 		select {
-		case <-logWatcher.WatchConsumerGone():
+		case <-r.logWatcher.WatchConsumerGone():
 			return nil // won't be able to write anything anymore
-		case <-s.closed:
+		case <-drainTimeout:
+			// Container is gone but we haven't found the end of the
+			// logs within the timeout. Maybe it was dropped by
+			// journald, e.g. due to rate-limiting.
+			return nil
+		case <-r.s.closed:
 			// container is gone, drain journal
+			lastSeq := atomic.LoadUint64(&r.s.ordinal)
+			if r.maxOrdinal >= lastSeq {
+				// All caught up with the logger!
+				return nil
+			}
+			if drainTimeout == nil {
+				tmr := time.NewTimer(closedDrainTimeout)
+				defer tmr.Stop()
+				drainTimeout = tmr.C
+			}
 		default:
 			// container is still alive
 			if status == sdjournal.StatusNOP {
@@ -260,30 +298,25 @@ func (s *journald) readJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journ
 				continue
 			}
 		}
-		n, err := s.drainJournal(logWatcher, j, config, nil /* initial */)
+		err = r.drainJournal()
 		if err != nil {
 			if err != errDrainDone {
 				return err
 			}
 			return nil
-		} else if status == sdjournal.StatusNOP && n == 0 {
+		}
+		if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
 			return nil
 		}
 	}
 }
 
-func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig, ready chan struct{}) {
-	defer close(logWatcher.Msg)
+func (r *reader) readLogs() {
+	defer close(r.logWatcher.Msg)
 
 	// Make sure the ready channel is closed in the event of an early
 	// return.
-	defer func() {
-		select {
-		case <-ready:
-		default:
-			close(ready)
-		}
-	}()
+	defer r.signalReady()
 
 	// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
 	//     Functions that operate on sd_journal objects are thread
@@ -298,52 +331,128 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
 	defer runtime.UnlockOSThread()
 
 	// Get a handle to the journal.
-	var (
-		j   *sdjournal.Journal
-		err error
-	)
-	if s.journalReadDir != "" {
-		j, err = sdjournal.OpenDir(s.journalReadDir, 0)
+	var err error
+	if r.s.journalReadDir != "" {
+		r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0)
 	} else {
-		j, err = sdjournal.Open(0)
+		r.j, err = sdjournal.Open(0)
 	}
 	if err != nil {
-		logWatcher.Err <- err
+		r.logWatcher.Err <- err
 		return
 	}
-	defer j.Close()
+	defer r.j.Close()
 
-	if config.Follow {
+	if r.config.Follow {
 		// Initialize library inotify watches early
-		if err := j.InitializeInotify(); err != nil {
-			logWatcher.Err <- err
+		if err := r.j.InitializeInotify(); err != nil {
+			r.logWatcher.Err <- err
 			return
 		}
 	}
 
 	// Remove limits on the size of data items that we'll retrieve.
-	if err := j.SetDataThreshold(0); err != nil {
-		logWatcher.Err <- err
+	if err := r.j.SetDataThreshold(0); err != nil {
+		r.logWatcher.Err <- err
 		return
 	}
 	// Add a match to have the library do the searching for us.
-	if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
-		logWatcher.Err <- err
+	if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil {
+		r.logWatcher.Err <- err
 		return
 	}
 
-	if err := s.readJournal(logWatcher, j, config, ready); err != nil {
-		logWatcher.Err <- err
+	if err := r.readJournal(); err != nil {
+		r.logWatcher.Err <- err
 		return
 	}
 }
 
+func (r *reader) signalReady() {
+	select {
+	case <-r.ready:
+	default:
+		close(r.ready)
+	}
+}
+
 func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
-	logWatcher := logger.NewLogWatcher()
-	ready := make(chan struct{})
-	go s.readLogs(logWatcher, config, ready)
+	r := &reader{
+		s:          s,
+		logWatcher: logger.NewLogWatcher(),
+		config:     config,
+		ready:      make(chan struct{}),
+	}
+	go r.readLogs()
 	// Block until the reader is in position to read from the current config
 	// location to prevent race conditions in tests.
-	<-ready
-	return logWatcher
+	<-r.ready
+	return r.logWatcher
+}
+
+func waitUntilFlushedImpl(s *journald) error {
+	if s.readSyncTimeout == 0 {
+		return nil
+	}
+
+	ordinal := atomic.LoadUint64(&s.ordinal)
+	if ordinal == 0 {
+		return nil // No logs were sent; nothing to wait for.
+	}
+
+	flushed := make(chan error)
+	go func() {
+		defer close(flushed)
+		runtime.LockOSThread()
+
+		var (
+			j   *sdjournal.Journal
+			err error
+		)
+		if s.journalReadDir != "" {
+			j, err = sdjournal.OpenDir(s.journalReadDir, 0)
+		} else {
+			j, err = sdjournal.Open(0)
+		}
+		if err != nil {
+			flushed <- err
+			return
+		}
+		defer j.Close()
+
+		if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
+			flushed <- err
+			return
+		}
+		if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil {
+			flushed <- err
+			return
+		}
+		if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil {
+			flushed <- err
+			return
+		}
+
+		deadline := time.Now().Add(s.readSyncTimeout)
+		for time.Now().Before(deadline) {
+			if ok, err := j.Next(); ok {
+				// Found it!
+				return
+			} else if err != nil {
+				flushed <- err
+				return
+			}
+			if _, err := j.Wait(100 * time.Millisecond); err != nil {
+				flushed <- err
+				return
+			}
+		}
+		logrus.WithField("container", s.vars[fieldContainerIDFull]).
+			Warn("journald: deadline exceeded waiting for logs to be committed to journal")
+	}()
+	return <-flushed
+}
+
+func init() {
+	waitUntilFlushed = waitUntilFlushedImpl
 }

+ 47 - 2
daemon/logger/journald/read_test.go

@@ -47,14 +47,59 @@ func TestLogRead(t *testing.T) {
 			assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil))
 
 			return func(t *testing.T) logger.Logger {
+				s := make(chan sendit, 100)
+				t.Cleanup(func() { close(s) })
+				go func() {
+					for m := range s {
+						<-m.after
+						activeJournal.Send(m.message, m.priority, m.vars)
+						if m.sent != nil {
+							close(m.sent)
+						}
+					}
+				}()
 				l, err := new(info)
 				assert.NilError(t, err)
 				l.journalReadDir = journalDir
-				l.sendToJournal = activeJournal.Send
-				return l
+
+				sl := &syncLogger{journald: l}
+				l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error {
+					sent := make(chan struct{})
+					s <- sendit{
+						message:  message,
+						priority: priority,
+						vars:     vars,
+						after:    time.After(150 * time.Millisecond),
+						sent:     sent,
+					}
+					sl.waitOn = sent
+					return nil
+				}
+				l.readSyncTimeout = 3 * time.Second
+				return sl
 			}
 		},
 	}
 	t.Run("Tail", r.TestTail)
 	t.Run("Follow", r.TestFollow)
 }
+
+type sendit struct {
+	message  string
+	priority journal.Priority
+	vars     map[string]string
+	after    <-chan time.Time
+	sent     chan<- struct{}
+}
+
+type syncLogger struct {
+	*journald
+	waitOn <-chan struct{}
+}
+
+func (l *syncLogger) Sync() error {
+	if l.waitOn != nil {
+		<-l.waitOn
+	}
+	return nil
+}

+ 14 - 0
daemon/logger/loggertest/logreader.go

@@ -16,6 +16,18 @@ import (
 	"github.com/docker/docker/daemon/logger"
 )
 
+type syncer interface {
+	// Sync commits the current logs to stable storage such that the most
+	// recently-logged message can be immediately read back by a LogReader.
+	Sync() error
+}
+
+func syncLogger(t *testing.T, l logger.Logger) {
+	if sl, ok := l.(syncer); ok {
+		assert.NilError(t, sl.Sync())
+	}
+}
+
 // Reader tests that a logger.LogReader implementation behaves as it should.
 type Reader struct {
 	// Factory returns a function which constructs loggers for the container
@@ -316,6 +328,7 @@ func (tr Reader) TestFollow(t *testing.T) {
 
 		mm := makeTestMessages()
 		logMessages(t, l, mm[0:2])
+		syncLogger(t, l)
 
 		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true})
 		defer lw.ConsumerGone()
@@ -342,6 +355,7 @@ func (tr Reader) TestFollow(t *testing.T) {
 
 		mm := makeTestMessages()
 		expected := logMessages(t, l, mm[0:2])[1:]
+		syncLogger(t, l)
 
 		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true})
 		defer lw.ConsumerGone()