diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go
index 8750cae093..40bdd4228c 100644
--- a/daemon/logger/journald/journald.go
+++ b/daemon/logger/journald/journald.go
@@ -21,7 +21,6 @@ type journald struct {
 	mu      sync.Mutex
 	vars    map[string]string // additional variables and values to send to the journal along with the log message
 	readers map[*logger.LogWatcher]struct{}
-	closed  bool
 }
 
 func init() {
diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go
index 4bddfd5781..7b221983b6 100644
--- a/daemon/logger/journald/read.go
+++ b/daemon/logger/journald/read.go
@@ -101,56 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
 //	}
 //	return rc;
 //}
-//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
-//{
-//	struct pollfd fds[2];
-//	uint64_t when = 0;
-//	int timeout, jevents, i;
-//	struct timespec ts;
-//	uint64_t now;
-//
-//	memset(&fds, 0, sizeof(fds));
-//	fds[0].fd = pipefd;
-//	fds[0].events = POLLHUP;
-//	fds[1].fd = sd_journal_get_fd(j);
-//	if (fds[1].fd < 0) {
-//		return fds[1].fd;
-//	}
-//
-//	do {
-//		jevents = sd_journal_get_events(j);
-//		if (jevents < 0) {
-//			return jevents;
-//		}
-//		fds[1].events = jevents;
-//		sd_journal_get_timeout(j, &when);
-//		if (when == -1) {
-//			timeout = -1;
-//		} else {
-//			clock_gettime(CLOCK_MONOTONIC, &ts);
-//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
-//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
-//		}
-//		i = poll(fds, 2, timeout);
-//		if ((i == -1) && (errno != EINTR)) {
-//			/* An unexpected error. */
-//			return (errno != 0) ? -errno : -EINTR;
-//		}
-//		if (fds[0].revents & POLLHUP) {
-//			/* The close notification pipe was closed. */
-//			return 0;
-//		}
-//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
-//			/* Data, which we might care about, was appended. */
-//			return 1;
-//		}
-//	} while ((fds[0].revents & POLLHUP) == 0);
-//	return 0;
-//}
 import "C"
 
 import (
-	"fmt"
+	"errors"
 	"strings"
 	"time"
 	"unsafe"
@@ -163,22 +117,29 @@ import (
 
 func (s *journald) Close() error {
 	s.mu.Lock()
-	s.closed = true
 	for r := range s.readers {
 		r.ProducerGone()
 		delete(s.readers, r)
-
 	}
 	s.mu.Unlock()
 	return nil
 }
 
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
-	var msg, data, cursor *C.char
-	var length C.size_t
-	var stamp C.uint64_t
-	var priority, partial C.int
-	var done bool
+// convert error code returned from a sd_journal_* function
+// (which returns -errno) to a string
+func CErr(ret C.int) string {
+	return C.GoString(C.strerror(C.int(-ret)))
+}
+
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
+	var (
+		msg, data, cursor *C.char
+		length            C.size_t
+		stamp             C.uint64_t
+		priority, partial C.int
+		done              bool
+		shown             int
+	)
 
 	// Walk the journal from here forward until we run out of new entries
 	// or we reach the until value (if provided).
@@ -216,12 +177,12 @@ drain:
 			// the stream that we would have
 			// assigned that value.
 			source := ""
-			if C.get_priority(j, &priority) != 0 {
-				source = ""
-			} else if priority == C.int(journal.PriErr) {
-				source = "stderr"
-			} else if priority == C.int(journal.PriInfo) {
-				source = "stdout"
+			if C.get_priority(j, &priority) == 0 {
+				if priority == C.int(journal.PriErr) {
+					source = "stderr"
+				} else if priority == C.int(journal.PriInfo) {
+					source = "stdout"
+				}
 			}
 			// Retrieve the values of any variables we're adding to the journal.
 			var attrs []backend.LogAttr
@@ -230,12 +191,29 @@ drain:
 				kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
 				attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
 			}
-			// Send the log message.
-			logWatcher.Msg <- &logger.Message{
+
+			// Send the log message, unless the consumer is gone
+			select {
+			case <-logWatcher.WatchConsumerGone():
+				done = true // we won't be able to write anything anymore
+				break drain
+			case logWatcher.Msg <- &logger.Message{
 				Line:      line,
 				Source:    source,
 				Timestamp: timestamp.In(time.UTC),
 				Attrs:     attrs,
-			}
+			}:
+				shown++
+			}
+			// Call sd_journal_process() periodically during the processing loop
+			// to close any opened file descriptors for rotated (deleted) journal files.
+			if shown%1024 == 0 {
+				if ret := C.sd_journal_process(j); ret < 0 {
+					// log a warning but ignore it for now
+					logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]).
+						WithField("error", CErr(ret)).
+						Warn("journald: error processing journal")
+				}
+			}
 		}
 		// If we're at the end of the journal, we're done (for now).
@@ -250,104 +228,93 @@ drain:
 		// ensure that we won't be freeing an address that's invalid
 		cursor = nil
 	}
-	return cursor, done
+	return cursor, done, shown
 }
 
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
 	s.mu.Lock()
 	s.readers[logWatcher] = struct{}{}
-	if s.closed {
-		// the journald Logger is closed, presumably because the container has been
-		// reset. So we shouldn't follow, because we'll never be woken up. But we
-		// should make one more drainJournal call to be sure we've got all the logs.
-		// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
-		C.close(pfd[1])
-	}
 	s.mu.Unlock()
 
-	newCursor := make(chan *C.char)
+	waitTimeout := C.uint64_t(250000) // 0.25s
 
-	go func() {
-		for {
-			// Keep copying journal data out until we're notified to stop
-			// or we hit an error.
-			status := C.wait_for_data_cancelable(j, pfd[0])
-			if status < 0 {
-				cerrstr := C.strerror(C.int(-status))
-				errstr := C.GoString(cerrstr)
-				fmtstr := "error %q while attempting to follow journal for container %q"
-				logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
-				break
-			}
-
-			var done bool
-			cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
-
-			if status != 1 || done {
-				// We were notified to stop
-				break
+	for {
+		status := C.sd_journal_wait(j, waitTimeout)
+		if status < 0 {
+			logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
+			goto cleanup
+		}
+		select {
+		case <-logWatcher.WatchConsumerGone():
+			goto cleanup // won't be able to write anything anymore
+		case <-logWatcher.WatchProducerGone():
+			// container is gone, drain journal
+		default:
+			// container is still alive
+			if status == C.SD_JOURNAL_NOP {
+				// no new data -- keep waiting
+				continue
 			}
 		}
-
-		// Clean up.
-		C.close(pfd[0])
-		s.mu.Lock()
-		delete(s.readers, logWatcher)
-		s.mu.Unlock()
-		close(logWatcher.Msg)
-		newCursor <- cursor
-	}()
-
-	// Wait until we're told to stop.
-	select {
-	case cursor = <-newCursor:
-	case <-logWatcher.WatchConsumerGone():
-		// Notify the other goroutine that its work is done.
-		C.close(pfd[1])
-		cursor = <-newCursor
+		newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
+		cursor = newCursor
+		if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
+			break
+		}
 	}
+cleanup:
+	s.mu.Lock()
+	delete(s.readers, logWatcher)
+	s.mu.Unlock()
+	close(logWatcher.Msg)
 	return cursor
 }
 
 func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
-	var j *C.sd_journal
-	var cmatch, cursor *C.char
-	var stamp C.uint64_t
-	var sinceUnixMicro uint64
-	var untilUnixMicro uint64
-	var pipes [2]C.int
+	var (
+		j              *C.sd_journal
+		cmatch, cursor *C.char
+		stamp          C.uint64_t
+		sinceUnixMicro uint64
+		untilUnixMicro uint64
+	)
 
 	// Get a handle to the journal.
-	rc := C.sd_journal_open(&j, C.int(0))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error opening journal")
+	if rc := C.sd_journal_open(&j, C.int(0)); rc != 0 {
+		logWatcher.Err <- errors.New("error opening journal: " + CErr(rc))
 		close(logWatcher.Msg)
 		return
 	}
+	if config.Follow {
+		// Initialize library inotify watches early
+		if rc := C.sd_journal_get_fd(j); rc < 0 {
+			logWatcher.Err <- errors.New("error getting journald fd: " + CErr(rc))
+			close(logWatcher.Msg)
+			return
+		}
+	}
 	// If we end up following the log, we can set the journal context
 	// pointer and the channel pointer to nil so that we won't close them
 	// here, potentially while the goroutine that uses them is still
 	// running. Otherwise, close them when we return from this function.
 	following := false
-	defer func(pfollowing *bool) {
-		if !*pfollowing {
+	defer func() {
+		if !following {
 			close(logWatcher.Msg)
 		}
 		C.sd_journal_close(j)
-	}(&following)
+	}()
 
 	// Remove limits on the size of data items that we'll retrieve.
-	rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error setting journal data threshold")
+	if rc := C.sd_journal_set_data_threshold(j, C.size_t(0)); rc != 0 {
+		logWatcher.Err <- errors.New("error setting journal data threshold: " + CErr(rc))
 		return
 	}
 	// Add a match to have the library do the searching for us.
 	cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
 	defer C.free(unsafe.Pointer(cmatch))
-	rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch))
-	if rc != 0 {
-		logWatcher.Err <- fmt.Errorf("error setting journal match")
+	if rc := C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)); rc != 0 {
+		logWatcher.Err <- errors.New("error setting journal match: " + CErr(rc))
 		return
 	}
 	// If we have a cutoff time, convert it to Unix time once.
@@ -360,76 +327,53 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
 		nano := config.Until.UnixNano()
 		untilUnixMicro = uint64(nano / 1000)
 	}
-	if config.Tail > 0 {
-		lines := config.Tail
+	if config.Tail >= 0 {
 		// If until time provided, start from there.
 		// Otherwise start at the end of the journal.
-		if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking provided until value")
-			return
-		} else if C.sd_journal_seek_tail(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
-			return
-		}
-		if C.sd_journal_previous(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error backtracking to previous journal entry")
-			return
-		}
-		// Walk backward.
-		for lines > 0 {
-			// Stop if the entry time is before our cutoff.
-			// We'll need the entry time if it isn't, so go
-			// ahead and parse it now.
-			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
-				break
-			} else {
-				// Compare the timestamp on the entry to our threshold value.
-				if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
-					break
-				}
+		if untilUnixMicro != 0 {
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)); rc != 0 {
+				logWatcher.Err <- errors.New("error seeking provided until value: " + CErr(rc))
+				return
 			}
-			lines--
-			// If we're at the start of the journal, or
-			// don't need to back up past any more entries,
-			// stop.
-			if lines == 0 || C.sd_journal_previous(j) <= 0 {
-				break
+		} else if rc := C.sd_journal_seek_tail(j); rc != 0 {
+			logWatcher.Err <- errors.New("error seeking to end of journal: " + CErr(rc))
+			return
+		}
+		// (Try to) skip backwards by the requested number of lines...
+		if C.sd_journal_previous_skip(j, C.uint64_t(config.Tail)) >= 0 {
+			// ...but not before "since"
+			if sinceUnixMicro != 0 &&
+				C.sd_journal_get_realtime_usec(j, &stamp) == 0 &&
+				uint64(stamp) < sinceUnixMicro {
+				C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro))
 			}
 		}
 	} else {
 		// Start at the beginning of the journal.
-		if C.sd_journal_seek_head(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to start of journal")
+		if rc := C.sd_journal_seek_head(j); rc != 0 {
+			logWatcher.Err <- errors.New("error seeking to start of journal: " + CErr(rc))
 			return
 		}
 		// If we have a cutoff date, fast-forward to it.
-		if sinceUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) != 0 {
-			logWatcher.Err <- fmt.Errorf("error seeking to start time in journal")
-			return
+		if sinceUnixMicro != 0 {
+			if rc := C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)); rc != 0 {
+				logWatcher.Err <- errors.New("error seeking to start time in journal: " + CErr(rc))
+				return
+			}
 		}
-		if C.sd_journal_next(j) < 0 {
-			logWatcher.Err <- fmt.Errorf("error skipping to next journal entry")
+		if rc := C.sd_journal_next(j); rc < 0 {
+			logWatcher.Err <- errors.New("error skipping to next journal entry: " + CErr(rc))
 			return
 		}
 	}
-	cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
+	if config.Tail != 0 { // special case for --tail 0
+		cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
+	}
 	if config.Follow {
-		// Allocate a descriptor for following the journal, if we'll
-		// need one. Do it here so that we can report if it fails.
-		if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
-			logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
-		} else {
-			// Create a pipe that we can poll at the same time as
-			// the journald descriptor.
-			if C.pipe(&pipes[0]) == C.int(-1) {
-				logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
-			} else {
-				cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
-				// Let followJournal handle freeing the journal context
-				// object and closing the channel.
-				following = true
-			}
-		}
+		cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
+		// Let followJournal handle freeing the journal context
+		// object and closing the channel.
+		following = true
 	}
 	C.free(unsafe.Pointer(cursor))
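
Not part of the patch: the following is an illustrative, cgo-free sketch of the control flow the reworked followJournal loop above relies on — wait with a fixed timeout, stop as soon as the consumer is gone, keep draining once the producer is gone, and exit when the producer is gone and a drain pass delivers nothing new. The journalStub type, the channel names, and the timings are hypothetical stand-ins for sd_journal, the LogWatcher channels, and the 0.25s sd_journal_wait timeout; they are not taken from the Docker codebase.

// follow_sketch.go -- hypothetical example, not moby code.
package main

import (
	"fmt"
	"sync"
	"time"
)

// journalStub stands in for sd_journal: a mutex-protected queue of lines.
type journalStub struct {
	mu    sync.Mutex
	lines []string
}

func (j *journalStub) append(line string) {
	j.mu.Lock()
	j.lines = append(j.lines, line)
	j.mu.Unlock()
}

// drain empties the queue and returns what was pending, like one drainJournal pass.
func (j *journalStub) drain() []string {
	j.mu.Lock()
	defer j.mu.Unlock()
	pending := j.lines
	j.lines = nil
	return pending
}

func main() {
	var (
		journal      journalStub
		msgs         = make(chan string)
		consumerGone = make(chan struct{}) // closed when the log reader goes away
		producerGone = make(chan struct{}) // closed when the "container" exits
	)

	// Producer: writes a few lines, then signals that it is gone.
	go func() {
		for i := 0; i < 5; i++ {
			journal.append(fmt.Sprintf("line %d", i))
			time.Sleep(50 * time.Millisecond)
		}
		close(producerGone)
	}()

	// Follower: same shape as the patched followJournal loop.
	go func() {
		defer close(msgs) // the follower owns closing the message channel
		for {
			time.Sleep(100 * time.Millisecond) // stands in for sd_journal_wait with a timeout
			select {
			case <-consumerGone:
				return // nobody is reading anymore
			case <-producerGone:
				// producer is gone: drain whatever is left before exiting
			default:
				// producer still alive: fall through and drain
			}
			pending := journal.drain()
			for _, line := range pending {
				select {
				case <-consumerGone:
					return
				case msgs <- line:
				}
			}
			select {
			case <-producerGone:
				if len(pending) == 0 {
					return // producer gone and nothing new: we are done
				}
			default:
			}
		}
	}()

	// Consumer: reads until the follower closes the channel.
	for line := range msgs {
		fmt.Println("got:", line)
	}
}

Run with go run, the sketch prints the five produced lines and then terminates on its own, mirroring how the patched follower exits once WatchProducerGone fires and a drain pass reports zero received entries.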