@@ -3,369 +3,456 @@

package journald // import "github.com/docker/docker/daemon/logger/journald"

-// #include <sys/types.h>
-// #include <sys/poll.h>
-// #include <systemd/sd-journal.h>
-// #include <errno.h>
-// #include <stdio.h>
-// #include <stdlib.h>
-// #include <string.h>
-// #include <time.h>
-// #include <unistd.h>
-//
-//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
-//{
-// int rc;
-// size_t plength;
-// *msg = NULL;
-// *length = 0;
-// plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
-// rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
-// *partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
-// rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
-// if (rc == 0) {
-// if (*length > 8) {
-// (*msg) += 8;
-// *length -= 8;
-// } else {
-// *msg = NULL;
-// *length = 0;
-// rc = -ENOENT;
-// }
-// }
-// return rc;
-//}
-//static int get_priority(sd_journal *j, int *priority)
-//{
-// const void *data;
-// size_t i, length;
-// int rc;
-// *priority = -1;
-// rc = sd_journal_get_data(j, "PRIORITY", &data, &length);
-// if (rc == 0) {
-// if ((length > 9) && (strncmp(data, "PRIORITY=", 9) == 0)) {
-// *priority = 0;
-// for (i = 9; i < length; i++) {
-// *priority = *priority * 10 + ((const char *)data)[i] - '0';
-// }
-// if (length > 9) {
-// rc = 0;
-// }
-// }
-// }
-// return rc;
-//}
-//static int is_attribute_field(const char *msg, size_t length)
-//{
-// static const struct known_field {
-// const char *name;
-// size_t length;
-// } fields[] = {
-// {"MESSAGE", sizeof("MESSAGE") - 1},
-// {"MESSAGE_ID", sizeof("MESSAGE_ID") - 1},
-// {"PRIORITY", sizeof("PRIORITY") - 1},
-// {"CODE_FILE", sizeof("CODE_FILE") - 1},
-// {"CODE_LINE", sizeof("CODE_LINE") - 1},
-// {"CODE_FUNC", sizeof("CODE_FUNC") - 1},
-// {"ERRNO", sizeof("ERRNO") - 1},
-// {"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1},
-// {"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1},
-// {"SYSLOG_PID", sizeof("SYSLOG_PID") - 1},
-// {"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1},
-// {"CONTAINER_ID", sizeof("CONTAINER_ID") - 1},
-// {"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1},
-// {"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1},
-// };
-// unsigned int i;
-// void *p;
-// if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) {
-// return -1;
-// }
-// length = ((const char *) p) - msg;
-// for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
-// if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) {
-// return -1;
-// }
-// }
-// return 0;
-//}
-//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length)
-//{
-// int rc;
-// *msg = NULL;
-// *length = 0;
-// while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) {
-// if (is_attribute_field(*msg, *length) == 0) {
-// break;
-// }
-// rc = -ENOENT;
-// }
-// return rc;
-//}
-import "C"
-
import (
"errors"
- "strings"
+ "runtime"
+ "strconv"
+ "sync/atomic"
"time"
- "unsafe"

"github.com/coreos/go-systemd/v22/journal"
+ "github.com/sirupsen/logrus"
+
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
- "github.com/sirupsen/logrus"
+ "github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
)

-// CErr converts error code returned from a sd_journal_* function
-// (which returns -errno) to a string
-func CErr(ret C.int) string {
- return C.GoString(C.strerror(-ret))
+const closedDrainTimeout = 5 * time.Second
+
+// Fields which we know are not user-provided attribute fields.
+var wellKnownFields = map[string]bool{
+ "MESSAGE": true,
+ "MESSAGE_ID": true,
+ "PRIORITY": true,
+ "CODE_FILE": true,
+ "CODE_LINE": true,
+ "CODE_FUNC": true,
+ "ERRNO": true,
+ "SYSLOG_FACILITY": true,
+ fieldSyslogIdentifier: true,
+ "SYSLOG_PID": true,
+ fieldSyslogTimestamp: true,
+ fieldContainerName: true,
+ fieldContainerID: true,
+ fieldContainerIDFull: true,
+ fieldContainerTag: true,
+ fieldImageName: true,
+ fieldPLogID: true,
+ fieldPLogOrdinal: true,
+ fieldPLogLast: true,
+ fieldPartialMessage: true,
+ fieldLogEpoch: true,
+ fieldLogOrdinal: true,
}

-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
- var (
- msg, data, cursor *C.char
- length C.size_t
- stamp C.uint64_t
- priority, partial C.int
- done bool
- shown int
- )
-
- // Walk the journal from here forward until we run out of new entries
- // or we reach the until value (if provided).
-drain:
- for {
- // Try not to send a given entry twice.
- if oldCursor != nil {
- for C.sd_journal_test_cursor(j, oldCursor) > 0 {
- if C.sd_journal_next(j) <= 0 {
- break drain
- }
- }
+type reader struct {
+ s *journald
+ j *sdjournal.Journal
+ logWatcher *logger.LogWatcher
+ config logger.ReadConfig
+ maxOrdinal uint64
+ initialized bool
+ ready chan struct{}
+}
+
+func getMessage(d map[string]string) (line []byte, ok bool) {
+ m, ok := d["MESSAGE"]
+ if ok {
+ line = []byte(m)
+ if d[fieldPartialMessage] != "true" {
+ line = append(line, "\n"...)
}
- // Read and send the logged message, if there is one to read.
- i := C.get_message(j, &msg, &length, &partial)
- if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
- // Read the entry's timestamp.
- if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
- break
+ }
+ return line, ok
+}
+
+func getPriority(d map[string]string) (journal.Priority, bool) {
+ if pri, ok := d["PRIORITY"]; ok {
+ i, err := strconv.Atoi(pri)
+ return journal.Priority(i), err == nil
+ }
+ return -1, false
+}
+
+// getSource recovers the stream name from the entry data by mapping from the
+// journal priority field back to the stream that we would have assigned that
+// value.
+func getSource(d map[string]string) string {
+ source := ""
+ if priority, ok := getPriority(d); ok {
+ if priority == journal.PriErr {
+ source = "stderr"
+ } else if priority == journal.PriInfo {
+ source = "stdout"
+ }
+ }
+ return source
+}
+
+func getAttrs(d map[string]string) []backend.LogAttr {
+ var attrs []backend.LogAttr
+ for k, v := range d {
+ if k[0] != '_' && !wellKnownFields[k] {
+ attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
+ }
+ }
+ return attrs
+}
+
+// errDrainDone is the error returned by drainJournal to signal that there are
+// no more log entries to send to the log watcher.
+var errDrainDone = errors.New("journald drain done")
+
+// drainJournal reads and sends log messages from the journal.
+//
+// drainJournal returns errDrainDone when a terminal stopping condition has been
+// reached: the watch consumer is gone, a log entry is read which has a
+// timestamp after until (if until is nonzero), or the log driver is closed and
+// the last message logged has been sent from the journal. If the end of the
+// journal is reached without encountering a terminal stopping condition, a nil
+// error is returned.
+func (r *reader) drainJournal() error {
+ if !r.initialized {
+ defer func() {
+ r.signalReady()
+ r.initialized = true
+ }()
+
+ var (
+ err error
+ seekedToTail bool
+ )
+ if r.config.Tail >= 0 {
+ if r.config.Until.IsZero() {
+ err = r.j.SeekTail()
+ seekedToTail = true
+ } else {
+ err = r.j.SeekRealtime(r.config.Until)
}
- // Break if the timestamp exceeds any provided until flag.
- if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) {
- done = true
- break
+ } else {
+ if r.config.Since.IsZero() {
+ err = r.j.SeekHead()
+ } else {
+ err = r.j.SeekRealtime(r.config.Since)
}
+ }
+ if err != nil {
+ return err
+ }

- // Set up the time and text of the entry.
- timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
- line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
- if partial == 0 {
- line = append(line, "\n"...)
+ // SeekTail() followed by Next() behaves incorrectly, so we need
+ // to work around the bug by ensuring the first discrete
+ // movement of the read pointer is Previous() or PreviousSkip().
+ // PreviousSkip() is called inside the loop when config.Tail > 0
+ // so the only special case requiring special handling is
+ // config.Tail == 0.
+ // https://github.com/systemd/systemd/issues/9934
+ if seekedToTail && r.config.Tail == 0 {
+ // Resolve the read pointer to the last entry in the
+ // journal so that the call to Next() inside the loop
+ // advances past it.
+ if ok, err := r.j.Previous(); err != nil || !ok {
+ return err
}
- // Recover the stream name by mapping
- // from the journal priority back to
- // the stream that we would have
- // assigned that value.
- source := ""
- if C.get_priority(j, &priority) == 0 {
- if priority == C.int(journal.PriErr) {
- source = "stderr"
- } else if priority == C.int(journal.PriInfo) {
- source = "stdout"
- }
+ }
+ }
+
+ for i := 0; ; i++ {
+ if !r.initialized && i == 0 && r.config.Tail > 0 {
+ if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
+ return err
}
- // Retrieve the values of any variables we're adding to the journal.
- var attrs []backend.LogAttr
- C.sd_journal_restart_data(j)
- for C.get_attribute_field(j, &data, &length) > C.int(0) {
- kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
- attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
+ } else if ok, err := r.j.Next(); err != nil || !ok {
+ return err
+ }
+
+ if !r.initialized && i == 0 {
+ // The cursor is in a position which will be unaffected
+ // by subsequent logging.
+ r.signalReady()
+ }
+
+ // Read the entry's timestamp.
+ timestamp, err := r.j.Realtime()
+ if err != nil {
+ return err
+ }
+ // Check if the PreviousSkip went too far back. Check only the
+ // initial position as we are comparing wall-clock timestamps,
+ // which may not be monotonic. We don't want to skip over
+ // messages sent later in time just because the clock moved
+ // backwards.
+ if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
+ r.j.SeekRealtime(r.config.Since)
+ continue
+ }
+ if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
+ return errDrainDone
+ }
+
+ // Read and send the logged message, if there is one to read.
+ data, err := r.j.Data()
+ if err != nil {
+ return err
+ }
+
+ if data[fieldLogEpoch] == r.s.epoch {
+ seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64)
+ if err == nil && seq > r.maxOrdinal {
+ r.maxOrdinal = seq
}
+ }

+ if line, ok := getMessage(data); ok {
// Send the log message, unless the consumer is gone
- select {
- case <-logWatcher.WatchConsumerGone():
- done = true // we won't be able to write anything anymore
- break drain
- case logWatcher.Msg <- &logger.Message{
+ msg := &logger.Message{
Line: line,
- Source: source,
+ Source: getSource(data),
Timestamp: timestamp.In(time.UTC),
- Attrs: attrs,
- }:
- shown++
+ Attrs: getAttrs(data),
}
- // Call sd_journal_process() periodically during the processing loop
- // to close any opened file descriptors for rotated (deleted) journal files.
- if shown%1024 == 0 {
- if ret := C.sd_journal_process(j); ret < 0 {
- // log a warning but ignore it for now
- logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
- WithField("error", CErr(ret)).
- Warn("journald: error processing journal")
+ // The daemon timestamp will differ from the "trusted"
+ // timestamp of when the event was received by journald.
+ // We can efficiently seek around the journal by the
+ // event timestamp, and the event timestamp is what
+ // journalctl displays. The daemon timestamp is just an
+ // application-supplied field with no special
+ // significance; libsystemd won't help us seek to the
+ // entry with the closest timestamp.
+ /*
+ if sts := data["SYSLOG_TIMESTAMP"]; sts != "" {
+ if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil {
+ msg.Timestamp = tv
+ }
}
+ */
+ select {
+ case <-r.logWatcher.WatchConsumerGone():
+ return errDrainDone
+ case r.logWatcher.Msg <- msg:
}
}
- // If we're at the end of the journal, we're done (for now).
- if C.sd_journal_next(j) <= 0 {
- break
- }
- }

- // free(NULL) is safe
- C.free(unsafe.Pointer(oldCursor))
- if C.sd_journal_get_cursor(j, &cursor) != 0 {
- // ensure that we won't be freeing an address that's invalid
- cursor = nil
+ // Call sd_journal_process() periodically during the processing loop
+ // to close any opened file descriptors for rotated (deleted) journal files.
+ if i != 0 && i%1024 == 0 {
+ if _, err := r.j.Process(); err != nil {
+ // log a warning but ignore it for now
+ logrus.WithField("container", r.s.vars[fieldContainerIDFull]).
+ WithField("error", err).
+ Warn("journald: error processing journal")
+ }
+ }
}
- return cursor, done, shown
}

-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
- defer close(logWatcher.Msg)
+func (r *reader) readJournal() error {
+ caughtUp := atomic.LoadUint64(&r.s.ordinal)
+ if err := r.drainJournal(); err != nil {
+ if err != errDrainDone {
+ return err
+ }
+ return nil
+ }

- waitTimeout := C.uint64_t(250000) // 0.25s
+ var drainTimeout <-chan time.Time
+ if !r.config.Follow {
+ if r.s.readSyncTimeout == 0 {
+ return nil
+ }
+ tmr := time.NewTimer(r.s.readSyncTimeout)
+ defer tmr.Stop()
+ drainTimeout = tmr.C
+ }

-LOOP:
 for {
- status := C.sd_journal_wait(j, waitTimeout)
- if status < 0 {
- logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
- break
+ status, err := r.j.Wait(250 * time.Millisecond)
+ if err != nil {
+ return err
 }
 select {
- case <-logWatcher.WatchConsumerGone():
- break LOOP // won't be able to write anything anymore
- case <-s.closed:
+ case <-r.logWatcher.WatchConsumerGone():
+ return nil // won't be able to write anything anymore
+ case <-drainTimeout:
+ // Container is gone but we haven't found the end of the
+ // logs within the timeout. Maybe it was dropped by
+ // journald, e.g. due to rate-limiting.
+ return nil
+ case <-r.s.closed:
 // container is gone, drain journal
+ lastSeq := atomic.LoadUint64(&r.s.ordinal)
+ if r.maxOrdinal >= lastSeq {
+ // All caught up with the logger!
+ return nil
+ }
+ if drainTimeout == nil {
+ tmr := time.NewTimer(closedDrainTimeout)
+ defer tmr.Stop()
+ drainTimeout = tmr.C
+ }
 default:
 // container is still alive
- if status == C.SD_JOURNAL_NOP {
+ if status == sdjournal.StatusNOP {
 // no new data -- keep waiting
 continue
 }
 }
- newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
- cursor = newCursor
- if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
- break
+ err = r.drainJournal()
+ if err != nil {
+ if err != errDrainDone {
+ return err
+ }
+ return nil
+ }
+ if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
+ return nil
 }
 }
-
- return cursor
}

-func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
- var (
- j *C.sd_journal
- cmatch, cursor *C.char
- stamp C.uint64_t
- sinceUnixMicro uint64
- untilUnixMicro uint64
- )
+func (r *reader) readLogs() {
+ defer close(r.logWatcher.Msg)
+
+ // Make sure the ready channel is closed in the event of an early
+ // return.
+ defer r.signalReady()
+
+ // Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
+ //     Functions that operate on sd_journal objects are thread
+ //     agnostic — given sd_journal pointer may only be used from one
+ //     specific thread at all times (and it has to be the very same one
+ //     during the entire lifetime of the object), but multiple,
+ //     independent threads may use multiple, independent objects safely.
+ //
+ // sdjournal.Open returns a value which wraps an sd_journal pointer so
+ // we need to abide by those rules.
+ runtime.LockOSThread()
+ defer runtime.UnlockOSThread()

 // Get a handle to the journal.
- if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 {
- logWatcher.Err <- errors.New("error opening journal: " + CErr(rc))
- close(logWatcher.Msg)
+ var err error
+ if r.s.journalReadDir != "" {
+ r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0)
+ } else {
+ r.j, err = sdjournal.Open(0)
+ }
+ if err != nil {
+ r.logWatcher.Err <- err
 return
 }
- if config.Follow {
+ defer r.j.Close()
+
+ if r.config.Follow {
 // Initialize library inotify watches early
- if rc := C.sd_journal_get_fd(j); rc < 0 {
- logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc))
- close(logWatcher.Msg)
+ if err := r.j.InitializeInotify(); err != nil {
+ r.logWatcher.Err <- err
 return
 }
 }
- // If we end up following the log, we can set the journal context
- // pointer and the channel pointer to nil so that we won't close them
- // here, potentially while the goroutine that uses them is still
- // running. Otherwise, close them when we return from this function.
- following := false
- defer func() {
- if !following {
- close(logWatcher.Msg)
- }
- C.sd_journal_close(j)
- }()
+
 // Remove limits on the size of data items that we'll retrieve.
- if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 {
- logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc))
+ if err := r.j.SetDataThreshold(0); err != nil {
+ r.logWatcher.Err <- err
 return
 }
 // Add a match to have the library do the searching for us.
- cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
- defer C.free(unsafe.Pointer(cmatch))
- if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 {
- logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc))
+ if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil {
+ r.logWatcher.Err <- err
 return
 }
- // If we have a cutoff time, convert it to Unix time once.
- if !config.Since.IsZero() {
- nano := config.Since.UnixNano()
- sinceUnixMicro = uint64(nano / 1000)
+
+ if err := r.readJournal(); err != nil {
+ r.logWatcher.Err <- err
+ return
 }
- // If we have an until value, convert it too
- if !config.Until.IsZero() {
- nano := config.Until.UnixNano()
- untilUnixMicro = uint64(nano / 1000)
+}
+
+func (r *reader) signalReady() {
+ select {
+ case <-r.ready:
+ default:
+ close(r.ready)
 }
- if config.Tail >= 0 {
- // If until time provided, start from there.
- // Otherwise start at the end of the journal.
- if untilUnixMicro != 0 {
- if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 {
- logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc))
- return
- }
- } else if rc := C.sd_journal_seek_tail(j); rc != 0 {
- logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc))
- return
+}
+
+func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+ r := &reader{
+ s: s,
+ logWatcher: logger.NewLogWatcher(),
+ config: config,
+ ready: make(chan struct{}),
+ }
+ go r.readLogs()
+ // Block until the reader is in position to read from the current config
+ // location to prevent race conditions in tests.
+ <-r.ready
+ return r.logWatcher
+}
+
+func waitUntilFlushedImpl(s *journald) error {
+ if s.readSyncTimeout == 0 {
+ return nil
+ }
+
+ ordinal := atomic.LoadUint64(&s.ordinal)
+ if ordinal == 0 {
+ return nil // No logs were sent; nothing to wait for.
+ }
+
+ flushed := make(chan error)
+ go func() {
+ defer close(flushed)
+ runtime.LockOSThread()
+
+ var (
+ j *sdjournal.Journal
+ err error
+ )
+ if s.journalReadDir != "" {
+ j, err = sdjournal.OpenDir(s.journalReadDir, 0)
+ } else {
+ j, err = sdjournal.Open(0)
 }
- // (Try to) skip backwards by the requested number of lines...
- if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 {
- // ...but not before "since"
- if sinceUnixMicro != 0 &&
- C.sd_journal_get_realtime_usec(j, &stamp) == 0 &&
- uint64(stamp) < sinceUnixMicro {
- C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro))
- }
+ if err != nil {
+ flushed <- err
+ return
 }
- } else {
- // Start at the beginning of the journal.
- if rc := C.sd_journal_seek_head(j); rc != 0 {
- logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc))
+ defer j.Close()
+
+ if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
+ flushed <- err
 return
 }
- // If we have a cutoff date, fast-forward to it.
- if sinceUnixMicro != 0 {
- if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 {
- logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc))
- return
- }
+ if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil {
+ flushed <- err
+ return
 }
- if rc := C.sd_journal_next(j); rc < 0 {
- logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc))
+ if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil {
+ flushed <- err
 return
 }
- }
- if config.Tail != 0 { // special case for --tail 0
- cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
- }
- if config.Follow {
- cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
- // Let followJournal handle freeing the journal context
- // object and closing the channel.
- following = true
- }

- C.free(unsafe.Pointer(cursor))
+ deadline := time.Now().Add(s.readSyncTimeout)
+ for time.Now().Before(deadline) {
+ if ok, err := j.Next(); ok {
+ // Found it!
+ return
+ } else if err != nil {
+ flushed <- err
+ return
+ }
+ if _, err := j.Wait(100 * time.Millisecond); err != nil {
+ flushed <- err
+ return
+ }
+ }
+ logrus.WithField("container", s.vars[fieldContainerIDFull]).
+ Warn("journald: deadline exceeded waiting for logs to be committed to journal")
+ }()
+ return <-flushed
}

-func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- logWatcher := logger.NewLogWatcher()
- go s.readLogs(logWatcher, config)
- return logWatcher
+func init() {
+ waitUntilFlushed = waitUntilFlushedImpl
}