Browse Source

daemon/logger: drain readers when logger is closed

The LogFile follower would stop immediately upon the producer closing.
The close signal would race the file watcher; if a message were to be
logged and the logger immediately closed, the follower could miss that
last message if the close signal (formerly ProducerGone) was to win the
race. Add logic to perform one more round of reading when the producer
is closed to catch up on any final logs.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider 3 years ago
parent
commit
3844d1a3d1

+ 9 - 1
daemon/logger/loggerutils/follow.go

@@ -21,6 +21,7 @@ type follow struct {
 	fileWatcher               filenotify.FileWatcher
 	fileWatcher               filenotify.FileWatcher
 	logWatcher                *logger.LogWatcher
 	logWatcher                *logger.LogWatcher
 	producerGone              <-chan struct{}
 	producerGone              <-chan struct{}
+	draining                  bool
 	notifyRotate, notifyEvict chan interface{}
 	notifyRotate, notifyEvict chan interface{}
 	oldSize                   int64
 	oldSize                   int64
 	retries                   int
 	retries                   int
@@ -99,7 +100,14 @@ func (fl *follow) waitRead() error {
 		}
 		}
 		return err
 		return err
 	case <-fl.producerGone:
 	case <-fl.producerGone:
-		return errDone
+		// There may be messages written out which the fileWatcher has
+		// not yet notified us about.
+		if fl.draining {
+			return errDone
+		}
+		fl.draining = true
+		fl.dec.Reset(fl.file)
+		return nil
 	case <-fl.logWatcher.WatchConsumerGone():
 	case <-fl.logWatcher.WatchConsumerGone():
 		return errDone
 		return errDone
 	}
 	}

+ 2 - 3
daemon/logger/loggerutils/logfile.go

@@ -440,11 +440,10 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa
 		w.mu.RLock()
 		w.mu.RLock()
 	}
 	}
 
 
-	if !config.Follow || w.closed {
-		w.mu.RUnlock()
+	w.mu.RUnlock()
+	if !config.Follow {
 		return
 		return
 	}
 	}
-	w.mu.RUnlock()
 
 
 	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
 	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
 		_, ok := i.(struct{})
 		_, ok := i.(struct{})

+ 0 - 1
daemon/logger/loggerutils/logfile_test.go

@@ -179,7 +179,6 @@ func TestFollowLogsProducerGone(t *testing.T) {
 			t.Logf("logDecode() closed after sending %d messages\n", sent)
 			t.Logf("logDecode() closed after sending %d messages\n", sent)
 			return io.EOF
 			return io.EOF
 		default:
 		default:
-			t.Fatal("logDecode() called after closing!")
 			return io.EOF
 			return io.EOF
 		}
 		}
 	}}
 	}}