123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- // +build linux,cgo,!static_build,journald
- package journald
- // #include <sys/types.h>
- // #include <sys/poll.h>
- // #include <systemd/sd-journal.h>
- // #include <errno.h>
- // #include <stdio.h>
- // #include <stdlib.h>
- // #include <string.h>
- // #include <time.h>
- // #include <unistd.h>
- //
///*
// * get_message reads the MESSAGE field of the journal's current entry.
// *
// * *partial is set to nonzero only if the entry also carries the field
// * CONTAINER_PARTIAL_MESSAGE with exactly the value "true".
// *
// * Returns 0 on success, with *msg/*length describing the message text that
// * follows the "MESSAGE=" prefix. Returns -ENOENT, with *msg/*length reset
// * to NULL/0, when the message text is empty. Otherwise returns the error
// * from sd_journal_get_data for MESSAGE. NOTE(review): in that last case
// * *msg/*length are not reset here and may still describe the
// * CONTAINER_PARTIAL_MESSAGE field read first; check the return value
// * before using them.
// */
//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
//{
//	int rc;
//	size_t plength;
//	*msg = NULL;
//	*length = 0;
//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
//	/* Field data comes back as "NAME=value", so compare the whole field. */
//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
//	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
//	if (rc == 0) {
//		/* Strip the 8-byte "MESSAGE=" prefix; an empty message counts as missing. */
//		if (*length > 8) {
//			(*msg) += 8;
//			*length -= 8;
//		} else {
//			*msg = NULL;
//			*length = 0;
//			rc = -ENOENT;
//		}
//	}
//	return rc;
//}
///*
// * get_priority parses the current entry's PRIORITY field as a decimal
// * number into *priority.
// *
// * *priority stays -1 if the field is missing, does not start with
// * "PRIORITY=", or has nothing after the '='. Returns 0 whenever the field
// * could be read (even if *priority is still -1). Otherwise returns the
// * error from sd_journal_get_data.
// * NOTE(review): the characters after "PRIORITY=" are not checked to be
// * digits, so a malformed value yields an arbitrary number.
// */
//static int get_priority(sd_journal *j, int *priority)
//{
//	const void *data;
//	size_t i, length;
//	int rc;
//	*priority = -1;
//	rc = sd_journal_get_data(j, "PRIORITY", &data, &length);
//	if (rc == 0) {
//		if ((length > 9) && (strncmp(data, "PRIORITY=", 9) == 0)) {
//			*priority = 0;
//			for (i = 9; i < length; i++) {
//				*priority = *priority * 10 + ((const char *)data)[i] - '0';
//			}
//			/* Redundant: rc is already 0 on this path. */
//			if (length > 9) {
//				rc = 0;
//			}
//		}
//	}
//	return rc;
//}
///*
// * is_attribute_field decides whether a journal field, passed as "NAME=value"
// * data of the given length, should be reported as a log attribute.
// *
// * Returns -1 (not an attribute) for:
// * - empty data;
// * - names starting with '_';
// * - data without '=';
// * - any name listed in fields[] below.
// * Returns 0 for every other field. Names are matched exactly, length
// * included.
// */
//static int is_attribute_field(const char *msg, size_t length)
//{
//	static const struct known_field {
//		const char *name;
//		size_t length;
//	} fields[] = {
//		{"MESSAGE", sizeof("MESSAGE") - 1},
//		{"MESSAGE_ID", sizeof("MESSAGE_ID") - 1},
//		{"PRIORITY", sizeof("PRIORITY") - 1},
//		{"CODE_FILE", sizeof("CODE_FILE") - 1},
//		{"CODE_LINE", sizeof("CODE_LINE") - 1},
//		{"CODE_FUNC", sizeof("CODE_FUNC") - 1},
//		{"ERRNO", sizeof("ERRNO") - 1},
//		{"SYSLOG_FACILITY", sizeof("SYSLOG_FACILITY") - 1},
//		{"SYSLOG_IDENTIFIER", sizeof("SYSLOG_IDENTIFIER") - 1},
//		{"SYSLOG_PID", sizeof("SYSLOG_PID") - 1},
//		{"CONTAINER_NAME", sizeof("CONTAINER_NAME") - 1},
//		{"CONTAINER_ID", sizeof("CONTAINER_ID") - 1},
//		{"CONTAINER_ID_FULL", sizeof("CONTAINER_ID_FULL") - 1},
//		{"CONTAINER_TAG", sizeof("CONTAINER_TAG") - 1},
//	};
//	unsigned int i;
//	void *p;
//	if ((length < 1) || (msg[0] == '_') || ((p = memchr(msg, '=', length)) == NULL)) {
//		return -1;
//	}
//	/* From here on, length is the length of NAME only (up to the '='). */
//	length = ((const char *) p) - msg;
//	for (i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
//		if ((fields[i].length == length) && (memcmp(fields[i].name, msg, length) == 0)) {
//			return -1;
//		}
//	}
//	return 0;
//}
///*
// * get_attribute_field advances the current entry's field enumeration
// * (sd_journal_enumerate_data) to the next field accepted by
// * is_attribute_field.
// *
// * Returns a positive value, with *msg/*length set to the whole "NAME=value"
// * data, when such a field is found. Otherwise returns the last, non-positive
// * result of sd_journal_enumerate_data: the enumeration ran out of fields or
// * failed.
// */
//static int get_attribute_field(sd_journal *j, const char **msg, size_t *length)
//{
//	int rc;
//	*msg = NULL;
//	*length = 0;
//	while ((rc = sd_journal_enumerate_data(j, (const void **) msg, length)) > 0) {
//		if (is_attribute_field(*msg, *length) == 0) {
//			break;
//		}
//		/* Dead store: the loop condition reassigns rc before it is read. */
//		rc = -ENOENT;
//	}
//	return rc;
//}
///*
// * wait_for_data_cancelable blocks until new journal data is appended or
// * pipefd reports POLLHUP. The caller cancels the wait by closing the other
// * end of its notification pipe.
// *
// * Returns:
// * - 1 when sd_journal_process reports SD_JOURNAL_APPEND;
// * - 0 when POLLHUP is seen on pipefd;
// * - a negative errno if sd_journal_get_fd, sd_journal_get_events, or poll
// *   fails.
// * EINTR from poll is not treated as an error. sd_journal_process is called
// * after every wakeup, including timeouts.
// */
//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
//{
//	struct pollfd fds[2];
//	uint64_t when = 0;
//	int timeout, jevents, i;
//	struct timespec ts;
//	uint64_t now;
//
//	memset(&fds, 0, sizeof(fds));
//	fds[0].fd = pipefd;
//	fds[0].events = POLLHUP;
//	fds[1].fd = sd_journal_get_fd(j);
//	if (fds[1].fd < 0) {
//		return fds[1].fd;
//	}
//
//	do {
//		jevents = sd_journal_get_events(j);
//		if (jevents < 0) {
//			return jevents;
//		}
//		fds[1].events = jevents;
//		/* when is compared to CLOCK_MONOTONIC in microseconds below;
//		 * when == (uint64_t) -1 makes poll wait without a timeout. */
//		sd_journal_get_timeout(j, &when);
//		if (when == -1) {
//			timeout = -1;
//		} else {
//			clock_gettime(CLOCK_MONOTONIC, &ts);
//			now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
//			/* Convert the remaining time to milliseconds, rounding up. */
//			timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
//		}
//		i = poll(fds, 2, timeout);
//		if ((i == -1) && (errno != EINTR)) {
//			/* An unexpected error. */
//			return (errno != 0) ? -errno : -EINTR;
//		}
//		if (fds[0].revents & POLLHUP) {
//			/* The close notification pipe was closed. */
//			return 0;
//		}
//		if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
//			/* Data, which we might care about, was appended. */
//			return 1;
//		}
//	} while ((fds[0].revents & POLLHUP) == 0);
//	return 0;
//}
- import "C"
- import (
- "fmt"
- "strings"
- "time"
- "unsafe"
- "github.com/Sirupsen/logrus"
- "github.com/coreos/go-systemd/journal"
- "github.com/docker/docker/daemon/logger"
- )
- func (s *journald) Close() error {
- s.mu.Lock()
- s.closed = true
- for reader := range s.readers.readers {
- reader.Close()
- }
- s.mu.Unlock()
- return nil
- }
- func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
- var msg, data, cursor *C.char
- var length C.size_t
- var stamp C.uint64_t
- var priority, partial C.int
- // Walk the journal from here forward until we run out of new entries.
- drain:
- for {
- // Try not to send a given entry twice.
- if oldCursor != nil {
- for C.sd_journal_test_cursor(j, oldCursor) > 0 {
- if C.sd_journal_next(j) <= 0 {
- break drain
- }
- }
- }
- // Read and send the logged message, if there is one to read.
- i := C.get_message(j, &msg, &length, &partial)
- if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
- // Read the entry's timestamp.
- if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
- break
- }
- // Set up the time and text of the entry.
- timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
- line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
- if partial == 0 {
- line = append(line, "\n"...)
- }
- // Recover the stream name by mapping
- // from the journal priority back to
- // the stream that we would have
- // assigned that value.
- source := ""
- if C.get_priority(j, &priority) != 0 {
- source = ""
- } else if priority == C.int(journal.PriErr) {
- source = "stderr"
- } else if priority == C.int(journal.PriInfo) {
- source = "stdout"
- }
- // Retrieve the values of any variables we're adding to the journal.
- attrs := make(map[string]string)
- C.sd_journal_restart_data(j)
- for C.get_attribute_field(j, &data, &length) > C.int(0) {
- kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
- attrs[kv[0]] = kv[1]
- }
- if len(attrs) == 0 {
- attrs = nil
- }
- // Send the log message.
- logWatcher.Msg <- &logger.Message{
- Line: line,
- Source: source,
- Timestamp: timestamp.In(time.UTC),
- Attrs: attrs,
- }
- }
- // If we're at the end of the journal, we're done (for now).
- if C.sd_journal_next(j) <= 0 {
- break
- }
- }
- // free(NULL) is safe
- C.free(unsafe.Pointer(oldCursor))
- if C.sd_journal_get_cursor(j, &cursor) != 0 {
- // ensure that we won't be freeing an address that's invalid
- cursor = nil
- }
- return cursor
- }
- func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
- s.mu.Lock()
- s.readers.readers[logWatcher] = logWatcher
- if s.closed {
- // the journald Logger is closed, presumably because the container has been
- // reset. So we shouldn't follow, because we'll never be woken up. But we
- // should make one more drainJournal call to be sure we've got all the logs.
- // Close pfd[1] so that one drainJournal happens, then cleanup, then return.
- C.close(pfd[1])
- }
- s.mu.Unlock()
- newCursor := make(chan *C.char)
- go func() {
- for {
- // Keep copying journal data out until we're notified to stop
- // or we hit an error.
- status := C.wait_for_data_cancelable(j, pfd[0])
- if status < 0 {
- cerrstr := C.strerror(C.int(-status))
- errstr := C.GoString(cerrstr)
- fmtstr := "error %q while attempting to follow journal for container %q"
- logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
- break
- }
- cursor = s.drainJournal(logWatcher, config, j, cursor)
- if status != 1 {
- // We were notified to stop
- break
- }
- }
- // Clean up.
- C.close(pfd[0])
- s.mu.Lock()
- delete(s.readers.readers, logWatcher)
- s.mu.Unlock()
- close(logWatcher.Msg)
- newCursor <- cursor
- }()
- // Wait until we're told to stop.
- select {
- case cursor = <-newCursor:
- case <-logWatcher.WatchClose():
- // Notify the other goroutine that its work is done.
- C.close(pfd[1])
- cursor = <-newCursor
- }
- return cursor
- }
- func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
- var j *C.sd_journal
- var cmatch, cursor *C.char
- var stamp C.uint64_t
- var sinceUnixMicro uint64
- var pipes [2]C.int
- // Get a handle to the journal.
- rc := C.sd_journal_open(&j, C.int(0))
- if rc != 0 {
- logWatcher.Err <- fmt.Errorf("error opening journal")
- close(logWatcher.Msg)
- return
- }
- // If we end up following the log, we can set the journal context
- // pointer and the channel pointer to nil so that we won't close them
- // here, potentially while the goroutine that uses them is still
- // running. Otherwise, close them when we return from this function.
- following := false
- defer func(pfollowing *bool) {
- if !*pfollowing {
- close(logWatcher.Msg)
- }
- C.sd_journal_close(j)
- }(&following)
- // Remove limits on the size of data items that we'll retrieve.
- rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
- if rc != 0 {
- logWatcher.Err <- fmt.Errorf("error setting journal data threshold")
- return
- }
- // Add a match to have the library do the searching for us.
- cmatch = C.CString("CONTAINER_ID_FULL=" + s.vars["CONTAINER_ID_FULL"])
- defer C.free(unsafe.Pointer(cmatch))
- rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch))
- if rc != 0 {
- logWatcher.Err <- fmt.Errorf("error setting journal match")
- return
- }
- // If we have a cutoff time, convert it to Unix time once.
- if !config.Since.IsZero() {
- nano := config.Since.UnixNano()
- sinceUnixMicro = uint64(nano / 1000)
- }
- if config.Tail > 0 {
- lines := config.Tail
- // Start at the end of the journal.
- if C.sd_journal_seek_tail(j) < 0 {
- logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
- return
- }
- if C.sd_journal_previous(j) < 0 {
- logWatcher.Err <- fmt.Errorf("error backtracking to previous journal entry")
- return
- }
- // Walk backward.
- for lines > 0 {
- // Stop if the entry time is before our cutoff.
- // We'll need the entry time if it isn't, so go
- // ahead and parse it now.
- if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
- break
- } else {
- // Compare the timestamp on the entry
- // to our threshold value.
- if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
- break
- }
- }
- lines--
- // If we're at the start of the journal, or
- // don't need to back up past any more entries,
- // stop.
- if lines == 0 || C.sd_journal_previous(j) <= 0 {
- break
- }
- }
- } else {
- // Start at the beginning of the journal.
- if C.sd_journal_seek_head(j) < 0 {
- logWatcher.Err <- fmt.Errorf("error seeking to start of journal")
- return
- }
- // If we have a cutoff date, fast-forward to it.
- if sinceUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(sinceUnixMicro)) != 0 {
- logWatcher.Err <- fmt.Errorf("error seeking to start time in journal")
- return
- }
- if C.sd_journal_next(j) < 0 {
- logWatcher.Err <- fmt.Errorf("error skipping to next journal entry")
- return
- }
- }
- cursor = s.drainJournal(logWatcher, config, j, nil)
- if config.Follow {
- // Allocate a descriptor for following the journal, if we'll
- // need one. Do it here so that we can report if it fails.
- if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
- logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
- } else {
- // Create a pipe that we can poll at the same time as
- // the journald descriptor.
- if C.pipe(&pipes[0]) == C.int(-1) {
- logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
- } else {
- cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
- // Let followJournal handle freeing the journal context
- // object and closing the channel.
- following = true
- }
- }
- }
- C.free(unsafe.Pointer(cursor))
- return
- }
- func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- logWatcher := logger.NewLogWatcher()
- go s.readLogs(logWatcher, config)
- return logWatcher
- }
|