|
@@ -17,26 +17,39 @@ import (
|
|
|
"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
|
|
|
)
|
|
|
|
|
|
+// Fields which we know are not user-provided attribute fields.
|
|
|
var wellKnownFields = map[string]bool{
|
|
|
- "MESSAGE": true,
|
|
|
- "MESSAGE_ID": true,
|
|
|
- "PRIORITY": true,
|
|
|
- "CODE_FILE": true,
|
|
|
- "CODE_LINE": true,
|
|
|
- "CODE_FUNC": true,
|
|
|
- "ERRNO": true,
|
|
|
- "SYSLOG_FACILITY": true,
|
|
|
- "SYSLOG_IDENTIFIER": true,
|
|
|
- "SYSLOG_PID": true,
|
|
|
- "CONTAINER_NAME": true,
|
|
|
- "CONTAINER_ID": true,
|
|
|
- "CONTAINER_ID_FULL": true,
|
|
|
- "CONTAINER_TAG": true,
|
|
|
+ "MESSAGE": true,
|
|
|
+ "MESSAGE_ID": true,
|
|
|
+ "PRIORITY": true,
|
|
|
+ "CODE_FILE": true,
|
|
|
+ "CODE_LINE": true,
|
|
|
+ "CODE_FUNC": true,
|
|
|
+ "ERRNO": true,
|
|
|
+ "SYSLOG_FACILITY": true,
|
|
|
+ fieldSyslogIdentifier: true,
|
|
|
+ "SYSLOG_PID": true,
|
|
|
+ fieldSyslogTimestamp: true,
|
|
|
+ fieldContainerName: true,
|
|
|
+ fieldContainerID: true,
|
|
|
+ fieldContainerIDFull: true,
|
|
|
+ fieldContainerTag: true,
|
|
|
+ fieldImageName: true,
|
|
|
+ fieldPLogID: true,
|
|
|
+ fieldPLogOrdinal: true,
|
|
|
+ fieldPLogLast: true,
|
|
|
+ fieldPartialMessage: true,
|
|
|
}
|
|
|
|
|
|
-func getMessage(d map[string]string) (line []byte, partial, ok bool) {
|
|
|
+func getMessage(d map[string]string) (line []byte, ok bool) {
|
|
|
m, ok := d["MESSAGE"]
|
|
|
- return []byte(m), d["CONTAINER_PARTIAL_MESSAGE"] == "true", ok
|
|
|
+ if ok {
|
|
|
+ line = []byte(m)
|
|
|
+ if d[fieldPartialMessage] != "true" {
|
|
|
+ line = append(line, "\n"...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return line, ok
|
|
|
}
|
|
|
|
|
|
func getPriority(d map[string]string) (journal.Priority, bool) {
|
|
@@ -47,106 +60,193 @@ func getPriority(d map[string]string) (journal.Priority, bool) {
|
|
|
return -1, false
|
|
|
}
|
|
|
|
|
|
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, oldCursor *sdjournal.Cursor, until time.Time) (*sdjournal.Cursor, bool, int) {
|
|
|
- var (
|
|
|
- done bool
|
|
|
- shown int
|
|
|
- )
|
|
|
-
|
|
|
- // Walk the journal from here forward until we run out of new entries
|
|
|
- // or we reach the until value (if provided).
|
|
|
-drain:
|
|
|
- for ok := true; ok; ok, _ = j.Next() {
|
|
|
- // Try not to send a given entry twice.
|
|
|
- if oldCursor != nil {
|
|
|
- if ok, _ := j.TestCursor(oldCursor); ok {
|
|
|
- if ok, _ := j.Next(); !ok {
|
|
|
- break drain
|
|
|
- }
|
|
|
- }
|
|
|
+// getSource recovers the stream name from the entry data by mapping from the
|
|
|
+// journal priority field back to the stream that we would have assigned that
|
|
|
+// value.
|
|
|
+func getSource(d map[string]string) string {
|
|
|
+ source := ""
|
|
|
+ if priority, ok := getPriority(d); ok {
|
|
|
+ if priority == journal.PriErr {
|
|
|
+ source = "stderr"
|
|
|
+ } else if priority == journal.PriInfo {
|
|
|
+ source = "stdout"
|
|
|
}
|
|
|
- // Read and send the logged message, if there is one to read.
|
|
|
- data, err := j.Data()
|
|
|
- if errors.Is(err, sdjournal.ErrInvalidReadPointer) {
|
|
|
- continue
|
|
|
+ }
|
|
|
+ return source
|
|
|
+}
|
|
|
+
|
|
|
+func getAttrs(d map[string]string) []backend.LogAttr {
|
|
|
+ var attrs []backend.LogAttr
|
|
|
+ for k, v := range d {
|
|
|
+ if k[0] != '_' && !wellKnownFields[k] {
|
|
|
+ attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
|
|
|
}
|
|
|
- if line, partial, ok := getMessage(data); ok {
|
|
|
- // Read the entry's timestamp.
|
|
|
- timestamp, err := j.Realtime()
|
|
|
- if err != nil {
|
|
|
- break
|
|
|
+ }
|
|
|
+ return attrs
|
|
|
+}
|
|
|
+
|
|
|
+// errDrainDone is the error returned by drainJournal to signal that there are
|
|
|
+// no more log entries to send to the log watcher.
|
|
|
+var errDrainDone = errors.New("journald drain done")
|
|
|
+
|
|
|
+// drainJournal reads and sends log messages from the journal. It returns the
|
|
|
+// number of log messages sent and any error encountered. When initial != nil
|
|
|
+// it initializes the journal read position to the position specified by config
|
|
|
+// before reading. Otherwise it continues to read from the current position.
|
|
|
+//
|
|
|
+// drainJournal returns err == errDrainDone when a terminal stopping condition
|
|
|
+// has been reached: either the watch consumer is gone or a log entry is read
|
|
|
+// which has a timestamp after until (if until is nonzero). If the end of the
|
|
|
+// journal is reached without encountering a terminal stopping condition,
|
|
|
+// err == nil is returned.
|
|
|
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, initial chan struct{}) (int, error) {
|
|
|
+ if initial != nil {
|
|
|
+ defer func() {
|
|
|
+ if initial != nil {
|
|
|
+ close(initial)
|
|
|
}
|
|
|
- // Break if the timestamp exceeds any provided until flag.
|
|
|
- if !until.IsZero() && until.Before(timestamp) {
|
|
|
- done = true
|
|
|
- break
|
|
|
+ }()
|
|
|
+
|
|
|
+ var (
|
|
|
+ err error
|
|
|
+ seekedToTail bool
|
|
|
+ )
|
|
|
+ if config.Tail >= 0 {
|
|
|
+ if config.Until.IsZero() {
|
|
|
+ err = j.SeekTail()
|
|
|
+ seekedToTail = true
|
|
|
+ } else {
|
|
|
+ err = j.SeekRealtime(config.Until)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if config.Since.IsZero() {
|
|
|
+ err = j.SeekHead()
|
|
|
+ } else {
|
|
|
+ err = j.SeekRealtime(config.Since)
|
|
|
}
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
|
|
|
- // Set up the text of the entry.
|
|
|
- if !partial {
|
|
|
- line = append(line, "\n"...)
|
|
|
+ // SeekTail() followed by Next() behaves incorrectly, so we need
|
|
|
+ // to work around the bug by ensuring the first discrete
|
|
|
+ // movement of the read pointer is Previous() or PreviousSkip().
|
|
|
+ // PreviousSkip() is called inside the loop when config.Tail > 0
|
|
|
+ // so the only case requiring special handling is
|
|
|
+ // config.Tail == 0.
|
|
|
+ // https://github.com/systemd/systemd/issues/9934
|
|
|
+ if seekedToTail && config.Tail == 0 {
|
|
|
+ // Resolve the read pointer to the last entry in the
|
|
|
+ // journal so that the call to Next() inside the loop
|
|
|
+ // advances past it.
|
|
|
+ if ok, err := j.Previous(); err != nil || !ok {
|
|
|
+ return 0, err
|
|
|
}
|
|
|
- // Recover the stream name by mapping
|
|
|
- // from the journal priority back to
|
|
|
- // the stream that we would have
|
|
|
- // assigned that value.
|
|
|
- source := ""
|
|
|
- if priority, ok := getPriority(data); ok {
|
|
|
- if priority == journal.PriErr {
|
|
|
- source = "stderr"
|
|
|
- } else if priority == journal.PriInfo {
|
|
|
- source = "stdout"
|
|
|
- }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ var sent int
|
|
|
+ for i := 0; ; i++ {
|
|
|
+ if initial != nil && i == 0 && config.Tail > 0 {
|
|
|
+ if n, err := j.PreviousSkip(uint(config.Tail)); err != nil || n == 0 {
|
|
|
+ return sent, err
|
|
|
}
|
|
|
- // Retrieve the values of any variables we're adding to the journal.
|
|
|
- var attrs []backend.LogAttr
|
|
|
- for k, v := range data {
|
|
|
- if k[0] != '_' && !wellKnownFields[k] {
|
|
|
- attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
|
|
|
- }
|
|
|
+ } else if ok, err := j.Next(); err != nil || !ok {
|
|
|
+ return sent, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if initial != nil && i == 0 {
|
|
|
+ // The cursor is in position. Signal that the watcher is
|
|
|
+ // initialized.
|
|
|
+ close(initial)
|
|
|
+ initial = nil // Prevent double-closing.
|
|
|
+ }
|
|
|
+
|
|
|
+ // Read the entry's timestamp.
|
|
|
+ timestamp, err := j.Realtime()
|
|
|
+ if err != nil {
|
|
|
+ return sent, err
|
|
|
+ }
|
|
|
+ if timestamp.Before(config.Since) {
|
|
|
+ if initial != nil && i == 0 && config.Tail > 0 {
|
|
|
+ // PreviousSkip went too far back. Seek forwards.
|
|
|
+ j.SeekRealtime(config.Since)
|
|
|
}
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if !config.Until.IsZero() && config.Until.Before(timestamp) {
|
|
|
+ return sent, errDrainDone
|
|
|
+ }
|
|
|
|
|
|
+ // Read and send the logged message, if there is one to read.
|
|
|
+ data, err := j.Data()
|
|
|
+ if err != nil {
|
|
|
+ return sent, err
|
|
|
+ }
|
|
|
+ if line, ok := getMessage(data); ok {
|
|
|
// Send the log message, unless the consumer is gone
|
|
|
- select {
|
|
|
- case <-logWatcher.WatchConsumerGone():
|
|
|
- done = true // we won't be able to write anything anymore
|
|
|
- break drain
|
|
|
- case logWatcher.Msg <- &logger.Message{
|
|
|
+ msg := &logger.Message{
|
|
|
Line: line,
|
|
|
- Source: source,
|
|
|
+ Source: getSource(data),
|
|
|
Timestamp: timestamp.In(time.UTC),
|
|
|
- Attrs: attrs,
|
|
|
- }:
|
|
|
- shown++
|
|
|
+ Attrs: getAttrs(data),
|
|
|
}
|
|
|
- // Call sd_journal_process() periodically during the processing loop
|
|
|
- // to close any opened file descriptors for rotated (deleted) journal files.
|
|
|
- if shown%1024 == 0 {
|
|
|
- if _, err := j.Process(); err != nil {
|
|
|
- // log a warning but ignore it for now
|
|
|
- logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
|
|
|
- WithField("error", err).
|
|
|
- Warn("journald: error processing journal")
|
|
|
+ // The daemon timestamp will differ from the "trusted"
|
|
|
+ // timestamp of when the event was received by journald.
|
|
|
+ // We can efficiently seek around the journal by the
|
|
|
+ // event timestamp, and the event timestamp is what
|
|
|
+ // journalctl displays. The daemon timestamp is just an
|
|
|
+ // application-supplied field with no special
|
|
|
+ // significance; libsystemd won't help us seek to the
|
|
|
+ // entry with the closest timestamp.
|
|
|
+ /*
|
|
|
+ if sts := data["SYSLOG_TIMESTAMP"]; sts != "" {
|
|
|
+ if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil {
|
|
|
+ msg.Timestamp = tv
|
|
|
+ }
|
|
|
}
|
|
|
+ */
|
|
|
+ select {
|
|
|
+ case <-logWatcher.WatchConsumerGone():
|
|
|
+ return sent, errDrainDone
|
|
|
+ case logWatcher.Msg <- msg:
|
|
|
+ sent++
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- cursor, _ := j.Cursor()
|
|
|
- return cursor, done, shown
|
|
|
+ // Call sd_journal_process() periodically during the processing loop
|
|
|
+ // to close any opened file descriptors for rotated (deleted) journal files.
|
|
|
+ if i != 0 && i%1024 == 0 {
|
|
|
+ if _, err := j.Process(); err != nil {
|
|
|
+ // log a warning but ignore it for now
|
|
|
+ logrus.WithField("container", s.vars[fieldContainerIDFull]).
|
|
|
+ WithField("error", err).
|
|
|
+ Warn("journald: error processing journal")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, cursor *sdjournal.Cursor, until time.Time) *sdjournal.Cursor {
|
|
|
-LOOP:
|
|
|
+func (s *journald) readJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, ready chan struct{}) error {
|
|
|
+ if _, err := s.drainJournal(logWatcher, j, config, ready /* initial */); err != nil {
|
|
|
+ if err != errDrainDone {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ if !config.Follow {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
for {
|
|
|
status, err := j.Wait(250 * time.Millisecond)
|
|
|
if err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- break
|
|
|
+ return err
|
|
|
}
|
|
|
select {
|
|
|
case <-logWatcher.WatchConsumerGone():
|
|
|
- break LOOP // won't be able to write anything anymore
|
|
|
+ return nil // won't be able to write anything anymore
|
|
|
case <-s.closed:
|
|
|
// container is gone, drain journal
|
|
|
default:
|
|
@@ -156,20 +256,31 @@ LOOP:
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
- newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, until)
|
|
|
- cursor.Free()
|
|
|
- cursor = newCursor
|
|
|
- if done || (status == sdjournal.StatusNOP && recv == 0) {
|
|
|
- break
|
|
|
+ n, err := s.drainJournal(logWatcher, j, config, nil /* initial */)
|
|
|
+ if err != nil {
|
|
|
+ if err != errDrainDone {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ } else if status == sdjournal.StatusNOP && n == 0 {
|
|
|
+ return nil
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return cursor
|
|
|
}
|
|
|
|
|
|
-func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
|
+func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig, ready chan struct{}) {
|
|
|
defer close(logWatcher.Msg)
|
|
|
|
|
|
+ // Make sure the ready channel is closed in the event of an early
|
|
|
+ // return.
|
|
|
+ defer func() {
|
|
|
+ select {
|
|
|
+ case <-ready:
|
|
|
+ default:
|
|
|
+ close(ready)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
|
|
|
// Functions that operate on sd_journal objects are thread
|
|
|
// agnostic — given sd_journal pointer may only be used from one
|
|
@@ -183,7 +294,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|
|
defer runtime.UnlockOSThread()
|
|
|
|
|
|
// Get a handle to the journal.
|
|
|
- j, err := sdjournal.Open(0)
|
|
|
+ var (
|
|
|
+ j *sdjournal.Journal
|
|
|
+ err error
|
|
|
+ )
|
|
|
+ if s.journalReadDir != "" {
|
|
|
+ j, err = sdjournal.OpenDir(s.journalReadDir, 0)
|
|
|
+ } else {
|
|
|
+ j, err = sdjournal.Open(0)
|
|
|
+ }
|
|
|
if err != nil {
|
|
|
logWatcher.Err <- err
|
|
|
return
|
|
@@ -204,61 +323,23 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|
|
return
|
|
|
}
|
|
|
// Add a match to have the library do the searching for us.
|
|
|
- if err := j.AddMatch("CONTAINER_ID_FULL", s.vars["CONTAINER_ID_FULL"]); err != nil {
|
|
|
+ if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
|
|
|
logWatcher.Err <- err
|
|
|
return
|
|
|
}
|
|
|
- if config.Tail >= 0 {
|
|
|
- // If until time provided, start from there.
|
|
|
- // Otherwise start at the end of the journal.
|
|
|
- if !config.Until.IsZero() {
|
|
|
- if err := j.SeekRealtime(config.Until); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
- } else if err := j.SeekTail(); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
- // (Try to) skip backwards by the requested number of lines...
|
|
|
- if _, err := j.PreviousSkip(uint(config.Tail)); err == nil {
|
|
|
- // ...but not before "since"
|
|
|
- if !config.Since.IsZero() {
|
|
|
- if stamp, err := j.Realtime(); err == nil && stamp.Before(config.Since) {
|
|
|
- _ = j.SeekRealtime(config.Since)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- // Start at the beginning of the journal.
|
|
|
- if err := j.SeekHead(); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
- // If we have a cutoff date, fast-forward to it.
|
|
|
- if !config.Since.IsZero() {
|
|
|
- if err := j.SeekRealtime(config.Since); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- if _, err := j.Next(); err != nil {
|
|
|
- logWatcher.Err <- err
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- var cursor *sdjournal.Cursor
|
|
|
- if config.Tail != 0 { // special case for --tail 0
|
|
|
- cursor, _, _ = s.drainJournal(logWatcher, j, nil, config.Until)
|
|
|
- }
|
|
|
- if config.Follow {
|
|
|
- cursor = s.followJournal(logWatcher, j, cursor, config.Until)
|
|
|
+
|
|
|
+ if err := s.readJournal(logWatcher, j, config, ready); err != nil {
|
|
|
+ logWatcher.Err <- err
|
|
|
+ return
|
|
|
}
|
|
|
- cursor.Free()
|
|
|
}
|
|
|
|
|
|
func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
|
logWatcher := logger.NewLogWatcher()
|
|
|
- go s.readLogs(logWatcher, config)
|
|
|
+ ready := make(chan struct{})
|
|
|
+ go s.readLogs(logWatcher, config, ready)
|
|
|
+ // Block until the reader is in position to read from the current config
|
|
|
+ // location to prevent race conditions in tests.
|
|
|
+ <-ready
|
|
|
return logWatcher
|
|
|
}
|