Merge pull request #47019 from corhere/fix-journald-logs-systemd-255

logger/journald: fix tailing logs with systemd 255
Commit 0f507ef624, by Brian Goff, 2024-01-28 12:22:16 -08:00 (committed via GitHub).
3 changed files with 226 additions and 143 deletions

daemon/logger/journald/internal/fake/sender.go

@@ -22,6 +22,7 @@ import (
 	"code.cloudfoundry.org/clock"
 	"github.com/coreos/go-systemd/v22/journal"
+	"github.com/google/uuid"
 	"gotest.tools/v3/assert"

 	"github.com/docker/docker/daemon/logger/journald/internal/export"
@@ -67,6 +68,10 @@ type Sender struct {
 	// timestamp in zero time after the SYSLOG_TIMESTAMP value was set,
 	// which is highly unrealistic in practice.
 	AssignEventTimestampFromSyslogTimestamp bool
+	// Boot ID for journal entries. Required by systemd-journal-remote as of
+	// https://github.com/systemd/systemd/commit/1eede158519e4e5ed22738c90cb57a91dbecb7f2
+	// (systemd 255).
+	BootID uuid.UUID
 }

 // New constructs a new Sender which will write journal entries to outpath. The
@@ -82,6 +87,7 @@ func New(outpath string) (*Sender, error) {
 		CmdName:    p,
 		OutputPath: outpath,
 		Clock:      clock.NewClock(),
+		BootID:     uuid.New(), // UUIDv4, like systemd itself generates for sd_id128 values.
 	}
 	return sender, nil
 }
@@ -121,6 +127,9 @@ func (s *Sender) Send(message string, priority journal.Priority, vars map[string
 	if err := export.WriteField(&buf, "__REALTIME_TIMESTAMP", strconv.FormatInt(ts.UnixMicro(), 10)); err != nil {
 		return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err)
 	}
+	if err := export.WriteField(&buf, "_BOOT_ID", fmt.Sprintf("%x", [16]byte(s.BootID))); err != nil {
+		return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err)
+	}
 	if err := export.WriteField(&buf, "MESSAGE", message); err != nil {
 		return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err)
 	}
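For reference, journald renders sd_id128 values such as _BOOT_ID as 32 lowercase hexadecimal characters with no dashes, which is exactly what formatting the UUID's raw bytes with %x produces in the new WriteField call above. A standalone sketch (illustrative only, not part of the change):

package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	// A uuid.UUID is a [16]byte, so converting and printing with %x yields
	// the sd_id128 text form journald expects for fields like _BOOT_ID.
	bootID := uuid.MustParse("d8c9ff64-b376-4c5d-915b-4491ba4c9c7d")
	fmt.Println(bootID)                  // d8c9ff64-b376-4c5d-915b-4491ba4c9c7d
	fmt.Printf("%x\n", [16]byte(bootID)) // d8c9ff64b3764c5d915b4491ba4c9c7d
}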

daemon/logger/journald/read.go

@@ -4,7 +4,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
 import (
 	"context"
-	"errors"
 	"runtime"
 	"strconv"
 	"sync/atomic"
@@ -17,7 +16,10 @@ import (
 	"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
 )

-const closedDrainTimeout = 5 * time.Second
+const (
+	closedDrainTimeout = 5 * time.Second
+	waitInterval       = 250 * time.Millisecond
+)

 // Fields which we know are not user-provided attribute fields.
 var wellKnownFields = map[string]bool{
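The new waitInterval constant bounds how long the reader blocks in a journal wait before re-checking its stop conditions; the wait() method added further down additionally clamps the sleep so it never runs past the drain deadline. A standalone sketch of that clamping logic, where nextWaitDuration is a hypothetical extraction written for illustration:

package main

import (
	"fmt"
	"time"
)

const waitInterval = 250 * time.Millisecond

// nextWaitDuration mirrors the clamping in wait(): sleep at most waitInterval,
// and never past the drain deadline. The zero time.Time means "no deadline".
func nextWaitDuration(deadline, now time.Time) time.Duration {
	dur := waitInterval
	if !deadline.IsZero() {
		if remaining := deadline.Sub(now); remaining < dur {
			dur = remaining // negative when the deadline has already passed
		}
	}
	return dur
}

func main() {
	now := time.Now()
	fmt.Println(nextWaitDuration(time.Time{}, now))                   // 250ms: no deadline
	fmt.Println(nextWaitDuration(now.Add(100*time.Millisecond), now)) // 100ms: deadline is sooner
	fmt.Println(nextWaitDuration(now.Add(-time.Second), now) < 0)     // true: deadline passed
}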
@@ -46,13 +48,13 @@ var wellKnownFields = map[string]bool{
 }

 type reader struct {
-	s           *journald
-	j           *sdjournal.Journal
-	logWatcher  *logger.LogWatcher
-	config      logger.ReadConfig
-	maxOrdinal  uint64
-	initialized bool
-	ready       chan struct{}
+	s             *journald
+	j             *sdjournal.Journal
+	logWatcher    *logger.LogWatcher
+	config        logger.ReadConfig
+	maxOrdinal    uint64
+	ready         chan struct{}
+	drainDeadline time.Time
 }

 func getMessage(d map[string]string) (line []byte, ok bool) {
@@ -99,101 +101,165 @@ func getAttrs(d map[string]string) []backend.LogAttr {
 	return attrs
 }

-// errDrainDone is the error returned by drainJournal to signal that there are
-// no more log entries to send to the log watcher.
-var errDrainDone = errors.New("journald drain done")
+// The SeekXYZ() methods all move the journal read pointer to a "conceptual"
+// position which does not correspond to any journal entry. A subsequent call to
+// Next(), Previous() or similar is necessary to resolve the read pointer to a
+// discrete entry.
+// https://github.com/systemd/systemd/pull/5930#issuecomment-300878104
+// But that's not all! If there is no discrete entry to resolve the position to,
+// the call to Next() or Previous() will just leave the read pointer in a
+// conceptual position, or do something even more bizarre.
+// https://github.com/systemd/systemd/issues/9934
-// drainJournal reads and sends log messages from the journal.
-//
-// drainJournal returns errDrainDone when a terminal stopping condition has been
-// reached: the watch consumer is gone, a log entry is read which has a
-// timestamp after until (if until is nonzero), or the log driver is closed and
-// the last message logged has been sent from the journal. If the end of the
-// journal is reached without encountering a terminal stopping condition, a nil
-// error is returned.
-func (r *reader) drainJournal() error {
-	if !r.initialized {
-		defer func() {
-			r.signalReady()
-			r.initialized = true
-		}()
-	}
+
+// initialSeekHead positions the journal read pointer at the earliest journal
+// entry with a timestamp of at least r.config.Since. It returns true if there
+// is an entry to read at the read pointer.
+func (r *reader) initialSeekHead() (bool, error) {
+	var err error
+	if r.config.Since.IsZero() {
+		err = r.j.SeekHead()
+	} else {
+		err = r.j.SeekRealtime(r.config.Since)
+	}
+	if err != nil {
+		return false, err
+	}
+	return r.j.Next()
+}
-	var (
-		err          error
-		seekedToTail bool
-	)
-	if r.config.Tail >= 0 {
-		if r.config.Until.IsZero() {
-			err = r.j.SeekTail()
-			seekedToTail = true
-		} else {
-			err = r.j.SeekRealtime(r.config.Until)
-		}
-	} else {
-		if r.config.Since.IsZero() {
-			err = r.j.SeekHead()
-		} else {
-			err = r.j.SeekRealtime(r.config.Since)
-		}
-	}
-	if err != nil {
-		return err
-	}
-	// SeekTail() followed by Next() behaves incorrectly, so we need
-	// to work around the bug by ensuring the first discrete
-	// movement of the read pointer is Previous() or PreviousSkip().
-	// PreviousSkip() is called inside the loop when config.Tail > 0
-	// so the only special case requiring special handling is
-	// config.Tail == 0.
-	// https://github.com/systemd/systemd/issues/9934
-	if seekedToTail && r.config.Tail == 0 {
-		// Resolve the read pointer to the last entry in the
-		// journal so that the call to Next() inside the loop
-		// advances past it.
-		if ok, err := r.j.Previous(); err != nil || !ok {
-			return err
-		}
-	}
+// initialSeekTail positions the journal read pointer at a journal entry
+// relative to the tail of the journal at the time of the call based on the
+// specification in r.config. It returns true if there is an entry to read at
+// the read pointer. Otherwise the read pointer is set to a conceptual position
+// which will be resolved to the desired entry (once written) by advancing
+// forward with r.j.Next() or similar.
+func (r *reader) initialSeekTail() (bool, error) {
+	var err error
+	if r.config.Until.IsZero() {
+		err = r.j.SeekTail()
+	} else {
+		err = r.j.SeekRealtime(r.config.Until)
+	}
+	if err != nil {
+		return false, err
+	}
-	for i := 0; ; i++ {
-		if !r.initialized && i == 0 && r.config.Tail > 0 {
-			if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
-				return err
-			}
+
+	var ok bool
+	if r.config.Tail == 0 {
+		ok, err = r.j.Previous()
+	} else {
+		var n int
+		n, err = r.j.PreviousSkip(uint(r.config.Tail))
+		ok = n > 0
+	}
+	if err != nil {
+		return ok, err
+	}
+	if !ok {
+		// The (filtered) journal has no entries. The tail is the head: all new
+		// entries which get written into the journal from this point forward
+		// should be read from the journal. However the read pointer is
+		// positioned at a conceptual position which is not conducive to reading
+		// those entries. The tail of the journal is resolved to the last entry
+		// in the journal _at the time of the first successful Previous() call_,
+		// which means that an arbitrary number of journal entries added in the
+		// interim may be skipped: race condition. While the realtime conceptual
+		// position is not so racy, it is also unhelpful: it is the timestamp
+		// past where reading should stop, so all logs that should be followed
+		// would be skipped over.
+		// Reset the read pointer position to avoid these problems.
+		return r.initialSeekHead()
+	} else if r.config.Tail == 0 {
+		// The journal read pointer is positioned at the discrete position of
+		// the journal entry _before_ the entry to send.
+		return r.j.Next()
+	}
+
+	// Check if the PreviousSkip went too far back.
+	timestamp, err := r.j.Realtime()
+	if err != nil {
+		return false, err
+	}
+	if timestamp.Before(r.config.Since) {
+		if err := r.j.SeekRealtime(r.config.Since); err != nil {
+			return false, err
+		}
+		return r.j.Next()
+	}
+	return true, nil
+}
+// wait blocks until the journal has new data to read, the reader's drain
+// deadline is exceeded, or the log reading consumer is gone.
+func (r *reader) wait() (bool, error) {
+	for {
+		dur := waitInterval
+		if !r.drainDeadline.IsZero() {
+			dur = time.Until(r.drainDeadline)
+			if dur < 0 {
+				// Container is gone but we haven't found the end of the
+				// logs before the deadline. Maybe it was dropped by
+				// journald, e.g. due to rate-limiting.
+				return false, nil
+			} else if dur > waitInterval {
+				dur = waitInterval
+			}
+		}
-		} else if ok, err := r.j.Next(); err != nil || !ok {
-			return err
-		}
-		if !r.initialized && i == 0 {
-			// The cursor is in a position which will be unaffected
-			// by subsequent logging.
-			r.signalReady()
-		}
+		status, err := r.j.Wait(dur)
+		if err != nil {
+			return false, err
+		} else if status != sdjournal.StatusNOP {
+			return true, nil
+		}
+		select {
+		case <-r.logWatcher.WatchConsumerGone():
+			return false, nil
+		case <-r.s.closed:
+			// Container is gone; don't wait indefinitely for journal entries that will never arrive.
+			if r.drainDeadline.IsZero() {
+				r.drainDeadline = time.Now().Add(closedDrainTimeout)
+			}
+		default:
+		}
+	}
+}
+
+// nextWait blocks until there is a new journal entry to read, and advances the
+// journal read pointer to it.
+func (r *reader) nextWait() (bool, error) {
+	for {
+		if ok, err := r.j.Next(); err != nil || ok {
+			return ok, err
+		}
+		if ok, err := r.wait(); err != nil || !ok {
+			return false, err
+		}
+	}
+}
+// drainJournal reads and sends log messages from the journal, starting from the
+// current read pointer, until the end of the journal or a terminal stopping
+// condition is reached.
+//
+// It returns false when a terminal stopping condition has been reached: the
+// watch consumer is gone, a log entry is read which has a timestamp after until
+// (if until is nonzero), or the log driver is closed and the last message
+// logged has been sent from the journal.
+func (r *reader) drainJournal() (bool, error) {
+	for i := 0; ; i++ {
 		// Read the entry's timestamp.
 		timestamp, err := r.j.Realtime()
 		if err != nil {
-			return err
-		}
-		// Check if the PreviousSkip went too far back. Check only the
-		// initial position as we are comparing wall-clock timestamps,
-		// which may not be monotonic. We don't want to skip over
-		// messages sent later in time just because the clock moved
-		// backwards.
-		if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
-			r.j.SeekRealtime(r.config.Since)
-			continue
+			return true, err
 		}
 		if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
-			return errDrainDone
+			return false, nil
 		}

 		// Read and send the logged message, if there is one to read.
 		data, err := r.j.Data()
 		if err != nil {
-			return err
+			return true, err
 		}

 		if data[fieldLogEpoch] == r.s.epoch {
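To make the comments above concrete, here is a minimal sketch of the seek-then-resolve discipline, written against a hypothetical interface that borrows the method names of the internal sdjournal wrapper (not its real, full API):

package sketch

type cursor interface {
	SeekHead() error
	SeekTail() error
	Next() (bool, error)     // resolve/advance forward to a discrete entry
	Previous() (bool, error) // resolve backward to a discrete entry
}

// firstEntry: SeekHead() leaves the read pointer at a conceptual position;
// Next() resolves it to the first discrete entry, if any exists.
func firstEntry(j cursor) (bool, error) {
	if err := j.SeekHead(); err != nil {
		return false, err
	}
	return j.Next()
}

// lastEntry: after SeekTail() the first discrete movement must be backward
// (Previous or PreviousSkip); SeekTail() followed by Next() is the buggy
// combination worked around here (https://github.com/systemd/systemd/issues/9934).
func lastEntry(j cursor) (bool, error) {
	if err := j.SeekTail(); err != nil {
		return false, err
	}
	return j.Previous()
}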
@@ -228,7 +294,7 @@ func (r *reader) drainJournal() error {
 			*/
 			select {
 			case <-r.logWatcher.WatchConsumerGone():
-				return errDrainDone
+				return false, nil
 			case r.logWatcher.Msg <- msg:
 			}
 		}
@@ -243,41 +309,28 @@ func (r *reader) drainJournal() error {
 				Warn("journald: error processing journal")
 			}
 		}
+
+		if ok, err := r.j.Next(); err != nil || !ok {
+			return true, err
+		}
 	}
 }

 func (r *reader) readJournal() error {
 	caughtUp := atomic.LoadUint64(&r.s.ordinal)
-	if err := r.drainJournal(); err != nil {
-		if err != errDrainDone {
-			return err
-		}
-		return nil
+	if more, err := r.drainJournal(); err != nil || !more {
+		return err
 	}

-	var drainTimeout <-chan time.Time
 	if !r.config.Follow {
 		if r.s.readSyncTimeout == 0 {
 			return nil
 		}
-		tmr := time.NewTimer(r.s.readSyncTimeout)
-		defer tmr.Stop()
-		drainTimeout = tmr.C
+		r.drainDeadline = time.Now().Add(r.s.readSyncTimeout)
 	}

 	for {
-		status, err := r.j.Wait(250 * time.Millisecond)
-		if err != nil {
-			return err
-		}
 		select {
 		case <-r.logWatcher.WatchConsumerGone():
 			return nil // won't be able to write anything anymore
-		case <-drainTimeout:
-			// Container is gone but we haven't found the end of the
-			// logs within the timeout. Maybe it was dropped by
-			// journald, e.g. due to rate-limiting.
-			return nil
 		case <-r.s.closed:
 			// container is gone, drain journal
 			lastSeq := atomic.LoadUint64(&r.s.ordinal)
@@ -285,24 +338,14 @@ func (r *reader) readJournal() error {
 				// All caught up with the logger!
 				return nil
 			}
-			if drainTimeout == nil {
-				tmr := time.NewTimer(closedDrainTimeout)
-				defer tmr.Stop()
-				drainTimeout = tmr.C
-			}
 		default:
-			// container is still alive
-			if status == sdjournal.StatusNOP {
-				// no new data -- keep waiting
-				continue
-			}
 		}
-		err = r.drainJournal()
-		if err != nil {
-			if err != errDrainDone {
-				return err
-			}
-			return nil
-		}
+
+		if more, err := r.nextWait(); err != nil || !more {
+			return err
+		}
+		if more, err := r.drainJournal(); err != nil || !more {
+			return err
+		}
 		if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
 			return nil
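The restructured readJournal now simply alternates draining with blocking for new entries. A simplified, hypothetical sketch of that control flow, with the ordinal bookkeeping and shutdown handling stripped out:

package sketch

// followLoop captures the new readJournal shape: drain everything readable,
// block until more arrives, repeat. Both callbacks return more=false with a
// nil error on a clean terminal condition, matching the drainJournal/nextWait
// contract above.
func followLoop(drain, next func() (more bool, err error)) error {
	for {
		if more, err := drain(); err != nil || !more {
			return err
		}
		if more, err := next(); err != nil || !more {
			return err
		}
	}
}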
@@ -361,6 +404,33 @@ func (r *reader) readLogs() {
 		return
 	}

+	var ok bool
+	if r.config.Tail >= 0 {
+		ok, err = r.initialSeekTail()
+	} else {
+		ok, err = r.initialSeekHead()
+	}
+	if err != nil {
+		r.logWatcher.Err <- err
+		return
+	}
+	r.signalReady()
+	if !ok {
+		if !r.config.Follow {
+			return
+		}
+		// Either the read pointer is positioned at a discrete journal entry, in
+		// which case the position will be unaffected by subsequent logging, or
+		// the read pointer is in the conceptual position corresponding to the
+		// first journal entry to send once it is logged in the future.
+		if more, err := r.nextWait(); err != nil || !more {
+			if err != nil {
+				r.logWatcher.Err <- err
+			}
+			return
+		}
+	}
+
 	if err := r.readJournal(); err != nil {
 		r.logWatcher.Err <- err
 		return
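For context, this is how a consumer drives the reader whose seek logic changed above, using only the logger package API that the tests below also exercise (tailAndFollow is a hypothetical helper written for illustration, not part of this change):

package sketch

import (
	"errors"
	"fmt"

	"github.com/docker/docker/daemon/logger"
)

// tailAndFollow replays the last ten entries, then streams new ones until the
// driver is closed and the journal is fully drained.
func tailAndFollow(l logger.Logger) error {
	lr, ok := l.(logger.LogReader)
	if !ok {
		return errors.New("log driver does not support reading")
	}
	// Tail: 10 replays only the last ten entries; Tail: -1 replays everything.
	// Follow: true keeps the watcher open for entries logged afterwards.
	lw := lr.ReadLogs(logger.ReadConfig{Tail: 10, Follow: true})
	defer lw.ConsumerGone()
	for {
		select {
		case msg, open := <-lw.Msg:
			if !open {
				return nil // channel closed: logs fully drained
			}
			fmt.Printf("%s %s\n", msg.Timestamp, msg.Line)
		case err := <-lw.Err:
			return err
		}
	}
}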

daemon/logger/loggertest/logreader.go

@@ -1,6 +1,7 @@
 package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"

 import (
+	"fmt"
 	"runtime"
 	"strings"
 	"sync"
@@ -194,28 +195,31 @@ func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
 func (tr Reader) TestFollow(t *testing.T) {
 	// Reader sends all logs and closes after logger is closed
 	// - Starting from empty log (like run)
-	t.Run("FromEmptyLog", func(t *testing.T) {
-		t.Parallel()
-		l := tr.Factory(t, logger.Info{
-			ContainerID:   "followstart0",
-			ContainerName: "logloglog",
-		})(t)
-		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
-		defer lw.ConsumerGone()
+	for i, tail := range []int{-1, 0, 1, 42} {
+		i, tail := i, tail
+		t.Run(fmt.Sprintf("FromEmptyLog/Tail=%d", tail), func(t *testing.T) {
+			t.Parallel()
+			l := tr.Factory(t, logger.Info{
+				ContainerID:   fmt.Sprintf("followstart%d", i),
+				ContainerName: fmt.Sprintf("logloglog%d", i),
+			})(t)
+			lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: tail, Follow: true})
+			defer lw.ConsumerGone()

-		doneReading := make(chan struct{})
-		var logs []*logger.Message
-		go func() {
-			defer close(doneReading)
-			logs = readAll(t, lw)
-		}()
+			doneReading := make(chan struct{})
+			var logs []*logger.Message
+			go func() {
+				defer close(doneReading)
+				logs = readAll(t, lw)
+			}()

-		mm := makeTestMessages()
-		expected := logMessages(t, l, mm)
-		assert.NilError(t, l.Close())
-		<-doneReading
-		assert.DeepEqual(t, logs, expected, compareLog)
-	})
+			mm := makeTestMessages()
+			expected := logMessages(t, l, mm)
+			assert.NilError(t, l.Close())
+			<-doneReading
+			assert.DeepEqual(t, logs, expected, compareLog)
+		})
+	}

 	t.Run("AttachMidStream", func(t *testing.T) {
 		t.Parallel()
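The "i, tail := i, tail" line above is the pre-Go 1.22 idiom for giving each parallel subtest its own copy of the loop variables: t.Run returns before a parallel closure runs, so without the shadowing every subtest would observe the final iteration's values. A stripped-down sketch of the pattern (hypothetical test, for illustration):

package loopvar_test

import (
	"fmt"
	"testing"
)

func TestTailMatrix(t *testing.T) {
	for _, tail := range []int{-1, 0, 1, 42} {
		tail := tail // shadow: each closure captures its own copy (unneeded as of Go 1.22)
		t.Run(fmt.Sprintf("Tail=%d", tail), func(t *testing.T) {
			t.Parallel() // body runs after the loop has moved on
			_ = tail     // safe: refers to this iteration's copy
		})
	}
}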