Merge pull request #43105 from kzys/follow-struct

daemon/logger: refactor followLogs and replace flaky TestFollowLogsHandleDecodeErr
This commit is contained in:
Brian Goff 2022-01-11 16:57:02 -08:00 committed by GitHub
commit f045d0de94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 250 deletions

View file

@ -0,0 +1,211 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"io"
"os"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Sentinel errors used for control flow between waitRead, handleDecodeErr
// and mainLoop.
var (
	// errRetry tells the caller of waitRead to wait again.
	errRetry = errors.New("retry")
	// errDone tells mainLoop to stop following without reporting an error.
	errDone = errors.New("done")
)
// follow carries the state shared by the steps of following a single log
// file: reading, waiting for changes, and reopening after rotation.
type follow struct {
	// file is the log file currently being read; handleRotate replaces it.
	file *os.File
	// dec decodes log messages and is reset whenever reading should resume.
	dec Decoder
	// fileWatcher delivers change events and errors for the log file.
	fileWatcher filenotify.FileWatcher
	// logWatcher receives decoded messages and errors, and signals when the
	// producer or the consumer has gone away.
	logWatcher *logger.LogWatcher
	// notifyRotate is received from after a rename/remove event, before the
	// file is reopened.
	notifyRotate chan interface{}
	// notifyEvict is received from to learn that following must stop; a
	// non-nil value is treated as an error.
	notifyEvict chan interface{}
	// oldSize is the file size recorded at the previous EOF, used to detect
	// truncation.
	oldSize int64
	// retries counts watcher re-creations; mainLoop resets it after a
	// successful decode.
	retries int
}
// handleRotate reopens the log file after it has been renamed or removed.
//
// It closes the current file, stops watching its name, then tries to open the
// same name again, retrying while the file does not exist. On success the new
// file replaces fl.file, is added to the watcher, and the decoder is reset to
// read from it. Any open or watch error is returned to the caller.
func (fl *follow) handleRotate() error {
	name := fl.file.Name()

	fl.file.Close()
	fl.fileWatcher.Remove(name)

	// retry when the file doesn't exist
	//
	// f and err are declared outside the loop and assigned with "=" so that
	// the result of the last attempt is still visible after the loop. Using
	// ":=" here would shadow err and silently drop every open failure.
	var (
		f   *os.File
		err error
	)
	for retries := 0; retries <= 5; retries++ {
		f, err = open(name)
		if err == nil || !os.IsNotExist(err) {
			break
		}
	}
	if err != nil {
		return err
	}

	// Only replace the file once the open is known to have succeeded.
	fl.file = f
	if err := fl.fileWatcher.Add(name); err != nil {
		return err
	}
	fl.dec.Reset(fl.file)
	return nil
}
// handleMustClose shuts the reader down after an eviction: it closes the file
// and the decoder, sends the wrapped eviction error to the log watcher, and
// logs that some data may not have been streamed.
func (fl *follow) handleMustClose(evictErr error) {
	fl.file.Close()
	fl.dec.Close()

	wrapped := errors.Wrap(evictErr, "log reader evicted due to errors")
	fl.logWatcher.Err <- wrapped

	entry := logrus.WithField("file", fl.file.Name())
	entry.Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
}
// waitRead blocks until something happens that affects reading the log file.
//
// It returns nil when reading can resume (the file was written to, or it was
// rotated and successfully reopened), errRetry when the caller should wait
// again, errDone when following should stop, and any other error when the
// watcher or the rotation failed.
func (fl *follow) waitRead() error {
	select {
	case evicted := <-fl.notifyEvict:
		if evicted != nil {
			fl.handleMustClose(evicted.(error))
		}
		return errDone

	case ev := <-fl.fileWatcher.Events():
		if ev.Op == fsnotify.Write {
			// New data: point the decoder at the file again.
			fl.dec.Reset(fl.file)
			return nil
		}
		if ev.Op != fsnotify.Rename && ev.Op != fsnotify.Remove {
			// Not an event we act on.
			return errRetry
		}

		// The file was renamed or removed: wait for the rotation
		// notification before reopening, unless either side went away.
		select {
		case <-fl.notifyRotate:
		case <-fl.logWatcher.WatchProducerGone():
			return errDone
		case <-fl.logWatcher.WatchConsumerGone():
			return errDone
		}
		return fl.handleRotate()

	case watchErr := <-fl.fileWatcher.Errors():
		logrus.Debugf("logger got error watching file: %v", watchErr)
		if fl.retries > 5 {
			return watchErr
		}

		// Something happened, let's try and stay alive and create a new watcher
		fl.fileWatcher.Close()
		var err error
		fl.fileWatcher, err = watchFile(fl.file.Name())
		if err != nil {
			return err
		}
		fl.retries++
		return errRetry

	case <-fl.logWatcher.WatchProducerGone():
		return errDone

	case <-fl.logWatcher.WatchConsumerGone():
		return errDone
	}
}
// handleDecodeErr decides what to do after the decoder failed.
//
// Errors other than io.EOF are returned unchanged. On EOF it first checks
// whether the file shrank since the previous EOF; if so it rewinds, resets the
// decoder and returns nil. Otherwise it waits via waitRead until reading can
// resume (nil) or waiting fails (the error from waitRead).
func (fl *follow) handleDecodeErr(err error) error {
	if !errors.Is(err, io.EOF) {
		return err
	}

	// Handle special case (#39235): max-file=1 and file was truncated
	info, statErr := fl.file.Stat()
	if statErr != nil {
		logrus.WithError(statErr).Warn("logger: stat error")
	} else {
		current := info.Size()
		// Record the size on the way out, whichever path returns.
		defer func() { fl.oldSize = current }()
		if current < fl.oldSize { // truncated
			fl.file.Seek(0, io.SeekStart)
			fl.dec.Reset(fl.file)
			return nil
		}
	}

	for {
		switch waitErr := fl.waitRead(); waitErr {
		case nil:
			return nil
		case errRetry:
			continue
		default:
			return waitErr
		}
	}
}
// mainLoop decodes messages from the log file and forwards them to the log
// watcher until it is evicted, the consumer goes away, a message is newer
// than until, or an unrecoverable error occurs.
//
// Messages older than since are skipped when since is non-zero; the loop ends
// at the first message newer than until when until is non-zero.
func (fl *follow) mainLoop(since, until time.Time) {
	for {
		// Non-blocking check for an eviction before each decode.
		select {
		case evicted := <-fl.notifyEvict:
			if evicted != nil {
				fl.handleMustClose(evicted.(error))
			}
			return
		default:
		}

		msg, decodeErr := fl.dec.Decode()
		if decodeErr != nil {
			handleErr := fl.handleDecodeErr(decodeErr)
			if handleErr == nil {
				// ready to try again
				continue
			}
			if handleErr != errDone {
				// we got an unrecoverable error, so return
				fl.logWatcher.Err <- handleErr
			}
			return
		}

		fl.retries = 0 // reset retries since we've succeeded

		tooOld := !since.IsZero() && msg.Timestamp.Before(since)
		if tooOld {
			continue
		}
		tooNew := !until.IsZero() && msg.Timestamp.After(until)
		if tooNew {
			return
		}

		// send the message, unless the consumer is gone
		select {
		case evicted := <-fl.notifyEvict:
			if evicted != nil {
				evictErr := evicted.(error)
				logrus.WithError(evictErr).Debug("Reader evicted while sending log message")
				fl.logWatcher.Err <- evictErr
			}
			return
		case fl.logWatcher.Msg <- msg:
		case <-fl.logWatcher.WatchConsumerGone():
			return
		}
	}
}
// followLogs streams messages decoded from f to logWatcher until following
// ends (see follow.mainLoop).
//
// It resets dec to read from f, starts watching the file by name, and runs
// the main loop with the given since/until bounds. If the watcher cannot be
// created the error is sent to logWatcher.Err and the function returns. On
// exit the file, decoder and watcher are closed.
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
	dec.Reset(f)

	name := f.Name()
	fileWatcher, err := watchFile(name)
	if err != nil {
		logWatcher.Err <- err
		return
	}

	fl := &follow{
		file:         f,
		oldSize:      -1,
		logWatcher:   logWatcher,
		fileWatcher:  fileWatcher,
		notifyRotate: notifyRotate,
		notifyEvict:  notifyEvict,
		dec:          dec,
	}
	defer func() {
		// Close whatever fl holds at exit rather than the initial f and
		// fileWatcher: handleRotate may have swapped in a reopened file and
		// waitRead may have swapped in a new watcher, and closing the
		// originals would leak the replacements.
		fl.file.Close()
		dec.Close()
		if fl.fileWatcher != nil {
			fl.fileWatcher.Close()
		}
	}()

	fl.mainLoop(since, until)
}

View file

@ -0,0 +1,37 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"io"
"os"
"testing"
"gotest.tools/v3/assert"
)
// TestHandleDecoderErr checks the truncation path of follow.handleDecodeErr:
// when the file is smaller than the previously recorded size, the file offset
// is rewound to zero and the decoder is reset exactly once.
func TestHandleDecoderErr(t *testing.T) {
	f, err := os.CreateTemp("", t.Name())
	assert.NilError(t, err)
	defer func() {
		// Close before removing so the descriptor is not leaked.
		f.Close()
		os.Remove(f.Name())
	}()

	_, err = f.Write([]byte("hello"))
	assert.NilError(t, err)

	// The write must have moved the offset away from the start.
	pos, err := f.Seek(0, io.SeekCurrent)
	assert.NilError(t, err)
	assert.Assert(t, pos != 0)

	dec := &testDecoder{}

	// Simulate "truncate" case, where the file was bigger before.
	fl := &follow{file: f, dec: dec, oldSize: 100}
	err = fl.handleDecodeErr(io.EOF)
	assert.NilError(t, err)

	// handleDecodeErr seeks to zero.
	pos, err = f.Seek(0, io.SeekCurrent)
	assert.NilError(t, err)
	assert.Equal(t, int64(0), pos)

	// Reset is called.
	assert.Equal(t, 1, dec.resetCount)
}

View file

@ -17,7 +17,6 @@ import (
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -608,180 +607,6 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
}
}
// followLogs streams messages decoded from f to logWatcher until it is
// evicted, the producer or consumer goes away, a message is newer than until,
// or an unrecoverable error occurs.
//
// It resets dec to read from f and watches the file by name. Messages older
// than since are skipped when since is non-zero. If the watcher cannot be
// created the error is sent to logWatcher.Err and the function returns. On
// exit the file, decoder and watcher are closed.
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
	dec.Reset(f)

	name := f.Name()
	fileWatcher, err := watchFile(name)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer func() {
		f.Close()
		dec.Close()
		fileWatcher.Close()
	}()

	// retries counts watcher re-creations; reset after a successful decode.
	var retries int
	// handleRotate reopens the file by name after a rename/remove, re-adds
	// it to the watcher and resets the decoder.
	handleRotate := func() error {
		f.Close()
		fileWatcher.Remove(name)

		// retry when the file doesn't exist
		for retries := 0; retries <= 5; retries++ {
			f, err = open(name)
			if err == nil || !os.IsNotExist(err) {
				break
			}
		}
		if err != nil {
			return err
		}
		if err := fileWatcher.Add(name); err != nil {
			return err
		}
		dec.Reset(f)
		return nil
	}

	// errRetry asks the caller of waitRead to wait again; errDone stops
	// following without reporting an error.
	errRetry := errors.New("retry")
	errDone := errors.New("done")

	// handleMustClose closes the reader after an eviction and reports the
	// eviction error to the log watcher.
	handleMustClose := func(evictErr error) {
		f.Close()
		dec.Close()
		// Wrap the eviction error itself. The enclosing function's err is
		// unrelated to the eviction, so wrapping it would hide the cause.
		logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors")
		logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
	}

	// waitRead blocks until reading can resume (nil), the wait should be
	// repeated (errRetry), following should stop (errDone), or an error
	// occurred.
	waitRead := func() error {
		select {
		case e := <-notifyEvict:
			if e != nil {
				err := e.(error)
				handleMustClose(err)
			}
			return errDone
		case e := <-fileWatcher.Events():
			switch e.Op {
			case fsnotify.Write:
				dec.Reset(f)
				return nil
			case fsnotify.Rename, fsnotify.Remove:
				// Wait for the rotation notification before reopening.
				select {
				case <-notifyRotate:
				case <-logWatcher.WatchProducerGone():
					return errDone
				case <-logWatcher.WatchConsumerGone():
					return errDone
				}
				if err := handleRotate(); err != nil {
					return err
				}
				return nil
			}
			return errRetry
		case err := <-fileWatcher.Errors():
			logrus.Debugf("logger got error watching file: %v", err)
			// Something happened, let's try and stay alive and create a new watcher
			if retries <= 5 {
				fileWatcher.Close()
				fileWatcher, err = watchFile(name)
				if err != nil {
					return err
				}
				retries++
				return errRetry
			}
			return err
		case <-logWatcher.WatchProducerGone():
			return errDone
		case <-logWatcher.WatchConsumerGone():
			return errDone
		}
	}

	// oldSize is the file size recorded at the previous EOF.
	oldSize := int64(-1)
	// handleDecodeErr returns non-EOF errors unchanged; on EOF it rewinds a
	// truncated file or waits via waitRead until reading can resume.
	handleDecodeErr := func(err error) error {
		if !errors.Is(err, io.EOF) {
			return err
		}

		// Handle special case (#39235): max-file=1 and file was truncated
		st, stErr := f.Stat()
		if stErr == nil {
			size := st.Size()
			defer func() { oldSize = size }()
			if size < oldSize { // truncated
				f.Seek(0, 0)
				dec.Reset(f)
				return nil
			}
		} else {
			logrus.WithError(stErr).Warn("logger: stat error")
		}

		for {
			err := waitRead()
			if err == nil {
				break
			}
			if err == errRetry {
				continue
			}
			return err
		}
		return nil
	}

	// main loop
	for {
		// Non-blocking check for an eviction before each decode.
		select {
		case err := <-notifyEvict:
			if err != nil {
				handleMustClose(err.(error))
			}
			return
		default:
		}
		msg, err := dec.Decode()
		if err != nil {
			if err := handleDecodeErr(err); err != nil {
				if err == errDone {
					return
				}
				// we got an unrecoverable error, so return
				logWatcher.Err <- err
				return
			}
			// ready to try again
			continue
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}
		if !until.IsZero() && msg.Timestamp.After(until) {
			return
		}
		// send the message, unless the consumer is gone
		select {
		case e := <-notifyEvict:
			if e != nil {
				err := e.(error)
				logrus.WithError(err).Debug("Reader evicted while sending log message")
				logWatcher.Err <- err
			}
			return
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchConsumerGone():
			return
		}
	}
}
func watchFile(name string) (filenotify.FileWatcher, error) {
var fileWatcher filenotify.FileWatcher

View file

@ -4,13 +4,11 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"text/tabwriter"
@ -24,8 +22,9 @@ import (
)
// testDecoder is a Decoder used by the tests in this package.
type testDecoder struct {
	// rdr is the reader handed to the most recent Reset call.
	rdr io.Reader
	// scanner reads from rdr; Reset replaces it.
	scanner *bufio.Scanner
	// resetCount is the number of times Reset has been called.
	resetCount int
}
func (d *testDecoder) Decode() (*logger.Message, error) {
@ -42,6 +41,7 @@ func (d *testDecoder) Decode() (*logger.Message, error) {
// Reset points the decoder at rdr with a fresh scanner and counts the call.
func (d *testDecoder) Reset(rdr io.Reader) {
	d.resetCount++
	d.rdr, d.scanner = rdr, bufio.NewScanner(rdr)
}
func (d *testDecoder) Close() {
@ -246,77 +246,6 @@ func TestFollowLogsProducerGone(t *testing.T) {
}
}
// lineDecoder is a test Decoder that turns each newline-terminated line of
// its input into one log message.
type lineDecoder struct {
	// r is the buffered reader installed by the most recent Reset call.
	r *bufio.Reader
	// resetCount is the number of times Reset has been called.
	resetCount int
}
// Decode reads up to and including the next '\n' and returns it as the Line
// of a new message. Any read error is returned with a nil message.
func (d *lineDecoder) Decode() (*logger.Message, error) {
	text, readErr := d.r.ReadString('\n')
	if readErr != nil {
		return nil, readErr
	}

	msg := logger.NewMessage()
	msg.Line = []byte(text)
	return msg, nil
}
// Reset wraps r in a new buffered reader and counts the call.
func (d *lineDecoder) Reset(r io.Reader) {
	d.resetCount++
	d.r = bufio.NewReader(r)
}
// Close is a no-op.
func (d *lineDecoder) Close() {}
// TestFollowLogsHandleDecodeErr runs followLogs against a temp file, writes
// two lines, truncates the file and writes a third, and checks that each line
// is delivered and that the decoder was reset the expected number of times.
func TestFollowLogsHandleDecodeErr(t *testing.T) {
	lw := logger.NewLogWatcher()
	defer lw.ConsumerGone()

	fw, err := os.CreateTemp("", t.Name())
	assert.NilError(t, err)
	defer os.Remove(fw.Name())
	// Deferred after Remove so that it runs first: close, then remove.
	defer fw.Close()

	// fr is the read side; followLogs closes it when it returns.
	fr, err := os.Open(fw.Name())
	assert.NilError(t, err)

	dec := &lineDecoder{}
	dec.Reset(fr)

	var since, until time.Time

	rotate := make(chan interface{})
	evict := make(chan interface{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		followLogs(fr, lw, rotate, evict, dec, since, until)
	}()

	// sendReceive writes message to f and expects followLogs to deliver it.
	sendReceive := func(f io.Writer, message string) {
		// Use a local err so the closure does not write to the outer one.
		_, err := f.Write([]byte(message))
		assert.NilError(t, err)
		m := <-lw.Msg
		assert.Equal(t, message, string(m.Line))
	}

	sendReceive(fw, "log1\n")
	sendReceive(fw, "log2\n")

	// Reopen with O_TRUNC to simulate the file being truncated.
	ft, err := os.OpenFile(fw.Name(), os.O_WRONLY|os.O_TRUNC, 0600)
	assert.NilError(t, err)
	defer ft.Close()

	sendReceive(ft, "log3\n")

	evict <- errors.New("stop followLogs")
	wg.Wait()

	// followLogs calls Reset() at the beginning, each of the 3 writes
	// results in a Reset(), then handleDecodeErr() calls Reset().
	// NOTE(review): the explicit dec.Reset(fr) above also increments
	// resetCount; confirm the expected total.
	assert.Equal(t, 5, dec.resetCount)
}
func TestCheckCapacityAndRotate(t *testing.T) {
dir, err := os.MkdirTemp("", t.Name())
assert.NilError(t, err)