moby/daemon/logger/journald/read.go
Brian Goff 74da6a6363 Switch all logging to use containerd log pkg
This unifies our logging and allows us to propagate logging and trace
contexts together.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2023-06-24 00:23:44 +00:00

458 lines
12 KiB
Go

//go:build linux && cgo && !static_build && journald
package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"context"
"errors"
"runtime"
"strconv"
"sync/atomic"
"time"
"github.com/containerd/containerd/log"
"github.com/coreos/go-systemd/v22/journal"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
)
const closedDrainTimeout = 5 * time.Second
// Fields which we know are not user-provided attribute fields.
var wellKnownFields = map[string]bool{
"MESSAGE": true,
"MESSAGE_ID": true,
"PRIORITY": true,
"CODE_FILE": true,
"CODE_LINE": true,
"CODE_FUNC": true,
"ERRNO": true,
"SYSLOG_FACILITY": true,
fieldSyslogIdentifier: true,
"SYSLOG_PID": true,
fieldSyslogTimestamp: true,
fieldContainerName: true,
fieldContainerID: true,
fieldContainerIDFull: true,
fieldContainerTag: true,
fieldImageName: true,
fieldPLogID: true,
fieldPLogOrdinal: true,
fieldPLogLast: true,
fieldPartialMessage: true,
fieldLogEpoch: true,
fieldLogOrdinal: true,
}
type reader struct {
s *journald
j *sdjournal.Journal
logWatcher *logger.LogWatcher
config logger.ReadConfig
maxOrdinal uint64
initialized bool
ready chan struct{}
}
func getMessage(d map[string]string) (line []byte, ok bool) {
m, ok := d["MESSAGE"]
if ok {
line = []byte(m)
if d[fieldPartialMessage] != "true" {
line = append(line, "\n"...)
}
}
return line, ok
}
func getPriority(d map[string]string) (journal.Priority, bool) {
if pri, ok := d["PRIORITY"]; ok {
i, err := strconv.Atoi(pri)
return journal.Priority(i), err == nil
}
return -1, false
}
// getSource recovers the stream name from the entry data by mapping from the
// journal priority field back to the stream that we would have assigned that
// value.
func getSource(d map[string]string) string {
source := ""
if priority, ok := getPriority(d); ok {
if priority == journal.PriErr {
source = "stderr"
} else if priority == journal.PriInfo {
source = "stdout"
}
}
return source
}
func getAttrs(d map[string]string) []backend.LogAttr {
var attrs []backend.LogAttr
for k, v := range d {
if k[0] != '_' && !wellKnownFields[k] {
attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
}
}
return attrs
}
// errDrainDone is the error returned by drainJournal to signal that there are
// no more log entries to send to the log watcher.
var errDrainDone = errors.New("journald drain done")
// drainJournal reads and sends log messages from the journal.
//
// drainJournal returns errDrainDone when a terminal stopping condition has been
// reached: the watch consumer is gone, a log entry is read which has a
// timestamp after until (if until is nonzero), or the log driver is closed and
// the last message logged has been sent from the journal. If the end of the
// journal is reached without encountering a terminal stopping condition, a nil
// error is returned.
func (r *reader) drainJournal() error {
if !r.initialized {
defer func() {
r.signalReady()
r.initialized = true
}()
var (
err error
seekedToTail bool
)
if r.config.Tail >= 0 {
if r.config.Until.IsZero() {
err = r.j.SeekTail()
seekedToTail = true
} else {
err = r.j.SeekRealtime(r.config.Until)
}
} else {
if r.config.Since.IsZero() {
err = r.j.SeekHead()
} else {
err = r.j.SeekRealtime(r.config.Since)
}
}
if err != nil {
return err
}
// SeekTail() followed by Next() behaves incorrectly, so we need
// to work around the bug by ensuring the first discrete
// movement of the read pointer is Previous() or PreviousSkip().
// PreviousSkip() is called inside the loop when config.Tail > 0
// so the only special case requiring special handling is
// config.Tail == 0.
// https://github.com/systemd/systemd/issues/9934
if seekedToTail && r.config.Tail == 0 {
// Resolve the read pointer to the last entry in the
// journal so that the call to Next() inside the loop
// advances past it.
if ok, err := r.j.Previous(); err != nil || !ok {
return err
}
}
}
for i := 0; ; i++ {
if !r.initialized && i == 0 && r.config.Tail > 0 {
if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
return err
}
} else if ok, err := r.j.Next(); err != nil || !ok {
return err
}
if !r.initialized && i == 0 {
// The cursor is in a position which will be unaffected
// by subsequent logging.
r.signalReady()
}
// Read the entry's timestamp.
timestamp, err := r.j.Realtime()
if err != nil {
return err
}
// Check if the PreviousSkip went too far back. Check only the
// initial position as we are comparing wall-clock timestamps,
// which may not be monotonic. We don't want to skip over
// messages sent later in time just because the clock moved
// backwards.
if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
r.j.SeekRealtime(r.config.Since)
continue
}
if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
return errDrainDone
}
// Read and send the logged message, if there is one to read.
data, err := r.j.Data()
if err != nil {
return err
}
if data[fieldLogEpoch] == r.s.epoch {
seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64)
if err == nil && seq > r.maxOrdinal {
r.maxOrdinal = seq
}
}
if line, ok := getMessage(data); ok {
// Send the log message, unless the consumer is gone
msg := &logger.Message{
Line: line,
Source: getSource(data),
Timestamp: timestamp.In(time.UTC),
Attrs: getAttrs(data),
}
// The daemon timestamp will differ from the "trusted"
// timestamp of when the event was received by journald.
// We can efficiently seek around the journal by the
// event timestamp, and the event timestamp is what
// journalctl displays. The daemon timestamp is just an
// application-supplied field with no special
// significance; libsystemd won't help us seek to the
// entry with the closest timestamp.
/*
if sts := data["SYSLOG_TIMESTAMP"]; sts != "" {
if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil {
msg.Timestamp = tv
}
}
*/
select {
case <-r.logWatcher.WatchConsumerGone():
return errDrainDone
case r.logWatcher.Msg <- msg:
}
}
// Call sd_journal_process() periodically during the processing loop
// to close any opened file descriptors for rotated (deleted) journal files.
if i != 0 && i%1024 == 0 {
if _, err := r.j.Process(); err != nil {
// log a warning but ignore it for now
log.G(context.TODO()).WithField("container", r.s.vars[fieldContainerIDFull]).
WithField("error", err).
Warn("journald: error processing journal")
}
}
}
}
func (r *reader) readJournal() error {
caughtUp := atomic.LoadUint64(&r.s.ordinal)
if err := r.drainJournal(); err != nil {
if err != errDrainDone {
return err
}
return nil
}
var drainTimeout <-chan time.Time
if !r.config.Follow {
if r.s.readSyncTimeout == 0 {
return nil
}
tmr := time.NewTimer(r.s.readSyncTimeout)
defer tmr.Stop()
drainTimeout = tmr.C
}
for {
status, err := r.j.Wait(250 * time.Millisecond)
if err != nil {
return err
}
select {
case <-r.logWatcher.WatchConsumerGone():
return nil // won't be able to write anything anymore
case <-drainTimeout:
// Container is gone but we haven't found the end of the
// logs within the timeout. Maybe it was dropped by
// journald, e.g. due to rate-limiting.
return nil
case <-r.s.closed:
// container is gone, drain journal
lastSeq := atomic.LoadUint64(&r.s.ordinal)
if r.maxOrdinal >= lastSeq {
// All caught up with the logger!
return nil
}
if drainTimeout == nil {
tmr := time.NewTimer(closedDrainTimeout)
defer tmr.Stop()
drainTimeout = tmr.C
}
default:
// container is still alive
if status == sdjournal.StatusNOP {
// no new data -- keep waiting
continue
}
}
err = r.drainJournal()
if err != nil {
if err != errDrainDone {
return err
}
return nil
}
if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
return nil
}
}
}
func (r *reader) readLogs() {
defer close(r.logWatcher.Msg)
// Make sure the ready channel is closed in the event of an early
// return.
defer r.signalReady()
// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
// Functions that operate on sd_journal objects are thread
// agnostic — given sd_journal pointer may only be used from one
// specific thread at all times (and it has to be the very same one
// during the entire lifetime of the object), but multiple,
// independent threads may use multiple, independent objects safely.
//
// sdjournal.Open returns a value which wraps an sd_journal pointer so
// we need to abide by those rules.
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// Get a handle to the journal.
var err error
if r.s.journalReadDir != "" {
r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0)
} else {
r.j, err = sdjournal.Open(0)
}
if err != nil {
r.logWatcher.Err <- err
return
}
defer r.j.Close()
if r.config.Follow {
// Initialize library inotify watches early
if err := r.j.InitializeInotify(); err != nil {
r.logWatcher.Err <- err
return
}
}
// Remove limits on the size of data items that we'll retrieve.
if err := r.j.SetDataThreshold(0); err != nil {
r.logWatcher.Err <- err
return
}
// Add a match to have the library do the searching for us.
if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil {
r.logWatcher.Err <- err
return
}
if err := r.readJournal(); err != nil {
r.logWatcher.Err <- err
return
}
}
func (r *reader) signalReady() {
select {
case <-r.ready:
default:
close(r.ready)
}
}
func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
r := &reader{
s: s,
logWatcher: logger.NewLogWatcher(),
config: config,
ready: make(chan struct{}),
}
go r.readLogs()
// Block until the reader is in position to read from the current config
// location to prevent race conditions in tests.
<-r.ready
return r.logWatcher
}
func waitUntilFlushedImpl(s *journald) error {
if s.readSyncTimeout == 0 {
return nil
}
ordinal := atomic.LoadUint64(&s.ordinal)
if ordinal == 0 {
return nil // No logs were sent; nothing to wait for.
}
flushed := make(chan error)
go func() {
defer close(flushed)
runtime.LockOSThread()
defer runtime.UnlockOSThread()
var (
j *sdjournal.Journal
err error
)
if s.journalReadDir != "" {
j, err = sdjournal.OpenDir(s.journalReadDir, 0)
} else {
j, err = sdjournal.Open(0)
}
if err != nil {
flushed <- err
return
}
defer j.Close()
if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
flushed <- err
return
}
if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil {
flushed <- err
return
}
if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil {
flushed <- err
return
}
deadline := time.Now().Add(s.readSyncTimeout)
for time.Now().Before(deadline) {
if ok, err := j.Next(); ok {
// Found it!
return
} else if err != nil {
flushed <- err
return
}
if _, err := j.Wait(100 * time.Millisecond); err != nil {
flushed <- err
return
}
}
log.G(context.TODO()).WithField("container", s.vars[fieldContainerIDFull]).
Warn("journald: deadline exceeded waiting for logs to be committed to journal")
}()
return <-flushed
}
func init() {
waitUntilFlushed = waitUntilFlushedImpl
}