Merge pull request #38859 from kolyshkin/journald

Fixes for reading journald logs
Brian Goff 2019-08-09 10:22:40 -07:00 committed by GitHub
commit c2f70da793
2 changed files with 126 additions and 183 deletions


@@ -21,7 +21,6 @@ type journald struct {
 	mu      sync.Mutex
 	vars    map[string]string // additional variables and values to send to the journal along with the log message
 	readers map[*logger.LogWatcher]struct{}
-	closed  bool
 }
 
 func init() {


@@ -101,56 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
 //	}
 //	return rc;
 //}
-//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
-//{
-//	struct pollfd fds[2];
-//	uint64_t when = 0;
-//	int timeout, jevents, i;
-//	struct timespec ts;
-//	uint64_t now;
-//
-//	memset(&fds, 0, sizeof(fds));
-//	fds[0].fd = pipefd;
-//	fds[0].events = POLLHUP;
-//	fds[1].fd = sd_journal_get_fd(j);
-//	if (fds[1].fd < 0) {
-//		return fds[1].fd;
-//	}
-//
-//	do {
-//		jevents = sd_journal_get_events(j);
-//		if (jevents < 0) {
-//			return jevents;
-//		}
-//		fds[1].events = jevents;
-//		sd_journal_get_timeout(j, &when);
-//		if (when == -1) {
-//			timeout = -1;
-//		} else {
-//			clock_gettime(CLOCK_MONOTONIC, &ts);
-//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
-//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
-//		}
-//		i = poll(fds, 2, timeout);
-//		if ((i == -1) && (errno != EINTR)) {
-//			/* An unexpected error. */
-//			return (errno != 0) ? -errno : -EINTR;
-//		}
-//		if (fds[0].revents & POLLHUP) {
-//			/* The close notification pipe was closed. */
-//			return 0;
-//		}
-//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
-//			/* Data, which we might care about, was appended. */
-//			return 1;
-//		}
-//	} while ((fds[0].revents & POLLHUP) == 0);
-//	return 0;
-//}
 import "C"
 
 import (
-	"fmt"
+	"errors"
 	"strings"
 	"time"
 	"unsafe"
@@ -163,22 +117,29 @@ import (
 
 func (s *journald) Close() error {
 	s.mu.Lock()
-	s.closed = true
 	for r := range s.readers {
 		r.ProducerGone()
 		delete(s.readers, r)
 	}
 	s.mu.Unlock()
 	return nil
 }
 
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
-	var msg, data, cursor *C.char
-	var length C.size_t
-	var stamp C.uint64_t
-	var priority, partial C.int
-	var done bool
+// convert error code returned from a sd_journal_* function
+// (which returns -errno) to a string
+func CErr(ret C.int) string {
+	return C.GoString(C.strerror(C.int(-ret)))
+}
+
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
+	var (
+		msg, data, cursor *C.char
+		length            C.size_t
+		stamp             C.uint64_t
+		priority, partial C.int
+		done              bool
+		shown             int
+	)
 
 	// Walk the journal from here forward until we run out of new entries
 	// or we reach the until value (if provided).
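
Note: the new CErr helper renders the -errno return convention of the sd_journal_* calls as a readable message via C.strerror, so every error sent to the log watcher can say why the call failed. Below is a cgo-free sketch of the same conversion; the cErr name and the use of syscall.Errno are this note's own, not part of the patch.

package main

import (
	"fmt"
	"syscall"
)

// cErr converts a "-errno on failure" style return value into a readable
// string, the way CErr does with C.strerror in the patch.
func cErr(ret int) string {
	if ret >= 0 {
		return "success"
	}
	return syscall.Errno(-ret).Error()
}

func main() {
	// On Linux, errno 2 is ENOENT: "no such file or directory".
	fmt.Println(cErr(-2))
}
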
@@ -216,13 +177,13 @@ drain:
 			// the stream that we would have
 			// assigned that value.
 			source := ""
-			if C.get_priority(j, &priority) != 0 {
-				source = ""
-			} else if priority == C.int(journal.PriErr) {
-				source = "stderr"
-			} else if priority == C.int(journal.PriInfo) {
-				source = "stdout"
+			if C.get_priority(j, &priority) == 0 {
+				if priority == C.int(journal.PriErr) {
+					source = "stderr"
+				} else if priority == C.int(journal.PriInfo) {
+					source = "stdout"
+				}
 			}
 			// Retrieve the values of any variables we're adding to the journal.
 			var attrs []backend.LogAttr
 			C.sd_journal_restart_data(j)
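
Note: the priority check is restructured so a source is only assigned when get_priority succeeds; a failed lookup now simply leaves source empty instead of being the first branch. A standalone sketch of that mapping, using plain constants for the syslog-style priorities that journal.PriErr (3) and journal.PriInfo (6) resolve to; sourceForPriority is a hypothetical helper for illustration only.

package main

import "fmt"

// Syslog-style priorities used by the journald driver: stderr lines are
// logged at err (3), stdout lines at info (6).
const (
	priErr  = 3
	priInfo = 6
)

// sourceForPriority mirrors the patched branch: an unknown priority, or a
// failed priority lookup (ok == false), leaves the source empty.
func sourceForPriority(priority int, ok bool) string {
	source := ""
	if ok {
		if priority == priErr {
			source = "stderr"
		} else if priority == priInfo {
			source = "stdout"
		}
	}
	return source
}

func main() {
	fmt.Println(sourceForPriority(3, true))  // stderr
	fmt.Println(sourceForPriority(6, true))  // stdout
	fmt.Println(sourceForPriority(6, false)) // empty: priority lookup failed
}
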
@@ -230,12 +191,29 @@ drain:
 				kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
 				attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
 			}
-			// Send the log message.
-			logWatcher.Msg <- &logger.Message{
+
+			// Send the log message, unless the consumer is gone
+			select {
+			case <-logWatcher.WatchConsumerGone():
+				done = true // we won't be able to write anything anymore
+				break drain
+			case logWatcher.Msg <- &logger.Message{
 				Line:      line,
 				Source:    source,
 				Timestamp: timestamp.In(time.UTC),
 				Attrs:     attrs,
+			}:
+				shown++
 			}
+			// Call sd_journal_process() periodically during the processing loop
+			// to close any opened file descriptors for rotated (deleted) journal files.
+			if shown%1024 == 0 {
+				if ret := C.sd_journal_process(j); ret < 0 {
+					// log a warning but ignore it for now
+					logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
+						WithField("error", CErr(ret)).
+						Warn("journald: error processing journal")
+				}
+			}
 		}
 		// If we're at the end of the journal, we're done (for now).
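
Note: previously the message was sent to logWatcher.Msg unconditionally, so a reader that disappeared mid-stream could leave this loop blocked on the channel forever; the select aborts the drain as soon as WatchConsumerGone fires, and the shown counter lets sd_journal_process run every 1024 entries to release descriptors of rotated journal files. Below is a self-contained sketch of the send-or-stop pattern, with plain channels standing in for logger.Message and the consumer-gone signal; the names are this note's own.

package main

import "fmt"

// sendOrStop tries to deliver msg to out, but gives up as soon as
// consumerGone is closed, so the producer can never block forever on a
// reader that has already gone away.
func sendOrStop(out chan<- string, consumerGone <-chan struct{}, msg string) bool {
	select {
	case <-consumerGone:
		return false // reader is gone; stop draining
	case out <- msg:
		return true
	}
}

func main() {
	out := make(chan string, 1)
	gone := make(chan struct{})

	// Buffered send succeeds while the consumer is still around.
	fmt.Println(sendOrStop(out, gone, "line 1")) // true

	// Simulate `docker logs` being interrupted: the buffer is full and the
	// consumer is gone, so the send is abandoned instead of blocking.
	close(gone)
	fmt.Println(sendOrStop(out, gone, "line 2")) // false
}
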
@@ -250,104 +228,93 @@ drain:
 		// ensure that we won't be freeing an address that's invalid
 		cursor = nil
 	}
-	return cursor, done
+	return cursor, done, shown
 }
 
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
 	s.mu.Lock()
 	s.readers[logWatcher] = struct{}{}
-	if s.closed {
-		// the journald Logger is closed, presumably because the container has been
-		// reset. So we shouldn't follow, because we'll never be woken up. But we
-		// should make one more drainJournal call to be sure we've got all the logs.
-		// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
-		C.close(pfd[1])
-	}
 	s.mu.Unlock()
 
-	newCursor := make(chan *C.char)
-
-	go func() {
-		for {
-			// Keep copying journal data out until we're notified to stop
-			// or we hit an error.
-			status := C.wait_for_data_cancelable(j, pfd[0])
-			if status < 0 {
-				cerrstr := C.strerror(C.int(-status))
-				errstr := C.GoString(cerrstr)
-				fmtstr := "error %q while attempting to follow journal for container %q"
-				logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
-				break
-			}
-
-			var done bool
-			cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
-
-			if status != 1 || done {
-				// We were notified to stop
-				break
-			}
-		}
-
-		// Clean up.
-		C.close(pfd[0])
-		s.mu.Lock()
-		delete(s.readers, logWatcher)
-		s.mu.Unlock()
-		close(logWatcher.Msg)
-		newCursor <- cursor
-	}()
-
-	// Wait until we're told to stop.
-	select {
-	case cursor = <-newCursor:
-	case <-logWatcher.WatchConsumerGone():
-		// Notify the other goroutine that its work is done.
-		C.close(pfd[1])
-		cursor = <-newCursor
-	}
-
+	waitTimeout := C.uint64_t(250000) // 0.25s
+
+	for {
+		status := C.sd_journal_wait(j, waitTimeout)
+		if status < 0 {
+			logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
+			goto cleanup
+		}
+		select {
+		case <-logWatcher.WatchConsumerGone():
+			goto cleanup // won't be able to write anything anymore
+		case <-logWatcher.WatchProducerGone():
+			// container is gone, drain journal
+		default:
+			// container is still alive
+			if status == C.SD_JOURNAL_NOP {
+				// no new data -- keep waiting
+				continue
+			}
+		}
+		newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
+		cursor = newCursor
+		if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
+			break
+		}
+	}
+
+cleanup:
+	s.mu.Lock()
+	delete(s.readers, logWatcher)
+	s.mu.Unlock()
+	close(logWatcher.Msg)
 	return cursor
 }
 
 func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
-	var j *C.sd_journal
-	var cmatch, cursor *C.char
-	var stamp C.uint64_t
-	var sinceUnixMicro uint64
-	var untilUnixMicro uint64
-	var pipes [2]C.int
+	var (
+		j              *C.sd_journal
+		cmatch, cursor *C.char
+		stamp          C.uint64_t
+		sinceUnixMicro uint64
+		untilUnixMicro uint64
+	)
 
 	// Get a handle to the journal.
-	rc := C.sd_journal_open(&j, C.int(0))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error opening journal")
+	if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 {
+		logWatcher.Err <- errors.New("error opening journal: " + CErr(rc))
 		close(logWatcher.Msg)
 		return
 	}
+
+	if config.Follow {
+		// Initialize library inotify watches early
+		if rc := C.sd_journal_get_fd(j); rc < 0 {
+			logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc))
+			close(logWatcher.Msg)
+			return
+		}
+	}
+
 	// If we end up following the log, we can set the journal context
 	// pointer and the channel pointer to nil so that we won't close them
 	// here, potentially while the goroutine that uses them is still
 	// running. Otherwise, close them when we return from this function.
 	following := false
-	defer func(pfollowing *bool) {
-		if !*pfollowing {
+	defer func() {
+		if !following {
 			close(logWatcher.Msg)
 		}
 		C.sd_journal_close(j)
-	}(&following)
+	}()
 
 	// Remove limits on the size of data items that we'll retrieve.
-	rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error setting journal data threshold")
+	if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 {
+		logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc))
 		return
 	}
+
 	// Add a match to have the library do the searching for us.
 	cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
 	defer C.free(unsafe.Pointer(cmatch))
-	rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error setting journal match")
+	if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 {
+		logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc))
 		return
 	}
+
 	// If we have a cutoff time, convert it to Unix time once.
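
Note: followJournal no longer needs the goroutine, the notification pipe, or the cancelable C poll helper removed above. It calls sd_journal_wait with a 0.25-second timeout and, between waits, selects over the watcher's lifecycle channels to decide whether to keep waiting, drain once more, or stop. A rough sketch of that control flow in plain Go; wait, drain, consumerGone, and producerGone are stand-ins for the cgo calls and the logger.LogWatcher channels, not the driver's actual API.

package main

import (
	"fmt"
	"time"
)

// Outcomes of a wait, mirroring sd_journal_wait: an error, "nothing
// happened" (SD_JOURNAL_NOP), or new/changed data.
const (
	waitErr = -1
	waitNop = 0
	waitNew = 1
)

// follow shows the shape of the patched loop: wait with a timeout, bail out
// if the consumer is gone, drain when the producer is gone or data arrived,
// and stop once a drain returns nothing while the journal reports no news.
func follow(wait func(time.Duration) int, drain func() (received int, done bool),
	consumerGone, producerGone <-chan struct{}) {

	const waitTimeout = 250 * time.Millisecond
	for {
		status := wait(waitTimeout)
		if status == waitErr {
			fmt.Println("error waiting for journal")
			return
		}
		select {
		case <-consumerGone:
			return // nobody is reading anymore
		case <-producerGone:
			// container is gone: drain whatever is left, then decide below
		default:
			if status == waitNop {
				continue // container alive, no new data: keep waiting
			}
		}
		received, done := drain()
		if done || (status == waitNop && received == 0) {
			return
		}
	}
}

func main() {
	producerGone := make(chan struct{})
	close(producerGone) // pretend the container already exited
	calls := 0
	follow(
		func(time.Duration) int { return waitNop },
		func() (int, bool) { calls++; return 0, false },
		make(chan struct{}), producerGone,
	)
	fmt.Println("drain calls:", calls) // 1: one final drain, then exit
}
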
@@ -360,77 +327,54 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
 		nano := config.Until.UnixNano()
 		untilUnixMicro = uint64(nano / 1000)
 	}
-	if config.Tail > 0 {
-		lines := config.Tail
+	if config.Tail >= 0 {
 		// If until time provided, start from there.
 		// Otherwise start at the end of the journal.
-		if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking provided until value")
-			return
-		} else if C.sd_journal_seek_tail(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
+		if untilUnixMicro != 0 {
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 {
+				logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc))
+				return
+			}
+		} else if rc := C.sd_journal_seek_tail(j); rc != 0 {
+			logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc))
 			return
 		}
-		if C.sd_journal_previous(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error backtracking to previous journal entry")
-			return
-		}
-		// Walk backward.
-		for lines > 0 {
-			// Stop if the entry time is before our cutoff.
-			// We'll need the entry time if it isn't, so go
-			// ahead and parse it now.
-			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
-				break
-			} else {
-				// Compare the timestamp on the entry to our threshold value.
-				if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
-					break
-				}
-			}
-			lines--
-			// If we're at the start of the journal, or
-			// don't need to back up past any more entries,
-			// stop.
-			if lines == 0 || C.sd_journal_previous(j) <= 0 {
-				break
+
+		// (Try to) skip backwards by the requested number of lines...
+		if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 {
+			// ...but not before "since"
+			if sinceUnixMicro != 0 &&
+				C.sd_journal_get_realtime_usec(j, &stamp) == 0 &&
+				uint64(stamp) < sinceUnixMicro {
+				C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro))
 			}
 		}
 	} else {
 		// Start at the beginning of the journal.
-		if C.sd_journal_seek_head(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to start of journal")
+		if rc := C.sd_journal_seek_head(j); rc != 0 {
+			logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc))
 			return
 		}
 		// If we have a cutoff date, fast-forward to it.
-		if sinceUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) != 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to start time in journal")
-			return
-		}
-		if C.sd_journal_next(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error skipping to next journal entry")
+		if sinceUnixMicro != 0 {
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 {
+				logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc))
+				return
+			}
+		}
+		if rc := C.sd_journal_next(j); rc < 0 {
+			logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc))
 			return
 		}
 	}
-	cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
+
+	if config.Tail != 0 { // special case for --tail 0
+		cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
+	}
 
 	if config.Follow {
-		// Allocate a descriptor for following the journal, if we'll
-		// need one. Do it here so that we can report if it fails.
-		if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
-			logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
-		} else {
-			// Create a pipe that we can poll at the same time as
-			// the journald descriptor.
-			if C.pipe(&pipes[0]) == C.int(-1) {
-				logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
-			} else {
-				cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
-				// Let followJournal handle freeing the journal context
-				// object and closing the channel.
-				following = true
-			}
-		}
+		cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
+		// Let followJournal handle freeing the journal context
+		// object and closing the channel.
+		following = true
 	}
 
 	C.free(unsafe.Pointer(cursor))
 	return
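
Note: the tail setup also changes here: a single sd_journal_previous_skip replaces the hand-rolled backward walk, a negative Tail means "all", and --tail 0 now skips the initial drain entirely while still allowing --follow. A toy sketch of those selection semantics over an in-memory slice; entry and tailEntries are inventions of this illustration, not code from the driver.

package main

import "fmt"

type entry struct {
	tsMicro uint64
	line    string
}

// tailEntries picks the entries an initial drain would show: tail < 0 means
// "everything", tail == 0 means "nothing" (the --tail 0 special case), and a
// positive tail takes at most that many of the newest entries, but never
// anything older than sinceMicro (0 means no lower bound).
func tailEntries(entries []entry, tail int, sinceMicro uint64) []entry {
	if tail == 0 {
		return nil
	}
	start := 0
	if tail > 0 && len(entries) > tail {
		start = len(entries) - tail
	}
	out := entries[start:]
	for len(out) > 0 && sinceMicro != 0 && out[0].tsMicro < sinceMicro {
		out = out[1:]
	}
	return out
}

func main() {
	entries := []entry{{10, "a"}, {20, "b"}, {30, "c"}, {40, "d"}}
	fmt.Println(tailEntries(entries, 2, 0))   // last two entries
	fmt.Println(tailEntries(entries, 0, 0))   // nothing: --tail 0
	fmt.Println(tailEntries(entries, -1, 25)) // everything newer than "since"
}
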