diff --git a/daemon/logger/journald/internal/sdjournal/cursor.go b/daemon/logger/journald/internal/sdjournal/cursor.go new file mode 100644 index 0000000000..52cc21c7c5 --- /dev/null +++ b/daemon/logger/journald/internal/sdjournal/cursor.go @@ -0,0 +1,45 @@ +//go:build linux && cgo && !static_build && journald +// +build linux,cgo,!static_build,journald + +package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" + +// #include +import "C" +import ( + "runtime" + "unsafe" +) + +// Cursor is a reference to a journal cursor. A Cursor must not be copied. +type Cursor struct { + c *C.char + noCopy noCopy //nolint:structcheck,unused // Exists only to mark values uncopyable for `go vet`. +} + +func wrapCursor(cur *C.char) *Cursor { + c := &Cursor{c: cur} + runtime.SetFinalizer(c, (*Cursor).Free) + return c +} + +func (c *Cursor) String() string { + if c.c == nil { + return "" + } + return C.GoString(c.c) +} + +// Free invalidates the cursor and frees any associated resources on the C heap. +func (c *Cursor) Free() { + if c == nil { + return + } + C.free(unsafe.Pointer(c.c)) + runtime.SetFinalizer(c, nil) + c.c = nil +} + +type noCopy struct{} + +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} diff --git a/daemon/logger/journald/internal/sdjournal/doc.go b/daemon/logger/journald/internal/sdjournal/doc.go new file mode 100644 index 0000000000..2e0850694d --- /dev/null +++ b/daemon/logger/journald/internal/sdjournal/doc.go @@ -0,0 +1,3 @@ +// Package sdjournal provides a Go interface to the systemd journal read API by +// wrapping the libsystemd C library. +package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" diff --git a/daemon/logger/journald/internal/sdjournal/sdjournal.go b/daemon/logger/journald/internal/sdjournal/sdjournal.go new file mode 100644 index 0000000000..651d51ac86 --- /dev/null +++ b/daemon/logger/journald/internal/sdjournal/sdjournal.go @@ -0,0 +1,237 @@ +//go:build linux && cgo && !static_build && journald +// +build linux,cgo,!static_build,journald + +package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" + +// #cgo pkg-config: libsystemd +// #include +// #include +// +// static int add_match(sd_journal *j, _GoString_ s) { +// return sd_journal_add_match(j, _GoStringPtr(s), _GoStringLen(s)); +// } +import "C" +import ( + "fmt" + "runtime" + "strings" + "syscall" + "time" + "unsafe" +) + +// Status is an sd-journal status code. +type Status int + +// Status values for Process() and Wait(). +const ( + StatusNOP = Status(C.SD_JOURNAL_NOP) // SD_JOURNAL_NOP + StatusAPPEND = Status(C.SD_JOURNAL_APPEND) // SD_JOURNAL_APPEND + StatusINVALIDATE = Status(C.SD_JOURNAL_INVALIDATE) // SD_JOURNAL_INVALIDATE +) + +const ( + // ErrInvalidReadPointer is the error returned when the read pointer is + // in an invalid position. + ErrInvalidReadPointer = syscall.EADDRNOTAVAIL +) + +// Journal is a handle to an open journald journal. +type Journal struct { + j *C.sd_journal + noCopy noCopy //nolint:structcheck,unused // Exists only to mark values uncopyable for `go vet`. +} + +// Open opens the log journal for reading. +// +// The returned Journal value may only be used from the same operating system +// thread which Open was called from. Using it from only a single goroutine is +// not sufficient; runtime.LockOSThread must also be used. +func Open(flags int) (*Journal, error) { + j := &Journal{} + if rc := C.sd_journal_open(&j.j, C.int(flags)); rc != 0 { + return nil, fmt.Errorf("journald: error opening journal: %w", syscall.Errno(-rc)) + } + runtime.SetFinalizer(j, (*Journal).Close) + return j, nil +} + +// Close closes the journal. The return value is always nil. +func (j *Journal) Close() error { + if j.j != nil { + C.sd_journal_close(j.j) + runtime.SetFinalizer(j, nil) + j.j = nil + } + return nil +} + +// Process processes journal events. +// +// https://www.freedesktop.org/software/systemd/man/sd_journal_process.html +func (j *Journal) Process() (Status, error) { + s := C.sd_journal_process(j.j) + if s < 0 { + return 0, fmt.Errorf("journald: error processing events: %w", syscall.Errno(-s)) + } + return Status(s), nil +} + +// InitializeInotify sets up change notifications for the journal. +func (j *Journal) InitializeInotify() error { + if rc := C.sd_journal_get_fd(j.j); rc < 0 { + return fmt.Errorf("journald: error initializing inotify watches: %w", syscall.Errno(-rc)) + } + return nil +} + +// AddMatch adds a match by which to filter the entries of the journal file. +// +// https://www.freedesktop.org/software/systemd/man/sd_journal_add_match.html +func (j *Journal) AddMatch(field, value string) error { + m := field + "=" + value + if rc := C.add_match(j.j, m); rc != 0 { + return fmt.Errorf("journald: error adding match %q: %w", m, syscall.Errno(-rc)) + } + return nil +} + +// Next advances the read pointer to the next entry. +func (j *Journal) Next() (bool, error) { + rc := C.sd_journal_next(j.j) + if rc < 0 { + return false, fmt.Errorf("journald: error advancing read pointer: %w", syscall.Errno(-rc)) + } + return rc > 0, nil +} + +// PreviousSkip sets back the read pointer by n entries. The number of entries +// must be less than or equal to 2147483647 (2**31 - 1). +func (j *Journal) PreviousSkip(n uint) (int, error) { + rc := C.sd_journal_previous_skip(j.j, C.uint64_t(n)) + if rc < 0 { + return 0, fmt.Errorf("journald: error setting back read pointer: %w", syscall.Errno(-rc)) + } + return int(rc), nil +} + +// SeekHead sets the read pointer to the position before the oldest available entry. +func (j *Journal) SeekHead() error { + if rc := C.sd_journal_seek_head(j.j); rc != 0 { + return fmt.Errorf("journald: error seeking to head of journal: %w", syscall.Errno(-rc)) + } + return nil +} + +// SeekTail sets the read pointer to the position after the most recent available entry. +func (j *Journal) SeekTail() error { + if rc := C.sd_journal_seek_tail(j.j); rc != 0 { + return fmt.Errorf("journald: error seeking to tail of journal: %w", syscall.Errno(-rc)) + } + return nil +} + +// SeekRealtime seeks to a position with a realtime (wallclock) timestamp after t. +// +// Note that the realtime clock is not necessarily monotonic. If a realtime +// timestamp is ambiguous, the position seeked to is not defined. +func (j *Journal) SeekRealtime(t time.Time) error { + if rc := C.sd_journal_seek_realtime_usec(j.j, C.uint64_t(t.UnixMicro())); rc != 0 { + return fmt.Errorf("journald: error seeking to time %v: %w", t, syscall.Errno(-rc)) + } + return nil +} + +// Cursor returns a serialization of the journal read pointer's current position. +func (j *Journal) Cursor() (*Cursor, error) { + var c *C.char + if rc := C.sd_journal_get_cursor(j.j, &c); rc != 0 { + return nil, fmt.Errorf("journald: error getting cursor: %w", syscall.Errno(-rc)) + } + return wrapCursor(c), nil +} + +// TestCursor checks whether the current position of the journal read pointer matches c. +func (j *Journal) TestCursor(c *Cursor) (bool, error) { + rc := C.sd_journal_test_cursor(j.j, c.c) + if rc < 0 { + return false, fmt.Errorf("journald: error testing cursor: %w", syscall.Errno(-rc)) + } + return rc > 0, nil +} + +// Wait blocks until the journal gets changed or timeout has elapsed. +// Pass a negative timeout to wait indefinitely. +func (j *Journal) Wait(timeout time.Duration) (Status, error) { + var dur C.uint64_t + if timeout < 0 { + // Wait indefinitely. + dur = ^C.uint64_t(0) // (uint64_t) -1 + } else { + dur = C.uint64_t(timeout.Microseconds()) + } + s := C.sd_journal_wait(j.j, dur) + if s < 0 { + return 0, fmt.Errorf("journald: error waiting for event: %w", syscall.Errno(-s)) + } + return Status(s), nil +} + +// Realtime returns the realtime timestamp of the current journal entry. +func (j *Journal) Realtime() (time.Time, error) { + var stamp C.uint64_t + if rc := C.sd_journal_get_realtime_usec(j.j, &stamp); rc != 0 { + return time.Time{}, fmt.Errorf("journald: error getting journal entry timestamp: %w", syscall.Errno(-rc)) + } + return time.UnixMicro(int64(stamp)), nil +} + +// Data returns all data fields for the current journal entry. +func (j *Journal) Data() (map[string]string, error) { + // Copying all the data fields for the entry into a map is more optimal + // than you might think. Doing so has time complexity O(N), where N is + // the number of fields in the entry. Looking up a data field in the map + // is amortized O(1), so the total complexity to look up M data fields + // is O(N+M). By comparison, looking up a data field using the + // sd_journal_get_data function has time complexity of O(N) as it is + // implemented as a linear search through the entry's fields. Therefore + // looking up M data fields in an entry by calling sd_journal_get_data + // in a loop would have time complexity of O(N*M). + + m := make(map[string]string) + j.restartData() + for { + var ( + data unsafe.Pointer + len C.size_t + ) + rc := C.sd_journal_enumerate_data(j.j, &data, &len) + if rc == 0 { + return m, nil + } else if rc < 0 { + return m, fmt.Errorf("journald: error enumerating entry data: %w", syscall.Errno(-rc)) + } + + kv := strings.SplitN(C.GoStringN((*C.char)(data), C.int(len)), "=", 2) + m[kv[0]] = kv[1] + } +} + +func (j *Journal) restartData() { + C.sd_journal_restart_data(j.j) +} + +// SetDataThreshold may be used to change the data field size threshold for data +// returned by j.Data(). The threshold is a hint only; larger data fields might +// still be returned. +// +// The default threshold is 64K. To retrieve the complete data fields this +// threshold should be turned off by setting it to 0. +// +// https://www.freedesktop.org/software/systemd/man/sd_journal_set_data_threshold.html +func (j *Journal) SetDataThreshold(v uint) error { + if rc := C.sd_journal_set_data_threshold(j.j, C.size_t(v)); rc != 0 { + return fmt.Errorf("journald: error setting journal data threshold: %w", syscall.Errno(-rc)) + } + return nil +} diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 49e4e1b8ae..01f3ec18d8 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -3,165 +3,87 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// -//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial) -//{ -// int rc; -// size_t plength; -// *msg = NULL; -// *length = 0; -// plength = strlen("CONTAINER_PARTIAL_MESSAGE=true"); -// rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length); -// *partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0)); -// rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length); -// if (rc == 0) { -// if (*length > 8) { -// (*msg) += 8; -// *length -= 8; -// } else { -// *msg = NULL; -// *length = 0; -// rc = -ENOENT; -// } -// } -// return rc; -//} -//static int get_priority(sd_journal *j, int *priority) -//{ -// const void *data; -// size_t i, length; -// int rc; -// *priority = -1; -// rc = sd_journal_get_data(j, "PRIORITY", &data, &length); -// if (rc == 0) { -// if ((length > 9) && (strncmp(data, "PRIORITY=", 9) == 0)) { -// *priority = 0; -// for (i = 9; i < length; i++) { -// *priority = *priority * 10 + ((const char *)data)[i] - '0'; -// } -// if (length > 9) { -// rc = 0; -// } -// } -// } -// return rc; -//} -//static int is_attribute_field(const char *msg, size_t length) -//{ -// static const struct known_field { -// const char *name; -// size_t length; -// } fields[] = { -// {"MESSAGE", sizeof("MESSAGE") - 1}, -// {"MESSAGE_ID", sizeof("MESSAGE_ID") - 1}, -// {"PRIORITY", sizeof("PRIORITY") - 1}, -// {"CODE_FILE", sizeof("CODE_FILE") - 1}, -// {"CODE_LINE", sizeof("CODE_LINE") - 1}, -// {"CODE_FUNC", sizeof("CODE_FUNC") - 1}, -// {"ERRNO", sizeof("ERRNO") - 1}, -// {"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1}, -// {"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1}, -// {"SYSLOG_PID", sizeof("SYSLOG_PID") - 1}, -// {"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1}, -// {"CONTAINER_ID", sizeof("CONTAINER_ID") - 1}, -// {"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1}, -// {"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1}, -// }; -// unsigned int i; -// void *p; -// if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) { -// return -1; -// } -// length = ((const char *) p) - msg; -// for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) { -// if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) { -// return -1; -// } -// } -// return 0; -//} -//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length) -//{ -// int rc; -// *msg = NULL; -// *length = 0; -// while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) { -// if (is_attribute_field(*msg, *length) == 0) { -// break; -// } -// rc = -ENOENT; -// } -// return rc; -//} -import "C" - import ( "errors" "runtime" - "strings" + "strconv" "time" - "unsafe" "github.com/coreos/go-systemd/v22/journal" + "github.com/sirupsen/logrus" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" - "github.com/sirupsen/logrus" + "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" ) -// CErr converts error code returned from a sd_journal_* function -// (which returns -errno) to a string -func CErr(ret C.int) string { - return C.GoString(C.strerror(-ret)) +var wellKnownFields = map[string]bool{ + "MESSAGE": true, + "MESSAGE_ID": true, + "PRIORITY": true, + "CODE_FILE": true, + "CODE_LINE": true, + "CODE_FUNC": true, + "ERRNO": true, + "SYSLOG_FACILITY": true, + "SYSLOG_IDENTIFIER": true, + "SYSLOG_PID": true, + "CONTAINER_NAME": true, + "CONTAINER_ID": true, + "CONTAINER_ID_FULL": true, + "CONTAINER_TAG": true, } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) { +func getMessage(d map[string]string) (line []byte, partial, ok bool) { + m, ok := d["MESSAGE"] + return []byte(m), d["CONTAINER_PARTIAL_MESSAGE"] == "true", ok +} + +func getPriority(d map[string]string) (journal.Priority, bool) { + if pri, ok := d["PRIORITY"]; ok { + i, err := strconv.Atoi(pri) + return journal.Priority(i), err == nil + } + return -1, false +} + +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, oldCursor *sdjournal.Cursor, until time.Time) (*sdjournal.Cursor, bool, int) { var ( - msg, data, cursor *C.char - length C.size_t - stamp C.uint64_t - priority, partial C.int - done bool - shown int + done bool + shown int ) // Walk the journal from here forward until we run out of new entries // or we reach the until value (if provided). drain: - for { + for ok := true; ok; ok, _ = j.Next() { // Try not to send a given entry twice. if oldCursor != nil { - for C.sd_journal_test_cursor(j, oldCursor) > 0 { - if C.sd_journal_next(j) <= 0 { + if ok, _ := j.TestCursor(oldCursor); ok { + if ok, _ := j.Next(); !ok { break drain } } } // Read and send the logged message, if there is one to read. - i := C.get_message(j, &msg, &length, &partial) - if i != -C.ENOENT && i != -C.EADDRNOTAVAIL { + data, err := j.Data() + if errors.Is(err, sdjournal.ErrInvalidReadPointer) { + continue + } + if line, partial, ok := getMessage(data); ok { // Read the entry's timestamp. - if C.sd_journal_get_realtime_usec(j, &stamp) != 0 { + timestamp, err := j.Realtime() + if err != nil { break } // Break if the timestamp exceeds any provided until flag. - if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) { + if !until.IsZero() && until.Before(timestamp) { done = true break } - // Set up the time and text of the entry. - timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000) - line := C.GoBytes(unsafe.Pointer(msg), C.int(length)) - if partial == 0 { + // Set up the text of the entry. + if !partial { line = append(line, "\n"...) } // Recover the stream name by mapping @@ -169,19 +91,19 @@ drain: // the stream that we would have // assigned that value. source := "" - if C.get_priority(j, &priority) == 0 { - if priority == C.int(journal.PriErr) { + if priority, ok := getPriority(data); ok { + if priority == journal.PriErr { source = "stderr" - } else if priority == C.int(journal.PriInfo) { + } else if priority == journal.PriInfo { source = "stdout" } } // Retrieve the values of any variables we're adding to the journal. var attrs []backend.LogAttr - C.sd_journal_restart_data(j) - for C.get_attribute_field(j, &data, &length) > C.int(0) { - kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2) - attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]}) + for k, v := range data { + if k[0] != '_' && !wellKnownFields[k] { + attrs = append(attrs, backend.LogAttr{Key: k, Value: v}) + } } // Send the log message, unless the consumer is gone @@ -200,37 +122,26 @@ drain: // Call sd_journal_process() periodically during the processing loop // to close any opened file descriptors for rotated (deleted) journal files. if shown%1024 == 0 { - if ret := C.sd_journal_process(j); ret < 0 { + if _, err := j.Process(); err != nil { // log a warning but ignore it for now logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]). - WithField("error", CErr(ret)). + WithField("error", err). Warn("journald: error processing journal") } } } - // If we're at the end of the journal, we're done (for now). - if C.sd_journal_next(j) <= 0 { - break - } } - // free(NULL) is safe - C.free(unsafe.Pointer(oldCursor)) - if C.sd_journal_get_cursor(j, &cursor) != 0 { - // ensure that we won't be freeing an address that's invalid - cursor = nil - } + cursor, _ := j.Cursor() return cursor, done, shown } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char { - waitTimeout := C.uint64_t(250000) // 0.25s - +func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, cursor *sdjournal.Cursor, until time.Time) *sdjournal.Cursor { LOOP: for { - status := C.sd_journal_wait(j, waitTimeout) - if status < 0 { - logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status)) + status, err := j.Wait(250 * time.Millisecond) + if err != nil { + logWatcher.Err <- err break } select { @@ -240,14 +151,15 @@ LOOP: // container is gone, drain journal default: // container is still alive - if status == C.SD_JOURNAL_NOP { + if status == sdjournal.StatusNOP { // no new data -- keep waiting continue } } - newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, until) + cursor.Free() cursor = newCursor - if done || (status == C.SD_JOURNAL_NOP && recv == 0) { + if done || (status == sdjournal.StatusNOP && recv == 0) { break } } @@ -256,14 +168,6 @@ LOOP: } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { - var ( - j *C.sd_journal - cmatch, cursor *C.char - stamp C.uint64_t - sinceUnixMicro uint64 - untilUnixMicro uint64 - ) - defer close(logWatcher.Msg) // Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html: @@ -272,92 +176,85 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon // specific thread at all times (and it has to be the very same one // during the entire lifetime of the object), but multiple, // independent threads may use multiple, independent objects safely. + // + // sdjournal.Open returns a value which wraps an sd_journal pointer so + // we need to abide by those rules. runtime.LockOSThread() defer runtime.UnlockOSThread() // Get a handle to the journal. - if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 { - logWatcher.Err <- errors.New("error opening journal: " + CErr(rc)) + j, err := sdjournal.Open(0) + if err != nil { + logWatcher.Err <- err return } - defer C.sd_journal_close(j) + defer j.Close() if config.Follow { // Initialize library inotify watches early - if rc := C.sd_journal_get_fd(j); rc < 0 { - logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc)) + if err := j.InitializeInotify(); err != nil { + logWatcher.Err <- err return } } // Remove limits on the size of data items that we'll retrieve. - if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 { - logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc)) + if err := j.SetDataThreshold(0); err != nil { + logWatcher.Err <- err return } // Add a match to have the library do the searching for us. - cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"]) - defer C.free(unsafe.Pointer(cmatch)) - if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 { - logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc)) + if err := j.AddMatch("CONTAINER_ID_FULL", s.vars["CONTAINER_ID_FULL"]); err != nil { + logWatcher.Err <- err return } - // If we have a cutoff time, convert it to Unix time once. - if !config.Since.IsZero() { - nano := config.Since.UnixNano() - sinceUnixMicro = uint64(nano / 1000) - } - // If we have an until value, convert it too - if !config.Until.IsZero() { - nano := config.Until.UnixNano() - untilUnixMicro = uint64(nano / 1000) - } if config.Tail >= 0 { // If until time provided, start from there. // Otherwise start at the end of the journal. - if untilUnixMicro != 0 { - if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 { - logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc)) + if !config.Until.IsZero() { + if err := j.SeekRealtime(config.Until); err != nil { + logWatcher.Err <- err return } - } else if rc := C.sd_journal_seek_tail(j); rc != 0 { - logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc)) + } else if err := j.SeekTail(); err != nil { + logWatcher.Err <- err return } // (Try to) skip backwards by the requested number of lines... - if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 { + if _, err := j.PreviousSkip(uint(config.Tail)); err == nil { // ...but not before "since" - if sinceUnixMicro != 0 && - C.sd_journal_get_realtime_usec(j, &stamp) == 0 && - uint64(stamp) < sinceUnixMicro { - C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) + if !config.Since.IsZero() { + if stamp, err := j.Realtime(); err == nil && stamp.Before(config.Since) { + _ = j.SeekRealtime(config.Since) + } } } } else { // Start at the beginning of the journal. - if rc := C.sd_journal_seek_head(j); rc != 0 { - logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc)) + if err := j.SeekHead(); err != nil { + logWatcher.Err <- err return } // If we have a cutoff date, fast-forward to it. - if sinceUnixMicro != 0 { - if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 { - logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc)) + if !config.Since.IsZero() { + if err := j.SeekRealtime(config.Since); err != nil { + logWatcher.Err <- err return } } - if rc := C.sd_journal_next(j); rc < 0 { - logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc)) + if _, err := j.Next(); err != nil { + logWatcher.Err <- err return } } + var cursor *sdjournal.Cursor if config.Tail != 0 { // special case for --tail 0 - cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) + cursor, _, _ = s.drainJournal(logWatcher, j, nil, config.Until) } if config.Follow { - cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro) + cursor = s.followJournal(logWatcher, j, cursor, config.Until) } - C.free(unsafe.Pointer(cursor)) + cursor.Free() } func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { diff --git a/daemon/logger/journald/read_native.go b/daemon/logger/journald/read_native.go deleted file mode 100644 index 862d064277..0000000000 --- a/daemon/logger/journald/read_native.go +++ /dev/null @@ -1,7 +0,0 @@ -//go:build linux && cgo && !static_build && journald -// +build linux,cgo,!static_build,journald - -package journald // import "github.com/docker/docker/daemon/logger/journald" - -// #cgo pkg-config: libsystemd -import "C"