logger/journald: refactor cgo out of business logic
Wrap the libsystemd journal reading functionality in a more idiomatic Go API and refactor the journald logging driver's ReadLogs implementation to use the wrapper. Rewrite the parts of the ReadLogs implementation in Go which were previously implemented in C as part of the cgo preamble. Separating the business logic from the cgo minutiae should hopefully make the code more accessible to a wider audience of developers for reviewing the code and contributing improvements. The structure of the ReadLogs implementation is retained with few modifications. Any ignored errors were also ignored before the refactor; the explicit error return values afforded by the sdjournal wrapper makes this more obvious. The package github.com/coreos/go-systemd/v22/sdjournal also provides a more idiomatic Go wrapper around libsystemd. It is unsuitable for our needs as it does not expose wrappers for the sd_journal_process and sd_journal_get_fd functions. Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
parent
7ff4b64319
commit
3e8405aa82
5 changed files with 385 additions and 210 deletions
45
daemon/logger/journald/internal/sdjournal/cursor.go
Normal file
45
daemon/logger/journald/internal/sdjournal/cursor.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
//go:build linux && cgo && !static_build && journald
|
||||
// +build linux,cgo,!static_build,journald
|
||||
|
||||
package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
|
||||
|
||||
// #include <stdlib.h>
|
||||
import "C"
|
||||
import (
|
||||
"runtime"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Cursor is a reference to a journal cursor. A Cursor must not be copied.
|
||||
type Cursor struct {
|
||||
c *C.char
|
||||
noCopy noCopy //nolint:structcheck,unused // Exists only to mark values uncopyable for `go vet`.
|
||||
}
|
||||
|
||||
func wrapCursor(cur *C.char) *Cursor {
|
||||
c := &Cursor{c: cur}
|
||||
runtime.SetFinalizer(c, (*Cursor).Free)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Cursor) String() string {
|
||||
if c.c == nil {
|
||||
return "<nil>"
|
||||
}
|
||||
return C.GoString(c.c)
|
||||
}
|
||||
|
||||
// Free invalidates the cursor and frees any associated resources on the C heap.
|
||||
func (c *Cursor) Free() {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
C.free(unsafe.Pointer(c.c))
|
||||
runtime.SetFinalizer(c, nil)
|
||||
c.c = nil
|
||||
}
|
||||
|
||||
type noCopy struct{}
|
||||
|
||||
func (*noCopy) Lock() {}
|
||||
func (*noCopy) Unlock() {}
|
3
daemon/logger/journald/internal/sdjournal/doc.go
Normal file
3
daemon/logger/journald/internal/sdjournal/doc.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
// Package sdjournal provides a Go interface to the systemd journal read API by
|
||||
// wrapping the libsystemd C library.
|
||||
package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
|
237
daemon/logger/journald/internal/sdjournal/sdjournal.go
Normal file
237
daemon/logger/journald/internal/sdjournal/sdjournal.go
Normal file
|
@ -0,0 +1,237 @@
|
|||
//go:build linux && cgo && !static_build && journald
|
||||
// +build linux,cgo,!static_build,journald
|
||||
|
||||
package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
|
||||
|
||||
// #cgo pkg-config: libsystemd
|
||||
// #include <stdlib.h>
|
||||
// #include <systemd/sd-journal.h>
|
||||
//
|
||||
// static int add_match(sd_journal *j, _GoString_ s) {
|
||||
// return sd_journal_add_match(j, _GoStringPtr(s), _GoStringLen(s));
|
||||
// }
|
||||
import "C"
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Status is an sd-journal status code.
|
||||
type Status int
|
||||
|
||||
// Status values for Process() and Wait().
|
||||
const (
|
||||
StatusNOP = Status(C.SD_JOURNAL_NOP) // SD_JOURNAL_NOP
|
||||
StatusAPPEND = Status(C.SD_JOURNAL_APPEND) // SD_JOURNAL_APPEND
|
||||
StatusINVALIDATE = Status(C.SD_JOURNAL_INVALIDATE) // SD_JOURNAL_INVALIDATE
|
||||
)
|
||||
|
||||
const (
|
||||
// ErrInvalidReadPointer is the error returned when the read pointer is
|
||||
// in an invalid position.
|
||||
ErrInvalidReadPointer = syscall.EADDRNOTAVAIL
|
||||
)
|
||||
|
||||
// Journal is a handle to an open journald journal.
|
||||
type Journal struct {
|
||||
j *C.sd_journal
|
||||
noCopy noCopy //nolint:structcheck,unused // Exists only to mark values uncopyable for `go vet`.
|
||||
}
|
||||
|
||||
// Open opens the log journal for reading.
|
||||
//
|
||||
// The returned Journal value may only be used from the same operating system
|
||||
// thread which Open was called from. Using it from only a single goroutine is
|
||||
// not sufficient; runtime.LockOSThread must also be used.
|
||||
func Open(flags int) (*Journal, error) {
|
||||
j := &Journal{}
|
||||
if rc := C.sd_journal_open(&j.j, C.int(flags)); rc != 0 {
|
||||
return nil, fmt.Errorf("journald: error opening journal: %w", syscall.Errno(-rc))
|
||||
}
|
||||
runtime.SetFinalizer(j, (*Journal).Close)
|
||||
return j, nil
|
||||
}
|
||||
|
||||
// Close closes the journal. The return value is always nil.
|
||||
func (j *Journal) Close() error {
|
||||
if j.j != nil {
|
||||
C.sd_journal_close(j.j)
|
||||
runtime.SetFinalizer(j, nil)
|
||||
j.j = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process processes journal events.
|
||||
//
|
||||
// https://www.freedesktop.org/software/systemd/man/sd_journal_process.html
|
||||
func (j *Journal) Process() (Status, error) {
|
||||
s := C.sd_journal_process(j.j)
|
||||
if s < 0 {
|
||||
return 0, fmt.Errorf("journald: error processing events: %w", syscall.Errno(-s))
|
||||
}
|
||||
return Status(s), nil
|
||||
}
|
||||
|
||||
// InitializeInotify sets up change notifications for the journal.
|
||||
func (j *Journal) InitializeInotify() error {
|
||||
if rc := C.sd_journal_get_fd(j.j); rc < 0 {
|
||||
return fmt.Errorf("journald: error initializing inotify watches: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddMatch adds a match by which to filter the entries of the journal file.
|
||||
//
|
||||
// https://www.freedesktop.org/software/systemd/man/sd_journal_add_match.html
|
||||
func (j *Journal) AddMatch(field, value string) error {
|
||||
m := field + "=" + value
|
||||
if rc := C.add_match(j.j, m); rc != 0 {
|
||||
return fmt.Errorf("journald: error adding match %q: %w", m, syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next advances the read pointer to the next entry.
|
||||
func (j *Journal) Next() (bool, error) {
|
||||
rc := C.sd_journal_next(j.j)
|
||||
if rc < 0 {
|
||||
return false, fmt.Errorf("journald: error advancing read pointer: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return rc > 0, nil
|
||||
}
|
||||
|
||||
// PreviousSkip sets back the read pointer by n entries. The number of entries
|
||||
// must be less than or equal to 2147483647 (2**31 - 1).
|
||||
func (j *Journal) PreviousSkip(n uint) (int, error) {
|
||||
rc := C.sd_journal_previous_skip(j.j, C.uint64_t(n))
|
||||
if rc < 0 {
|
||||
return 0, fmt.Errorf("journald: error setting back read pointer: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return int(rc), nil
|
||||
}
|
||||
|
||||
// SeekHead sets the read pointer to the position before the oldest available entry.
|
||||
func (j *Journal) SeekHead() error {
|
||||
if rc := C.sd_journal_seek_head(j.j); rc != 0 {
|
||||
return fmt.Errorf("journald: error seeking to head of journal: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SeekTail sets the read pointer to the position after the most recent available entry.
|
||||
func (j *Journal) SeekTail() error {
|
||||
if rc := C.sd_journal_seek_tail(j.j); rc != 0 {
|
||||
return fmt.Errorf("journald: error seeking to tail of journal: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SeekRealtime seeks to a position with a realtime (wallclock) timestamp after t.
|
||||
//
|
||||
// Note that the realtime clock is not necessarily monotonic. If a realtime
|
||||
// timestamp is ambiguous, the position seeked to is not defined.
|
||||
func (j *Journal) SeekRealtime(t time.Time) error {
|
||||
if rc := C.sd_journal_seek_realtime_usec(j.j, C.uint64_t(t.UnixMicro())); rc != 0 {
|
||||
return fmt.Errorf("journald: error seeking to time %v: %w", t, syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cursor returns a serialization of the journal read pointer's current position.
|
||||
func (j *Journal) Cursor() (*Cursor, error) {
|
||||
var c *C.char
|
||||
if rc := C.sd_journal_get_cursor(j.j, &c); rc != 0 {
|
||||
return nil, fmt.Errorf("journald: error getting cursor: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return wrapCursor(c), nil
|
||||
}
|
||||
|
||||
// TestCursor checks whether the current position of the journal read pointer matches c.
|
||||
func (j *Journal) TestCursor(c *Cursor) (bool, error) {
|
||||
rc := C.sd_journal_test_cursor(j.j, c.c)
|
||||
if rc < 0 {
|
||||
return false, fmt.Errorf("journald: error testing cursor: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return rc > 0, nil
|
||||
}
|
||||
|
||||
// Wait blocks until the journal gets changed or timeout has elapsed.
|
||||
// Pass a negative timeout to wait indefinitely.
|
||||
func (j *Journal) Wait(timeout time.Duration) (Status, error) {
|
||||
var dur C.uint64_t
|
||||
if timeout < 0 {
|
||||
// Wait indefinitely.
|
||||
dur = ^C.uint64_t(0) // (uint64_t) -1
|
||||
} else {
|
||||
dur = C.uint64_t(timeout.Microseconds())
|
||||
}
|
||||
s := C.sd_journal_wait(j.j, dur)
|
||||
if s < 0 {
|
||||
return 0, fmt.Errorf("journald: error waiting for event: %w", syscall.Errno(-s))
|
||||
}
|
||||
return Status(s), nil
|
||||
}
|
||||
|
||||
// Realtime returns the realtime timestamp of the current journal entry.
|
||||
func (j *Journal) Realtime() (time.Time, error) {
|
||||
var stamp C.uint64_t
|
||||
if rc := C.sd_journal_get_realtime_usec(j.j, &stamp); rc != 0 {
|
||||
return time.Time{}, fmt.Errorf("journald: error getting journal entry timestamp: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return time.UnixMicro(int64(stamp)), nil
|
||||
}
|
||||
|
||||
// Data returns all data fields for the current journal entry.
|
||||
func (j *Journal) Data() (map[string]string, error) {
|
||||
// Copying all the data fields for the entry into a map is more optimal
|
||||
// than you might think. Doing so has time complexity O(N), where N is
|
||||
// the number of fields in the entry. Looking up a data field in the map
|
||||
// is amortized O(1), so the total complexity to look up M data fields
|
||||
// is O(N+M). By comparison, looking up a data field using the
|
||||
// sd_journal_get_data function has time complexity of O(N) as it is
|
||||
// implemented as a linear search through the entry's fields. Therefore
|
||||
// looking up M data fields in an entry by calling sd_journal_get_data
|
||||
// in a loop would have time complexity of O(N*M).
|
||||
|
||||
m := make(map[string]string)
|
||||
j.restartData()
|
||||
for {
|
||||
var (
|
||||
data unsafe.Pointer
|
||||
len C.size_t
|
||||
)
|
||||
rc := C.sd_journal_enumerate_data(j.j, &data, &len)
|
||||
if rc == 0 {
|
||||
return m, nil
|
||||
} else if rc < 0 {
|
||||
return m, fmt.Errorf("journald: error enumerating entry data: %w", syscall.Errno(-rc))
|
||||
}
|
||||
|
||||
kv := strings.SplitN(C.GoStringN((*C.char)(data), C.int(len)), "=", 2)
|
||||
m[kv[0]] = kv[1]
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Journal) restartData() {
|
||||
C.sd_journal_restart_data(j.j)
|
||||
}
|
||||
|
||||
// SetDataThreshold may be used to change the data field size threshold for data
|
||||
// returned by j.Data(). The threshold is a hint only; larger data fields might
|
||||
// still be returned.
|
||||
//
|
||||
// The default threshold is 64K. To retrieve the complete data fields this
|
||||
// threshold should be turned off by setting it to 0.
|
||||
//
|
||||
// https://www.freedesktop.org/software/systemd/man/sd_journal_set_data_threshold.html
|
||||
func (j *Journal) SetDataThreshold(v uint) error {
|
||||
if rc := C.sd_journal_set_data_threshold(j.j, C.size_t(v)); rc != 0 {
|
||||
return fmt.Errorf("journald: error setting journal data threshold: %w", syscall.Errno(-rc))
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -3,165 +3,87 @@
|
|||
|
||||
package journald // import "github.com/docker/docker/daemon/logger/journald"
|
||||
|
||||
// #include <sys/types.h>
|
||||
// #include <sys/poll.h>
|
||||
// #include <systemd/sd-journal.h>
|
||||
// #include <errno.h>
|
||||
// #include <stdio.h>
|
||||
// #include <stdlib.h>
|
||||
// #include <string.h>
|
||||
// #include <time.h>
|
||||
// #include <unistd.h>
|
||||
//
|
||||
//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
|
||||
//{
|
||||
// int rc;
|
||||
// size_t plength;
|
||||
// *msg = NULL;
|
||||
// *length = 0;
|
||||
// plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
|
||||
// rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
|
||||
// *partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
|
||||
// rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
|
||||
// if (rc == 0) {
|
||||
// if (*length > 8) {
|
||||
// (*msg) += 8;
|
||||
// *length -= 8;
|
||||
// } else {
|
||||
// *msg = NULL;
|
||||
// *length = 0;
|
||||
// rc = -ENOENT;
|
||||
// }
|
||||
// }
|
||||
// return rc;
|
||||
//}
|
||||
//static int get_priority(sd_journal *j, int *priority)
|
||||
//{
|
||||
// const void *data;
|
||||
// size_t i, length;
|
||||
// int rc;
|
||||
// *priority = -1;
|
||||
// rc = sd_journal_get_data(j, "PRIORITY", &data, &length);
|
||||
// if (rc == 0) {
|
||||
// if ((length > 9) && (strncmp(data, "PRIORITY=", 9) == 0)) {
|
||||
// *priority = 0;
|
||||
// for (i = 9; i < length; i++) {
|
||||
// *priority = *priority * 10 + ((const char *)data)[i] - '0';
|
||||
// }
|
||||
// if (length > 9) {
|
||||
// rc = 0;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// return rc;
|
||||
//}
|
||||
//static int is_attribute_field(const char *msg, size_t length)
|
||||
//{
|
||||
// static const struct known_field {
|
||||
// const char *name;
|
||||
// size_t length;
|
||||
// } fields[] = {
|
||||
// {"MESSAGE", sizeof("MESSAGE") - 1},
|
||||
// {"MESSAGE_ID", sizeof("MESSAGE_ID") - 1},
|
||||
// {"PRIORITY", sizeof("PRIORITY") - 1},
|
||||
// {"CODE_FILE", sizeof("CODE_FILE") - 1},
|
||||
// {"CODE_LINE", sizeof("CODE_LINE") - 1},
|
||||
// {"CODE_FUNC", sizeof("CODE_FUNC") - 1},
|
||||
// {"ERRNO", sizeof("ERRNO") - 1},
|
||||
// {"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1},
|
||||
// {"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1},
|
||||
// {"SYSLOG_PID", sizeof("SYSLOG_PID") - 1},
|
||||
// {"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1},
|
||||
// {"CONTAINER_ID", sizeof("CONTAINER_ID") - 1},
|
||||
// {"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1},
|
||||
// {"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1},
|
||||
// };
|
||||
// unsigned int i;
|
||||
// void *p;
|
||||
// if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) {
|
||||
// return -1;
|
||||
// }
|
||||
// length = ((const char *) p) - msg;
|
||||
// for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
|
||||
// if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) {
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
// return 0;
|
||||
//}
|
||||
//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length)
|
||||
//{
|
||||
// int rc;
|
||||
// *msg = NULL;
|
||||
// *length = 0;
|
||||
// while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) {
|
||||
// if (is_attribute_field(*msg, *length) == 0) {
|
||||
// break;
|
||||
// }
|
||||
// rc = -ENOENT;
|
||||
// }
|
||||
// return rc;
|
||||
//}
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"runtime"
|
||||
"strings"
|
||||
"strconv"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/coreos/go-systemd/v22/journal"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/docker/docker/daemon/logger/journald/internal/sdjournal"
|
||||
)
|
||||
|
||||
// CErr converts error code returned from a sd_journal_* function
|
||||
// (which returns -errno) to a string
|
||||
func CErr(ret C.int) string {
|
||||
return C.GoString(C.strerror(-ret))
|
||||
var wellKnownFields = map[string]bool{
|
||||
"MESSAGE": true,
|
||||
"MESSAGE_ID": true,
|
||||
"PRIORITY": true,
|
||||
"CODE_FILE": true,
|
||||
"CODE_LINE": true,
|
||||
"CODE_FUNC": true,
|
||||
"ERRNO": true,
|
||||
"SYSLOG_FACILITY": true,
|
||||
"SYSLOG_IDENTIFIER": true,
|
||||
"SYSLOG_PID": true,
|
||||
"CONTAINER_NAME": true,
|
||||
"CONTAINER_ID": true,
|
||||
"CONTAINER_ID_FULL": true,
|
||||
"CONTAINER_TAG": true,
|
||||
}
|
||||
|
||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
|
||||
func getMessage(d map[string]string) (line []byte, partial, ok bool) {
|
||||
m, ok := d["MESSAGE"]
|
||||
return []byte(m), d["CONTAINER_PARTIAL_MESSAGE"] == "true", ok
|
||||
}
|
||||
|
||||
func getPriority(d map[string]string) (journal.Priority, bool) {
|
||||
if pri, ok := d["PRIORITY"]; ok {
|
||||
i, err := strconv.Atoi(pri)
|
||||
return journal.Priority(i), err == nil
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
|
||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, oldCursor *sdjournal.Cursor, until time.Time) (*sdjournal.Cursor, bool, int) {
|
||||
var (
|
||||
msg, data, cursor *C.char
|
||||
length C.size_t
|
||||
stamp C.uint64_t
|
||||
priority, partial C.int
|
||||
done bool
|
||||
shown int
|
||||
done bool
|
||||
shown int
|
||||
)
|
||||
|
||||
// Walk the journal from here forward until we run out of new entries
|
||||
// or we reach the until value (if provided).
|
||||
drain:
|
||||
for {
|
||||
for ok := true; ok; ok, _ = j.Next() {
|
||||
// Try not to send a given entry twice.
|
||||
if oldCursor != nil {
|
||||
for C.sd_journal_test_cursor(j, oldCursor) > 0 {
|
||||
if C.sd_journal_next(j) <= 0 {
|
||||
if ok, _ := j.TestCursor(oldCursor); ok {
|
||||
if ok, _ := j.Next(); !ok {
|
||||
break drain
|
||||
}
|
||||
}
|
||||
}
|
||||
// Read and send the logged message, if there is one to read.
|
||||
i := C.get_message(j, &msg, &length, &partial)
|
||||
if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
|
||||
data, err := j.Data()
|
||||
if errors.Is(err, sdjournal.ErrInvalidReadPointer) {
|
||||
continue
|
||||
}
|
||||
if line, partial, ok := getMessage(data); ok {
|
||||
// Read the entry's timestamp.
|
||||
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
|
||||
timestamp, err := j.Realtime()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// Break if the timestamp exceeds any provided until flag.
|
||||
if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) {
|
||||
if !until.IsZero() && until.Before(timestamp) {
|
||||
done = true
|
||||
break
|
||||
}
|
||||
|
||||
// Set up the time and text of the entry.
|
||||
timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
|
||||
line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
|
||||
if partial == 0 {
|
||||
// Set up the text of the entry.
|
||||
if !partial {
|
||||
line = append(line, "\n"...)
|
||||
}
|
||||
// Recover the stream name by mapping
|
||||
|
@ -169,19 +91,19 @@ drain:
|
|||
// the stream that we would have
|
||||
// assigned that value.
|
||||
source := ""
|
||||
if C.get_priority(j, &priority) == 0 {
|
||||
if priority == C.int(journal.PriErr) {
|
||||
if priority, ok := getPriority(data); ok {
|
||||
if priority == journal.PriErr {
|
||||
source = "stderr"
|
||||
} else if priority == C.int(journal.PriInfo) {
|
||||
} else if priority == journal.PriInfo {
|
||||
source = "stdout"
|
||||
}
|
||||
}
|
||||
// Retrieve the values of any variables we're adding to the journal.
|
||||
var attrs []backend.LogAttr
|
||||
C.sd_journal_restart_data(j)
|
||||
for C.get_attribute_field(j, &data, &length) > C.int(0) {
|
||||
kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
|
||||
attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
|
||||
for k, v := range data {
|
||||
if k[0] != '_' && !wellKnownFields[k] {
|
||||
attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
|
||||
}
|
||||
}
|
||||
|
||||
// Send the log message, unless the consumer is gone
|
||||
|
@ -200,37 +122,26 @@ drain:
|
|||
// Call sd_journal_process() periodically during the processing loop
|
||||
// to close any opened file descriptors for rotated (deleted) journal files.
|
||||
if shown%1024 == 0 {
|
||||
if ret := C.sd_journal_process(j); ret < 0 {
|
||||
if _, err := j.Process(); err != nil {
|
||||
// log a warning but ignore it for now
|
||||
logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
|
||||
WithField("error", CErr(ret)).
|
||||
WithField("error", err).
|
||||
Warn("journald: error processing journal")
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we're at the end of the journal, we're done (for now).
|
||||
if C.sd_journal_next(j) <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// free(NULL) is safe
|
||||
C.free(unsafe.Pointer(oldCursor))
|
||||
if C.sd_journal_get_cursor(j, &cursor) != 0 {
|
||||
// ensure that we won't be freeing an address that's invalid
|
||||
cursor = nil
|
||||
}
|
||||
cursor, _ := j.Cursor()
|
||||
return cursor, done, shown
|
||||
}
|
||||
|
||||
func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
|
||||
waitTimeout := C.uint64_t(250000) // 0.25s
|
||||
|
||||
func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, cursor *sdjournal.Cursor, until time.Time) *sdjournal.Cursor {
|
||||
LOOP:
|
||||
for {
|
||||
status := C.sd_journal_wait(j, waitTimeout)
|
||||
if status < 0 {
|
||||
logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
|
||||
status, err := j.Wait(250 * time.Millisecond)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
break
|
||||
}
|
||||
select {
|
||||
|
@ -240,14 +151,15 @@ LOOP:
|
|||
// container is gone, drain journal
|
||||
default:
|
||||
// container is still alive
|
||||
if status == C.SD_JOURNAL_NOP {
|
||||
if status == sdjournal.StatusNOP {
|
||||
// no new data -- keep waiting
|
||||
continue
|
||||
}
|
||||
}
|
||||
newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
|
||||
newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, until)
|
||||
cursor.Free()
|
||||
cursor = newCursor
|
||||
if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
|
||||
if done || (status == sdjournal.StatusNOP && recv == 0) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -256,14 +168,6 @@ LOOP:
|
|||
}
|
||||
|
||||
func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||
var (
|
||||
j *C.sd_journal
|
||||
cmatch, cursor *C.char
|
||||
stamp C.uint64_t
|
||||
sinceUnixMicro uint64
|
||||
untilUnixMicro uint64
|
||||
)
|
||||
|
||||
defer close(logWatcher.Msg)
|
||||
|
||||
// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
|
||||
|
@ -272,92 +176,85 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
// specific thread at all times (and it has to be the very same one
|
||||
// during the entire lifetime of the object), but multiple,
|
||||
// independent threads may use multiple, independent objects safely.
|
||||
//
|
||||
// sdjournal.Open returns a value which wraps an sd_journal pointer so
|
||||
// we need to abide by those rules.
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Get a handle to the journal.
|
||||
if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error opening journal: " + CErr(rc))
|
||||
j, err := sdjournal.Open(0)
|
||||
if err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
defer C.sd_journal_close(j)
|
||||
defer j.Close()
|
||||
|
||||
if config.Follow {
|
||||
// Initialize library inotify watches early
|
||||
if rc := C.sd_journal_get_fd(j); rc < 0 {
|
||||
logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc))
|
||||
if err := j.InitializeInotify(); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Remove limits on the size of data items that we'll retrieve.
|
||||
if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc))
|
||||
if err := j.SetDataThreshold(0); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// Add a match to have the library do the searching for us.
|
||||
cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
|
||||
defer C.free(unsafe.Pointer(cmatch))
|
||||
if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc))
|
||||
if err := j.AddMatch("CONTAINER_ID_FULL", s.vars["CONTAINER_ID_FULL"]); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// If we have a cutoff time, convert it to Unix time once.
|
||||
if !config.Since.IsZero() {
|
||||
nano := config.Since.UnixNano()
|
||||
sinceUnixMicro = uint64(nano / 1000)
|
||||
}
|
||||
// If we have an until value, convert it too
|
||||
if !config.Until.IsZero() {
|
||||
nano := config.Until.UnixNano()
|
||||
untilUnixMicro = uint64(nano / 1000)
|
||||
}
|
||||
if config.Tail >= 0 {
|
||||
// If until time provided, start from there.
|
||||
// Otherwise start at the end of the journal.
|
||||
if untilUnixMicro != 0 {
|
||||
if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc))
|
||||
if !config.Until.IsZero() {
|
||||
if err := j.SeekRealtime(config.Until); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
} else if rc := C.sd_journal_seek_tail(j); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc))
|
||||
} else if err := j.SeekTail(); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// (Try to) skip backwards by the requested number of lines...
|
||||
if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 {
|
||||
if _, err := j.PreviousSkip(uint(config.Tail)); err == nil {
|
||||
// ...but not before "since"
|
||||
if sinceUnixMicro != 0 &&
|
||||
C.sd_journal_get_realtime_usec(j, &stamp) == 0 &&
|
||||
uint64(stamp) < sinceUnixMicro {
|
||||
C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro))
|
||||
if !config.Since.IsZero() {
|
||||
if stamp, err := j.Realtime(); err == nil && stamp.Before(config.Since) {
|
||||
_ = j.SeekRealtime(config.Since)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Start at the beginning of the journal.
|
||||
if rc := C.sd_journal_seek_head(j); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc))
|
||||
if err := j.SeekHead(); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
// If we have a cutoff date, fast-forward to it.
|
||||
if sinceUnixMicro != 0 {
|
||||
if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 {
|
||||
logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc))
|
||||
if !config.Since.IsZero() {
|
||||
if err := j.SeekRealtime(config.Since); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
if rc := C.sd_journal_next(j); rc < 0 {
|
||||
logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc))
|
||||
if _, err := j.Next(); err != nil {
|
||||
logWatcher.Err <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
var cursor *sdjournal.Cursor
|
||||
if config.Tail != 0 { // special case for --tail 0
|
||||
cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
|
||||
cursor, _, _ = s.drainJournal(logWatcher, j, nil, config.Until)
|
||||
}
|
||||
if config.Follow {
|
||||
cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
|
||||
cursor = s.followJournal(logWatcher, j, cursor, config.Until)
|
||||
}
|
||||
C.free(unsafe.Pointer(cursor))
|
||||
cursor.Free()
|
||||
}
|
||||
|
||||
func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
//go:build linux && cgo && !static_build && journald
|
||||
// +build linux,cgo,!static_build,journald
|
||||
|
||||
package journald // import "github.com/docker/docker/daemon/logger/journald"
|
||||
|
||||
// #cgo pkg-config: libsystemd
|
||||
import "C"
|
Loading…
Reference in a new issue