123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"
- import (
- "runtime"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
- "gotest.tools/v3/assert"
- "gotest.tools/v3/assert/opt"
- "github.com/docker/docker/api/types/backend"
- "github.com/docker/docker/daemon/logger"
- )
// syncer is an optional capability of a logger. It is probed for with a type
// assertion, so loggers are not required to implement it.
type syncer interface {
	// Sync flushes everything logged so far to stable storage, so that a
	// LogReader can immediately read back the most recently logged message.
	Sync() error
}
- func syncLogger(t *testing.T, l logger.Logger) {
- if sl, ok := l.(syncer); ok {
- assert.NilError(t, sl.Sync())
- }
- }
- // Reader tests that a logger.LogReader implementation behaves as it should.
- type Reader struct {
- // Factory returns a function which constructs loggers for the container
- // specified in info. Each call to the returned function must yield a
- // distinct logger instance which can read back logs written by earlier
- // instances.
- Factory func(*testing.T, logger.Info) func(*testing.T) logger.Logger
- }
- var compareLog cmp.Options = []cmp.Option{
- // Not all log drivers can round-trip timestamps at full nanosecond
- // precision.
- opt.TimeWithThreshold(time.Millisecond),
- // The json-log driver does not round-trip PLogMetaData and API users do
- // not expect it.
- cmpopts.IgnoreFields(logger.Message{}, "PLogMetaData"),
- cmp.Transformer("string", func(b []byte) string { return string(b) }),
- }
- // TestTail tests the behavior of the LogReader's tail implementation.
- func (tr Reader) TestTail(t *testing.T) {
- t.Run("Live", func(t *testing.T) { tr.testTail(t, true) })
- t.Run("LiveEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, true) })
- t.Run("Stopped", func(t *testing.T) { tr.testTail(t, false) })
- t.Run("StoppedEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, false) })
- }
- func makeTestMessages() []*logger.Message {
- return []*logger.Message{
- {Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")},
- {Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{ID: "aaaaaaaa", Ordinal: 1, Last: true}},
- {Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("to be..."), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 1}},
- {Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}},
- {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))},
- {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")},
- {Source: "stdout", Timestamp: time.Now().Add(-1 * 90 * time.Minute), Line: []byte("someone adjusted the clock")},
- }
- }
- func (tr Reader) testTail(t *testing.T, live bool) {
- t.Parallel()
- factory := tr.Factory(t, logger.Info{
- ContainerID: "tailtest0000",
- ContainerName: "logtail",
- })
- l := factory(t)
- if live {
- defer func() { assert.NilError(t, l.Close()) }()
- }
- mm := makeTestMessages()
- expected := logMessages(t, l, mm)
- if !live {
- // Simulate reading from a stopped container.
- assert.NilError(t, l.Close())
- l = factory(t)
- defer func() { assert.NilError(t, l.Close()) }()
- }
- lr := l.(logger.LogReader)
- t.Run("Exact", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
- })
- t.Run("LessThanAvailable", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog)
- })
- t.Run("MoreThanAvailable", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
- })
- t.Run("All", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: -1})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
- })
- t.Run("Since", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
- })
- t.Run("MoreThanSince", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
- })
- t.Run("LessThanSince", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog)
- })
- t.Run("Until", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog)
- })
- t.Run("SinceAndUntil", func(t *testing.T) {
- t.Parallel()
- lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog)
- })
- }
- func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
- t.Parallel()
- factory := tr.Factory(t, logger.Info{
- ContainerID: "tailemptytest",
- ContainerName: "logtail",
- })
- l := factory(t)
- if !live {
- assert.NilError(t, l.Close())
- l = factory(t)
- }
- defer func() { assert.NilError(t, l.Close()) }()
- for _, tt := range []struct {
- name string
- cfg logger.ReadConfig
- }{
- {name: "Zero", cfg: logger.ReadConfig{}},
- {name: "All", cfg: logger.ReadConfig{Tail: -1}},
- {name: "Tail", cfg: logger.ReadConfig{Tail: 42}},
- {name: "Since", cfg: logger.ReadConfig{Since: time.Unix(1, 0)}},
- {name: "Until", cfg: logger.ReadConfig{Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
- {name: "SinceAndUntil", cfg: logger.ReadConfig{Since: time.Unix(1, 0), Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
- } {
- tt := tt
- t.Run(tt.name, func(t *testing.T) {
- t.Parallel()
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{})
- defer lw.ConsumerGone()
- assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty())
- })
- }
- }
- // TestFollow tests the LogReader's follow implementation.
- //
- // The LogReader is expected to be able to follow an arbitrary number of
- // messages at a high rate with no dropped messages.
- func (tr Reader) TestFollow(t *testing.T) {
- // Reader sends all logs and closes after logger is closed
- // - Starting from empty log (like run)
- t.Run("FromEmptyLog", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followstart0",
- ContainerName: "logloglog",
- })(t)
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- mm := makeTestMessages()
- expected := logMessages(t, l, mm)
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("AttachMidStream", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followmiddle",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- expected := logMessages(t, l, mm[0:1])
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected = append(expected, logMessages(t, l, mm[1:])...)
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("Since", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followsince0",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected := logMessages(t, l, mm)[2:]
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("Until", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followuntil0",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected := logMessages(t, l, mm)[:2]
- defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("SinceAndUntil", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followbounded",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected := logMessages(t, l, mm)[1:2]
- defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("Tail=0", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followtail00",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- logMessages(t, l, mm[0:2])
- syncLogger(t, l)
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected := logMessages(t, l, mm[2:])
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("Tail>0", func(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "followtail00",
- ContainerName: "logloglog",
- })(t)
- mm := makeTestMessages()
- expected := logMessages(t, l, mm[0:2])[1:]
- syncLogger(t, l)
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected = append(expected, logMessages(t, l, mm[2:])...)
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("MultipleStarts", func(t *testing.T) {
- t.Parallel()
- factory := tr.Factory(t, logger.Info{
- ContainerID: "startrestart",
- ContainerName: "startmeup",
- })
- mm := makeTestMessages()
- l := factory(t)
- expected := logMessages(t, l, mm[:3])
- assert.NilError(t, l.Close())
- l = factory(t)
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
- defer lw.ConsumerGone()
- doneReading := make(chan struct{})
- var logs []*logger.Message
- go func() {
- defer close(doneReading)
- logs = readAll(t, lw)
- }()
- expected = append(expected, logMessages(t, l, mm[3:])...)
- assert.NilError(t, l.Close())
- <-doneReading
- assert.DeepEqual(t, logs, expected, compareLog)
- })
- t.Run("Concurrent", tr.TestConcurrent)
- }
- // TestConcurrent tests the Logger and its LogReader implementation for
- // race conditions when logging from multiple goroutines concurrently.
- func (tr Reader) TestConcurrent(t *testing.T) {
- t.Parallel()
- l := tr.Factory(t, logger.Info{
- ContainerID: "logconcurrent0",
- ContainerName: "logconcurrent123",
- })(t)
- // Split test messages
- stderrMessages := []*logger.Message{}
- stdoutMessages := []*logger.Message{}
- for _, m := range makeTestMessages() {
- if m.Source == "stdout" {
- stdoutMessages = append(stdoutMessages, m)
- } else if m.Source == "stderr" {
- stderrMessages = append(stderrMessages, m)
- }
- }
- // Follow all logs
- lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Follow: true, Tail: -1})
- defer lw.ConsumerGone()
- // Log concurrently from two sources and close log
- wg := &sync.WaitGroup{}
- logAll := func(msgs []*logger.Message) {
- defer wg.Done()
- for _, m := range msgs {
- l.Log(copyLogMessage(m))
- }
- }
- closed := make(chan struct{})
- wg.Add(2)
- go logAll(stdoutMessages)
- go logAll(stderrMessages)
- go func() {
- defer close(closed)
- defer l.Close()
- wg.Wait()
- }()
- // Check if the message count, order and content is equal to what was logged
- for {
- l := readMessage(t, lw)
- if l == nil {
- break
- }
- var messages *[]*logger.Message
- if l.Source == "stdout" {
- messages = &stdoutMessages
- } else if l.Source == "stderr" {
- messages = &stderrMessages
- } else {
- t.Fatalf("Corrupted message.Source = %q", l.Source)
- }
- expectedMsg := transformToExpected((*messages)[0])
- assert.DeepEqual(t, *expectedMsg, *l, compareLog)
- *messages = (*messages)[1:]
- }
- assert.Equal(t, len(stdoutMessages), 0)
- assert.Equal(t, len(stderrMessages), 0)
- // Make sure log gets closed before we return
- // so the temporary dir can be deleted
- <-closed
- }
- // logMessages logs messages to l and returns a slice of messages as would be
- // expected to be read back. The message values are not modified and the
- // returned slice of messages are deep-copied.
- func logMessages(t *testing.T, l logger.Logger, messages []*logger.Message) []*logger.Message {
- t.Helper()
- var expected []*logger.Message
- for _, m := range messages {
- // Copy the log message because the underlying log writer resets
- // the log message and returns it to a buffer pool.
- assert.NilError(t, l.Log(copyLogMessage(m)))
- runtime.Gosched()
- expect := transformToExpected(m)
- expected = append(expected, expect)
- }
- return expected
- }
- // Existing API consumers expect a newline to be appended to
- // messages other than nonterminal partials as that matches the
- // existing behavior of the json-file log driver.
- func transformToExpected(m *logger.Message) *logger.Message {
- // Copy the log message again so as not to mutate the input.
- copy := copyLogMessage(m)
- if m.PLogMetaData == nil || m.PLogMetaData.Last {
- copy.Line = append(copy.Line, '\n')
- }
- return copy
- }
- func copyLogMessage(src *logger.Message) *logger.Message {
- dst := logger.NewMessage()
- dst.Source = src.Source
- dst.Timestamp = src.Timestamp
- dst.Attrs = src.Attrs
- dst.Err = src.Err
- dst.Line = append(dst.Line, src.Line...)
- if src.PLogMetaData != nil {
- lmd := *src.PLogMetaData
- dst.PLogMetaData = &lmd
- }
- return dst
- }
- func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message {
- t.Helper()
- timeout := time.NewTimer(5 * time.Second)
- defer timeout.Stop()
- select {
- case <-timeout.C:
- t.Error("timed out waiting for message")
- return nil
- case err, open := <-lw.Err:
- t.Errorf("unexpected receive on lw.Err: err=%v, open=%v", err, open)
- return nil
- case msg, open := <-lw.Msg:
- if !open {
- select {
- case err, open := <-lw.Err:
- t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open)
- default:
- }
- return nil
- }
- assert.Assert(t, msg != nil)
- t.Logf("[%v] %s: %s", msg.Timestamp, msg.Source, msg.Line)
- return msg
- }
- }
- func readAll(t *testing.T, lw *logger.LogWatcher) []*logger.Message {
- t.Helper()
- var msgs []*logger.Message
- for {
- m := readMessage(t, lw)
- if m == nil {
- return msgs
- }
- msgs = append(msgs, m)
- }
- }
|