123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
- import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "strings"
- "sync/atomic"
- "testing"
- "text/tabwriter"
- "time"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/pkg/pubsub"
- "github.com/docker/docker/pkg/tailfile"
- "gotest.tools/v3/assert"
- "gotest.tools/v3/poll"
- )
- type testDecoder struct {
- rdr io.Reader
- scanner *bufio.Scanner
- resetCount int
- }
- func (d *testDecoder) Decode() (*logger.Message, error) {
- if d.scanner == nil {
- d.scanner = bufio.NewScanner(d.rdr)
- }
- if !d.scanner.Scan() {
- return nil, d.scanner.Err()
- }
- // some comment
- return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
- }
- func (d *testDecoder) Reset(rdr io.Reader) {
- d.rdr = rdr
- d.scanner = bufio.NewScanner(rdr)
- d.resetCount++
- }
- func (d *testDecoder) Close() {
- d.rdr = nil
- d.scanner = nil
- }
- func TestTailFiles(t *testing.T) {
- s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
- s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
- s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
- files := []SizeReaderAt{s1, s2, s3}
- watcher := logger.NewLogWatcher()
- defer watcher.ConsumerGone()
- tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, lines)
- }
- dec := &testDecoder{}
- for desc, config := range map[string]logger.ReadConfig{} {
- t.Run(desc, func(t *testing.T) {
- started := make(chan struct{})
- go func() {
- close(started)
- tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
- }()
- <-started
- })
- }
- config := logger.ReadConfig{Tail: 2}
- started := make(chan struct{})
- go func() {
- close(started)
- tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
- }()
- <-started
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
- }
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
- }
- }
- type dummyDecoder struct{}
- func (dummyDecoder) Decode() (*logger.Message, error) {
- return &logger.Message{}, nil
- }
- func (dummyDecoder) Close() {}
- func (dummyDecoder) Reset(io.Reader) {}
- func TestFollowLogsConsumerGone(t *testing.T) {
- lw := logger.NewLogWatcher()
- f, err := os.CreateTemp("", t.Name())
- assert.NilError(t, err)
- defer func() {
- f.Close()
- os.Remove(f.Name())
- }()
- dec := dummyDecoder{}
- followLogsDone := make(chan struct{})
- var since, until time.Time
- go func() {
- followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
- close(followLogsDone)
- }()
- select {
- case <-lw.Msg:
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-followLogsDone:
- t.Fatal("follow logs finished unexpectedly")
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for log message")
- }
- lw.ConsumerGone()
- select {
- case <-followLogsDone:
- case <-time.After(20 * time.Second):
- t.Fatal("timeout waiting for followLogs() to finish")
- }
- }
- type dummyWrapper struct {
- dummyDecoder
- fn func() error
- }
- func (d *dummyWrapper) Decode() (*logger.Message, error) {
- if err := d.fn(); err != nil {
- return nil, err
- }
- return d.dummyDecoder.Decode()
- }
- func TestFollowLogsProducerGone(t *testing.T) {
- lw := logger.NewLogWatcher()
- defer lw.ConsumerGone()
- f, err := os.CreateTemp("", t.Name())
- assert.NilError(t, err)
- defer os.Remove(f.Name())
- var sent, received, closed int32
- dec := &dummyWrapper{fn: func() error {
- switch atomic.LoadInt32(&closed) {
- case 0:
- atomic.AddInt32(&sent, 1)
- return nil
- case 1:
- atomic.AddInt32(&closed, 1)
- t.Logf("logDecode() closed after sending %d messages\n", sent)
- return io.EOF
- default:
- t.Fatal("logDecode() called after closing!")
- return io.EOF
- }
- }}
- var since, until time.Time
- followLogsDone := make(chan struct{})
- go func() {
- followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
- close(followLogsDone)
- }()
- // read 1 message
- select {
- case <-lw.Msg:
- received++
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-followLogsDone:
- t.Fatal("followLogs() finished unexpectedly")
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for log message")
- }
- // "stop" the "container"
- atomic.StoreInt32(&closed, 1)
- lw.ProducerGone()
- // should receive all the messages sent
- readDone := make(chan struct{})
- go func() {
- defer close(readDone)
- for {
- select {
- case <-lw.Msg:
- received++
- if received == atomic.LoadInt32(&sent) {
- return
- }
- case err := <-lw.Err:
- assert.NilError(t, err)
- }
- }
- }()
- select {
- case <-readDone:
- case <-time.After(30 * time.Second):
- t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
- }
- t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
- // followLogs() should be done by now
- select {
- case <-followLogsDone:
- case <-time.After(30 * time.Second):
- t.Fatal("timeout waiting for followLogs() to finish")
- }
- select {
- case <-lw.WatchConsumerGone():
- t.Fatal("consumer should not have exited")
- default:
- }
- }
- func TestCheckCapacityAndRotate(t *testing.T) {
- dir, err := os.MkdirTemp("", t.Name())
- assert.NilError(t, err)
- defer os.RemoveAll(dir)
- f, err := os.CreateTemp(dir, "log")
- assert.NilError(t, err)
- l := &LogFile{
- f: f,
- capacity: 5,
- maxFiles: 3,
- compress: true,
- notifyReaders: pubsub.NewPublisher(0, 1),
- perms: 0600,
- filesRefCounter: refCounter{counter: make(map[string]int)},
- getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, lines)
- },
- createDecoder: func(io.Reader) Decoder {
- return dummyDecoder{}
- },
- marshal: func(msg *logger.Message) ([]byte, error) {
- return msg.Line, nil
- },
- }
- defer l.Close()
- ls := dirStringer{dir}
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
- _, err = os.Stat(f.Name() + ".1")
- assert.Assert(t, os.IsNotExist(err), ls)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
- poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
- poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- t.Run("closed log file", func(t *testing.T) {
- // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
- // down the line.
- // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
- l.f.Close()
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
- assert.NilError(t, os.Remove(f.Name()+".2.gz"))
- })
- t.Run("with log reader", func(t *testing.T) {
- // Make sure rotate works with an active reader
- lw := logger.NewLogWatcher()
- defer lw.ConsumerGone()
- go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
- // make sure the log reader is primed
- waitForMsg(t, lw, 30*time.Second)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
- assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
- poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- })
- }
- func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
- t.Helper()
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- select {
- case <-lw.Msg:
- case <-lw.WatchProducerGone():
- t.Fatal("log producer gone before log message arrived")
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-timer.C:
- t.Fatal("timeout waiting for log message")
- }
- }
// dirStringer renders a listing of a directory when formatted as a string.
// Because the listing is produced in String, it reflects the directory at the
// moment the value is printed, not when it was created.
type dirStringer struct {
	d string // path of the directory to list
}

// String returns a newline followed by one tab-aligned row per directory
// entry: name, mode, size in bytes and modification time.
//
// It returns "" when the directory cannot be read or when an entry's metadata
// fails to load for a reason other than the entry having disappeared. Entries
// that vanish between the directory read and the metadata lookup are skipped,
// so one removed file does not blank the whole listing.
func (d dirStringer) String() string {
	entries, err := os.ReadDir(d.d)
	if err != nil {
		return ""
	}

	var buf bytes.Buffer
	buf.WriteString("\n")

	tw := tabwriter.NewWriter(&buf, 1, 8, 1, '\t', 0)
	for _, entry := range entries {
		fi, err := entry.Info()
		if err != nil {
			if os.IsNotExist(err) {
				// Removed or renamed since ReadDir returned; leave it out.
				continue
			}
			return ""
		}
		fmt.Fprintf(tw, "%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime())
	}
	// The tabwriter only emits aligned output on Flush. Its destination is an
	// in-memory buffer, so the returned error is always nil.
	tw.Flush()
	return buf.String()
}
- func checkFileExists(name string) poll.Check {
- return func(t poll.LogT) poll.Result {
- _, err := os.Stat(name)
- switch {
- case err == nil:
- return poll.Success()
- case os.IsNotExist(err):
- return poll.Continue("waiting for %s to exist", name)
- default:
- t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
- return poll.Error(err)
- }
- }
- }
|