|
@@ -56,7 +56,7 @@ package journald
|
|
|
//}
|
|
|
//static int is_attribute_field(const char *msg, size_t length)
|
|
|
//{
|
|
|
-// const struct known_field {
|
|
|
+// static const struct known_field {
|
|
|
// const char *name;
|
|
|
// size_t length;
|
|
|
// } fields[] = {
|
|
@@ -101,21 +101,23 @@ package journald
|
|
|
// }
|
|
|
// return rc;
|
|
|
//}
|
|
|
-//static int wait_for_data_or_close(sd_journal *j, int pipefd)
|
|
|
+//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
|
|
|
//{
|
|
|
// struct pollfd fds[2];
|
|
|
// uint64_t when = 0;
|
|
|
// int timeout, jevents, i;
|
|
|
// struct timespec ts;
|
|
|
// uint64_t now;
|
|
|
+//
|
|
|
+// memset(&fds, 0, sizeof(fds));
|
|
|
+// fds[0].fd = pipefd;
|
|
|
+// fds[0].events = POLLHUP;
|
|
|
+// fds[1].fd = sd_journal_get_fd(j);
|
|
|
+// if (fds[1].fd < 0) {
|
|
|
+// return fds[1].fd;
|
|
|
+// }
|
|
|
+//
|
|
|
// do {
|
|
|
-// memset(&fds, 0, sizeof(fds));
|
|
|
-// fds[0].fd = pipefd;
|
|
|
-// fds[0].events = POLLHUP;
|
|
|
-// fds[1].fd = sd_journal_get_fd(j);
|
|
|
-// if (fds[1].fd < 0) {
|
|
|
-// return fds[1].fd;
|
|
|
-// }
|
|
|
// jevents = sd_journal_get_events(j);
|
|
|
// if (jevents < 0) {
|
|
|
// return jevents;
|
|
@@ -167,7 +169,7 @@ func (s *journald) Close() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor string) string {
|
|
|
+func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
|
|
|
var msg, data, cursor *C.char
|
|
|
var length C.size_t
|
|
|
var stamp C.uint64_t
|
|
@@ -177,10 +179,8 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
|
|
|
drain:
|
|
|
for {
|
|
|
// Try not to send a given entry twice.
|
|
|
- if oldCursor != "" {
|
|
|
- ccursor := C.CString(oldCursor)
|
|
|
- defer C.free(unsafe.Pointer(ccursor))
|
|
|
- for C.sd_journal_test_cursor(j, ccursor) > 0 {
|
|
|
+ if oldCursor != nil {
|
|
|
+ for C.sd_journal_test_cursor(j, oldCursor) > 0 {
|
|
|
if C.sd_journal_next(j) <= 0 {
|
|
|
break drain
|
|
|
}
|
|
@@ -234,25 +234,24 @@ drain:
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
- retCursor := ""
|
|
|
- if C.sd_journal_get_cursor(j, &cursor) == 0 {
|
|
|
- retCursor = C.GoString(cursor)
|
|
|
- C.free(unsafe.Pointer(cursor))
|
|
|
- }
|
|
|
- return retCursor
|
|
|
+
|
|
|
+ // free(NULL) is safe
|
|
|
+ C.free(unsafe.Pointer(oldCursor))
|
|
|
+ C.sd_journal_get_cursor(j, &cursor)
|
|
|
+ return cursor
|
|
|
}
|
|
|
|
|
|
-func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) {
|
|
|
+func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
|
|
|
s.readers.mu.Lock()
|
|
|
s.readers.readers[logWatcher] = logWatcher
|
|
|
s.readers.mu.Unlock()
|
|
|
go func() {
|
|
|
// Keep copying journal data out until we're notified to stop
|
|
|
// or we hit an error.
|
|
|
- status := C.wait_for_data_or_close(j, pfd[0])
|
|
|
+ status := C.wait_for_data_cancelable(j, pfd[0])
|
|
|
for status == 1 {
|
|
|
cursor = s.drainJournal(logWatcher, config, j, cursor)
|
|
|
- status = C.wait_for_data_or_close(j, pfd[0])
|
|
|
+ status = C.wait_for_data_cancelable(j, pfd[0])
|
|
|
}
|
|
|
if status < 0 {
|
|
|
cerrstr := C.strerror(C.int(-status))
|
|
@@ -274,15 +273,16 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
|
|
|
// Notify the other goroutine that its work is done.
|
|
|
C.close(pfd[1])
|
|
|
}
|
|
|
+
|
|
|
+ return cursor
|
|
|
}
|
|
|
|
|
|
func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
|
var j *C.sd_journal
|
|
|
- var cmatch *C.char
|
|
|
+ var cmatch, cursor *C.char
|
|
|
var stamp C.uint64_t
|
|
|
var sinceUnixMicro uint64
|
|
|
var pipes [2]C.int
|
|
|
- cursor := ""
|
|
|
|
|
|
// Get a handle to the journal.
|
|
|
rc := C.sd_journal_open(&j, C.int(0))
|
|
@@ -370,7 +370,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
- cursor = s.drainJournal(logWatcher, config, j, "")
|
|
|
+ cursor = s.drainJournal(logWatcher, config, j, nil)
|
|
|
if config.Follow {
|
|
|
// Allocate a descriptor for following the journal, if we'll
|
|
|
// need one. Do it here so that we can report if it fails.
|
|
@@ -382,13 +382,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|
|
if C.pipe(&pipes[0]) == C.int(-1) {
|
|
|
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
|
|
|
} else {
|
|
|
- s.followJournal(logWatcher, config, j, pipes, cursor)
|
|
|
+ cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
|
|
|
// Let followJournal handle freeing the journal context
|
|
|
// object and closing the channel.
|
|
|
following = true
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ C.free(unsafe.Pointer(cursor))
|
|
|
return
|
|
|
}
|
|
|
|