123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
- import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "strings"
- "testing"
- "text/tabwriter"
- "time"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/pkg/tailfile"
- "gotest.tools/v3/assert"
- "gotest.tools/v3/poll"
- )
- type testDecoder struct {
- rdr io.Reader
- scanner *bufio.Scanner
- resetCount int
- }
- func (d *testDecoder) Decode() (*logger.Message, error) {
- if d.scanner == nil {
- d.scanner = bufio.NewScanner(d.rdr)
- }
- if !d.scanner.Scan() {
- return nil, d.scanner.Err()
- }
- // some comment
- return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
- }
- func (d *testDecoder) Reset(rdr io.Reader) {
- d.rdr = rdr
- d.scanner = bufio.NewScanner(rdr)
- d.resetCount++
- }
- func (d *testDecoder) Close() {
- d.rdr = nil
- d.scanner = nil
- }
- func TestTailFiles(t *testing.T) {
- s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
- s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
- s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
- files := []SizeReaderAt{s1, s2, s3}
- watcher := logger.NewLogWatcher()
- defer watcher.ConsumerGone()
- tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, lines)
- }
- dec := &testDecoder{}
- for desc, config := range map[string]logger.ReadConfig{} {
- t.Run(desc, func(t *testing.T) {
- started := make(chan struct{})
- fwd := newForwarder(config)
- go func() {
- close(started)
- tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
- }()
- <-started
- })
- }
- config := logger.ReadConfig{Tail: 2}
- fwd := newForwarder(config)
- started := make(chan struct{})
- go func() {
- close(started)
- tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
- }()
- <-started
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
- }
- select {
- case <-time.After(60 * time.Second):
- t.Fatal("timeout waiting for tail line")
- case err := <-watcher.Err:
- assert.NilError(t, err)
- case msg := <-watcher.Msg:
- assert.Assert(t, msg != nil)
- assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
- }
- }
- type dummyDecoder struct{}
- func (dummyDecoder) Decode() (*logger.Message, error) {
- return &logger.Message{}, nil
- }
- func (dummyDecoder) Close() {}
- func (dummyDecoder) Reset(io.Reader) {}
- func TestCheckCapacityAndRotate(t *testing.T) {
- dir := t.TempDir()
- logPath := filepath.Join(dir, "log")
- getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
- return tailfile.NewTailReader(ctx, r, lines)
- }
- createDecoder := func(io.Reader) Decoder {
- return dummyDecoder{}
- }
- l, err := NewLogFile(
- logPath,
- 5, // capacity
- 3, // maxFiles
- true, // compress
- createDecoder,
- 0o600, // perms
- getTailReader,
- )
- assert.NilError(t, err)
- defer l.Close()
- ls := dirStringer{dir}
- timestamp := time.Time{}
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
- _, err = os.Stat(logPath + ".1")
- assert.Assert(t, os.IsNotExist(err), ls)
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
- poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
- poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- t.Run("closed log file", func(t *testing.T) {
- // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
- // down the line.
- // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
- l.f.Close()
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
- assert.NilError(t, os.Remove(logPath+".2.gz"))
- })
- t.Run("with log reader", func(t *testing.T) {
- // Make sure rotate works with an active reader
- lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
- defer lw.ConsumerGone()
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
- // make sure the log reader is primed
- waitForMsg(t, lw, 30*time.Second)
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls)
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls)
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls)
- assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls)
- poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
- })
- }
- func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
- t.Helper()
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- select {
- case _, ok := <-lw.Msg:
- assert.Assert(t, ok, "log producer gone before log message arrived")
- case err := <-lw.Err:
- assert.NilError(t, err)
- case <-timer.C:
- t.Fatal("timeout waiting for log message")
- }
- }
// dirStringer renders a listing of directory d when formatted. The tests in
// this file pass it as an assertion message argument so a failure shows which
// files were present.
type dirStringer struct {
	d string
}

// String returns a newline followed by one tab-aligned line per directory
// entry, giving its name, mode, size in bytes and modification time.
//
// Failures are reported in the output rather than dropped: if the directory
// cannot be read the error is returned as the text, and an entry that cannot
// be stat'ed (for example because it was removed after the directory was
// read) gets a line carrying the error while the rest is still listed.
func (d dirStringer) String() string {
	entries, err := os.ReadDir(d.d)
	if err != nil {
		return fmt.Sprintf("\n<cannot list directory: %v>\n", err)
	}

	var buf bytes.Buffer
	buf.WriteString("\n")

	tw := tabwriter.NewWriter(&buf, 1, 8, 1, '\t', 0)
	for _, entry := range entries {
		fi, err := entry.Info()
		if err != nil {
			fmt.Fprintf(tw, "%s\t<stat failed: %v>\n", entry.Name(), err)
			continue
		}
		fmt.Fprintf(tw, "%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime())
	}
	// Writes end up in a bytes.Buffer, which does not fail, so the Flush
	// error carries no information here.
	tw.Flush()
	return buf.String()
}
- func checkFileExists(name string) poll.Check {
- return func(t poll.LogT) poll.Result {
- _, err := os.Stat(name)
- switch {
- case err == nil:
- return poll.Success()
- case os.IsNotExist(err):
- return poll.Continue("waiting for %s to exist", name)
- default:
- t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
- return poll.Error(err)
- }
- }
- }
|