moby/daemon/logger/loggerutils/follow.go
Cory Snider 01915a725e daemon/logger: follow LogFile without file watches
File watches have been a source of complexity and unreliability in the
LogFile follow implementation, especially when combined with file
rotation. File change events can be unreliably delivered, especially on
Windows, and the polling fallback adds latency. Following across
rotations has never worked reliably on Windows. Without synchronization
between the log writer and readers, race conditions abound: readers can
read from the file while a log entry is only partially written, leading
to decode errors and necessitating retries.

In addition to the complexities stemming from file watches, the LogFile
follow implementation had complexity from needing to handle file
truncations, and (due to a now-fixed bug in the polling file watcher
implementation) evictions to unlock the log file so it could be rotated.
Log files are now always rotated, never truncated, so these situations
no longer need to be handled by the follow code.

Rewrite the LogFile follow implementation in terms of waiting until
LogFile notifies it that a new message has been written to the log file.
The LogFile informs the follower of the file offset of the last complete
write so that the follower knows not to read past that, preventing it
from attempting to decode partial messages and making retries
unnecessary. Synchronization between LogFile and its followers is used
at critical points to prevent missed notifications of writes and races
between file rotations and the follower opening files for read.

Signed-off-by: Cory Snider <csnider@mirantis.com>
2022-05-19 15:22:22 -04:00

165 lines
4.2 KiB
Go

package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"fmt"
"io"
"os"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type follow struct {
LogFile *LogFile
Watcher *logger.LogWatcher
Decoder Decoder
Since, Until time.Time
log *logrus.Entry
c chan logPos
}
// Do follows the log file as it is written, starting from f at read.
func (fl *follow) Do(f *os.File, read logPos) {
fl.log = logrus.WithFields(logrus.Fields{
"module": "logger",
"file": f.Name(),
})
// Optimization: allocate the write-notifications channel only once and
// reuse it for multiple invocations of nextPos().
fl.c = make(chan logPos, 1)
defer func() {
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
fl.log.WithError(err).Warn("error closing current log file")
}
}()
for {
wrote, ok := fl.nextPos(read)
if !ok {
return
}
if wrote.rotation != read.rotation {
// Flush the current file before moving on to the next.
if _, err := f.Seek(read.size, io.SeekStart); err != nil {
fl.Watcher.Err <- err
return
}
if fl.decode(f) {
return
}
// Open the new file, which has the same name as the old
// file thanks to file rotation. Make no mistake: they
// are different files, with distinct identities.
// Atomically capture the wrote position to make
// absolutely sure that the position corresponds to the
// file we have opened; more rotations could have
// occurred since we previously received it.
if err := f.Close(); err != nil {
fl.log.WithError(err).Warn("error closing rotated log file")
}
var err error
func() {
fl.LogFile.fsopMu.RLock()
st := <-fl.LogFile.read
defer func() {
fl.LogFile.read <- st
fl.LogFile.fsopMu.RUnlock()
}()
f, err = open(f.Name())
wrote = st.pos
}()
// We tried to open the file inside a critical section
// so we shouldn't have been racing the rotation of the
// file. Any error, even fs.ErrNotFound, is exceptional.
if err != nil {
fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err)
return
}
if nrot := wrote.rotation - read.rotation; nrot > 1 {
fl.log.WithField("missed-rotations", nrot).
Warn("file rotations were missed while following logs; some log messages have been skipped over")
}
// Set up our read position to start from the top of the file.
read.size = 0
}
if fl.decode(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
return
}
read = wrote
}
}
// nextPos waits until the write position of the LogFile being followed has
// advanced from current and returns the new position.
func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
var st logReadState
select {
case <-fl.Watcher.WatchConsumerGone():
return current, false
case st = <-fl.LogFile.read:
}
// Have any any logs been written since we last checked?
if st.pos == current { // Nope.
// Add ourself to the notify list.
st.wait = append(st.wait, fl.c)
} else { // Yes.
// "Notify" ourself immediately.
fl.c <- st.pos
}
fl.LogFile.read <- st
select {
case <-fl.LogFile.closed: // No more logs will be written.
select { // Have we followed to the end?
case next = <-fl.c: // No: received a new position.
default: // Yes.
return current, false
}
case <-fl.Watcher.WatchConsumerGone():
return current, false
case next = <-fl.c:
}
return next, true
}
// decode decodes log messages from r and sends messages with timestamps between
// Since and Until to the log watcher.
//
// The return value, done, signals whether following should end due to a
// condition encountered during decode.
func (fl *follow) decode(r io.Reader) (done bool) {
fl.Decoder.Reset(r)
for {
msg, err := fl.Decoder.Decode()
if err != nil {
if errors.Is(err, io.EOF) {
return false
}
fl.Watcher.Err <- err
return true
}
if !fl.Since.IsZero() && msg.Timestamp.Before(fl.Since) {
continue
}
if !fl.Until.IsZero() && msg.Timestamp.After(fl.Until) {
return true
}
// send the message, unless the consumer is gone
select {
case fl.Watcher.Msg <- msg:
case <-fl.Watcher.WatchConsumerGone():
return true
}
}
}