|
@@ -245,20 +245,28 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
|
|
|
s.readers.mu.Lock()
|
|
|
s.readers.readers[logWatcher] = logWatcher
|
|
|
s.readers.mu.Unlock()
|
|
|
+
|
|
|
go func() {
|
|
|
- // Keep copying journal data out until we're notified to stop
|
|
|
- // or we hit an error.
|
|
|
- status := C.wait_for_data_cancelable(j, pfd[0])
|
|
|
- for status == 1 {
|
|
|
+ for {
|
|
|
+ // Keep copying journal data out until we're notified to stop
|
|
|
+ // or we hit an error.
|
|
|
+ status := C.wait_for_data_cancelable(j, pfd[0])
|
|
|
+ if status < 0 {
|
|
|
+ cerrstr := C.strerror(C.int(-status))
|
|
|
+ errstr := C.GoString(cerrstr)
|
|
|
+ fmtstr := "error %q while attempting to follow journal for container %q"
|
|
|
+ logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
cursor = s.drainJournal(logWatcher, config, j, cursor)
|
|
|
- status = C.wait_for_data_cancelable(j, pfd[0])
|
|
|
- }
|
|
|
- if status < 0 {
|
|
|
- cerrstr := C.strerror(C.int(-status))
|
|
|
- errstr := C.GoString(cerrstr)
|
|
|
- fmtstr := "error %q while attempting to follow journal for container %q"
|
|
|
- logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
|
|
|
+
|
|
|
+ if status != 1 {
|
|
|
+ // We were notified to stop
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
// Clean up.
|
|
|
C.close(pfd[0])
|
|
|
s.readers.mu.Lock()
|
|
@@ -267,6 +275,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
|
|
|
C.sd_journal_close(j)
|
|
|
close(logWatcher.Msg)
|
|
|
}()
|
|
|
+
|
|
|
// Wait until we're told to stop.
|
|
|
select {
|
|
|
case <-logWatcher.WatchClose():
|