瀏覽代碼

daemon/logger: add test suite for LogReaders

Add an extensive test suite for validating the behavior of any
LogReader. Test the current LogFile-based implementations against it.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider 3 年之前
父節點
當前提交
9aa9d6fafc
共有 3 個文件被更改,包括 480 次插入87 次删除
  1. 15 0
      daemon/logger/jsonfilelog/read_test.go
  2. 11 87
      daemon/logger/local/local_test.go
  3. 454 0
      daemon/logger/loggertest/logreader.go

+ 15 - 0
daemon/logger/jsonfilelog/read_test.go

@@ -9,6 +9,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/loggertest"
 	"gotest.tools/v3/assert"
 	"gotest.tools/v3/assert"
 )
 )
 
 
@@ -139,6 +140,20 @@ func TestUnexpectedEOF(t *testing.T) {
 	assert.Error(t, err, io.EOF.Error())
 	assert.Error(t, err, io.EOF.Error())
 }
 }
 
 
+func TestReadLogs(t *testing.T) {
+	loggertest.Reader{
+		Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
+			dir := t.TempDir()
+			info.LogPath = filepath.Join(dir, info.ContainerID+".log")
+			return func(t *testing.T) logger.Logger {
+				l, err := New(info)
+				assert.NilError(t, err)
+				return l
+			}
+		},
+	}.Do(t)
+}
+
 type readerWithErr struct {
 type readerWithErr struct {
 	err   error
 	err   error
 	after int
 	after int

+ 11 - 87
daemon/logger/local/local_test.go

@@ -2,19 +2,18 @@ package local
 
 
 import (
 import (
 	"bytes"
 	"bytes"
-	"context"
 	"encoding/binary"
 	"encoding/binary"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
 	"os"
 	"os"
 	"path/filepath"
 	"path/filepath"
-	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/api/types/plugins/logdriver"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/loggertest"
 	protoio "github.com/gogo/protobuf/io"
 	protoio "github.com/gogo/protobuf/io"
 	"gotest.tools/v3/assert"
 	"gotest.tools/v3/assert"
 	is "gotest.tools/v3/assert/cmp"
 	is "gotest.tools/v3/assert/cmp"
@@ -80,92 +79,17 @@ func TestWriteLog(t *testing.T) {
 }
 }
 
 
 func TestReadLog(t *testing.T) {
 func TestReadLog(t *testing.T) {
-	t.Parallel()
-
-	dir, err := os.MkdirTemp("", t.Name())
-	assert.NilError(t, err)
-	defer os.RemoveAll(dir)
-
-	logPath := filepath.Join(dir, "test.log")
-	l, err := New(logger.Info{LogPath: logPath})
-	assert.NilError(t, err)
-	defer l.Close()
-
-	m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
-	m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
-	longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
-	m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
-	m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}
-
-	// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
-	err = l.Log(copyLogMessage(&m1))
-	assert.NilError(t, err)
-	err = l.Log(copyLogMessage(&m2))
-	assert.NilError(t, err)
-	err = l.Log(copyLogMessage(&m3))
-	assert.NilError(t, err)
-	err = l.Log(copyLogMessage(&m4))
-	assert.NilError(t, err)
-
-	lr := l.(logger.LogReader)
-
-	testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
-		t.Helper()
-		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-		defer cancel()
-		select {
-		case <-ctx.Done():
-			assert.Assert(t, ctx.Err())
-		case err := <-lw.Err:
-			assert.NilError(t, err)
-		case msg, open := <-lw.Msg:
-			if !open {
-				select {
-				case err := <-lw.Err:
-					assert.NilError(t, err)
-				default:
-					assert.Assert(t, m == nil)
-					return
-				}
+	loggertest.Reader{
+		Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
+			dir := t.TempDir()
+			info.LogPath = filepath.Join(dir, info.ContainerID+".log")
+			return func(t *testing.T) logger.Logger {
+				l, err := New(info)
+				assert.NilError(t, err)
+				return l
 			}
 			}
-			assert.Assert(t, m != nil)
-			if m.PLogMetaData == nil {
-				// a `\n` is appended on read to make this work with the existing API's when the message is not a partial.
-				// make sure it's the last entry in the line, and then truncate it for the deep equal below.
-				assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
-				msg.Line = msg.Line[:len(msg.Line)-1]
-			}
-			assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
-		}
-	}
-
-	t.Run("tail exact", func(t *testing.T) {
-		lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})
-
-		testMessage(t, lw, &m1)
-		testMessage(t, lw, &m2)
-		testMessage(t, lw, &m3)
-		testMessage(t, lw, &m4)
-		testMessage(t, lw, nil) // no more messages
-	})
-
-	t.Run("tail less than available", func(t *testing.T) {
-		lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
-
-		testMessage(t, lw, &m3)
-		testMessage(t, lw, &m4)
-		testMessage(t, lw, nil) // no more messages
-	})
-
-	t.Run("tail more than available", func(t *testing.T) {
-		lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
-
-		testMessage(t, lw, &m1)
-		testMessage(t, lw, &m2)
-		testMessage(t, lw, &m3)
-		testMessage(t, lw, &m4)
-		testMessage(t, lw, nil) // no more messages
-	})
+		},
+	}.Do(t)
 }
 }
 
 
 func BenchmarkLogWrite(b *testing.B) {
 func BenchmarkLogWrite(b *testing.B) {

+ 454 - 0
daemon/logger/loggertest/logreader.go

@@ -0,0 +1,454 @@
+package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"gotest.tools/v3/assert"
+
+	"github.com/docker/docker/api/types/backend"
+	"github.com/docker/docker/daemon/logger"
+)
+
+// Reader tests that a logger.LogReader implementation behaves as it should.
+type Reader struct {
+	// Factory returns a function which constructs loggers for the container
+	// specified in info. Each call to the returned function must yield a
+	// distinct logger instance which can read back logs written by earlier
+	// instances.
+	Factory func(*testing.T, logger.Info) func(*testing.T) logger.Logger
+}
+
+var compareLog cmp.Options = []cmp.Option{
+	// The json-log driver does not round-trip PLogMetaData and API users do
+	// not expect it.
+	cmpopts.IgnoreFields(logger.Message{}, "PLogMetaData"),
+	cmp.Transformer("string", func(b []byte) string { return string(b) }),
+}
+
+// Do tests the behavior of the LogReader implementation.
+func (tr Reader) Do(t *testing.T) {
+	t.Run("Live/Tail", func(t *testing.T) { tr.testTail(t, true) })
+	t.Run("Live/TailEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, true) })
+	t.Run("Live/Follow", tr.testFollow)
+	t.Run("Stopped/Tail", func(t *testing.T) { tr.testTail(t, false) })
+	t.Run("Stopped/TailEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, false) })
+}
+
+func makeTestMessages() []*logger.Message {
+	return []*logger.Message{
+		{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")},
+		{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{ID: "aaaaaaaa", Ordinal: 1, Last: true}},
+		{Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("to be..."), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 1}},
+		{Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}},
+		{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))},
+		{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")},
+	}
+
+}
+
+func (tr Reader) testTail(t *testing.T, live bool) {
+	t.Parallel()
+	factory := tr.Factory(t, logger.Info{
+		ContainerID:   "tailtest0000",
+		ContainerName: "logtail",
+	})
+	l := factory(t)
+	if live {
+		defer func() { assert.NilError(t, l.Close()) }()
+	}
+
+	mm := makeTestMessages()
+	expected := logMessages(t, l, mm)
+
+	if !live {
+		// Simulate reading from a stopped container.
+		assert.NilError(t, l.Close())
+		l = factory(t)
+		defer func() { assert.NilError(t, l.Close()) }()
+	}
+	lr := l.(logger.LogReader)
+
+	t.Run("Exact", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
+	})
+
+	t.Run("LessThanAvailable", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog)
+	})
+
+	t.Run("MoreThanAvailable", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
+	})
+
+	t.Run("All", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: -1})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
+	})
+
+	t.Run("Since", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
+	})
+
+	t.Run("MoreThanSince", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
+	})
+
+	t.Run("LessThanSince", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog)
+	})
+
+	t.Run("Until", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog)
+	})
+
+	t.Run("SinceAndUntil", func(t *testing.T) {
+		t.Parallel()
+		lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
+		defer lw.ConsumerGone()
+		assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog)
+	})
+}
+
+func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
+	t.Parallel()
+	factory := tr.Factory(t, logger.Info{
+		ContainerID:   "tailemptytest",
+		ContainerName: "logtail",
+	})
+	l := factory(t)
+	if !live {
+		assert.NilError(t, l.Close())
+		l = factory(t)
+	}
+	defer func() { assert.NilError(t, l.Close()) }()
+
+	for _, tt := range []struct {
+		name string
+		cfg  logger.ReadConfig
+	}{
+		{name: "Zero", cfg: logger.ReadConfig{}},
+		{name: "All", cfg: logger.ReadConfig{Tail: -1}},
+		{name: "Tail", cfg: logger.ReadConfig{Tail: 42}},
+		{name: "Since", cfg: logger.ReadConfig{Since: time.Unix(1, 0)}},
+		{name: "Until", cfg: logger.ReadConfig{Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
+		{name: "SinceAndUntil", cfg: logger.ReadConfig{Since: time.Unix(1, 0), Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
+	} {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{})
+			defer lw.ConsumerGone()
+			assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty())
+		})
+	}
+}
+
+func (tr Reader) testFollow(t *testing.T) {
+	t.Parallel()
+	// Reader sends all logs and closes after logger is closed
+	// - Starting from empty log (like run)
+	t.Run("FromEmptyLog", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followstart0",
+			ContainerName: "logloglog",
+		})(t)
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		mm := makeTestMessages()
+		expected := logMessages(t, l, mm)
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("AttachMidStream", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followmiddle",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+		expected := logMessages(t, l, mm[0:1])
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected = append(expected, logMessages(t, l, mm[1:])...)
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("Since", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followsince0",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected := logMessages(t, l, mm)[2:]
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("Until", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followuntil0",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected := logMessages(t, l, mm)[:2]
+		defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("SinceAndUntil", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followbounded",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected := logMessages(t, l, mm)[1:2]
+		defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("Tail=0", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followtail00",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+		logMessages(t, l, mm[0:2])
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected := logMessages(t, l, mm[2:])
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("Tail>0", func(t *testing.T) {
+		t.Parallel()
+		l := tr.Factory(t, logger.Info{
+			ContainerID:   "followtail00",
+			ContainerName: "logloglog",
+		})(t)
+
+		mm := makeTestMessages()
+		expected := logMessages(t, l, mm[0:2])[1:]
+
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected = append(expected, logMessages(t, l, mm[2:])...)
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+
+	t.Run("MultipleStarts", func(t *testing.T) {
+		t.Parallel()
+		factory := tr.Factory(t, logger.Info{
+			ContainerID:   "startrestart",
+			ContainerName: "startmeup",
+		})
+
+		mm := makeTestMessages()
+		l := factory(t)
+		expected := logMessages(t, l, mm[:3])
+		assert.NilError(t, l.Close())
+
+		l = factory(t)
+		lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
+		defer lw.ConsumerGone()
+
+		doneReading := make(chan struct{})
+		var logs []*logger.Message
+		go func() {
+			defer close(doneReading)
+			logs = readAll(t, lw)
+		}()
+
+		expected = append(expected, logMessages(t, l, mm[3:])...)
+		assert.NilError(t, l.Close())
+		<-doneReading
+		assert.DeepEqual(t, logs, expected, compareLog)
+	})
+}
+
+// logMessages logs messages to l and returns a slice of messages as would be
+// expected to be read back. The message values are not modified and the
+// returned slice of messages are deep-copied.
+func logMessages(t *testing.T, l logger.Logger, messages []*logger.Message) []*logger.Message {
+	t.Helper()
+	var expected []*logger.Message
+	for _, m := range messages {
+		// Copy the log message because the underlying log writer resets
+		// the log message and returns it to a buffer pool.
+		assert.NilError(t, l.Log(copyLogMessage(m)))
+
+		// Copy the log message again so as not to mutate the input.
+		expect := copyLogMessage(m)
+		// Existing API consumers expect a newline to be appended to
+		// messages other than nonterminal partials as that matches the
+		// existing behavior of the json-file log driver.
+		if m.PLogMetaData == nil || m.PLogMetaData.Last {
+			expect.Line = append(expect.Line, '\n')
+		}
+		expected = append(expected, expect)
+	}
+	return expected
+}
+
+func copyLogMessage(src *logger.Message) *logger.Message {
+	dst := logger.NewMessage()
+	dst.Source = src.Source
+	dst.Timestamp = src.Timestamp
+	dst.Attrs = src.Attrs
+	dst.Err = src.Err
+	dst.Line = append(dst.Line, src.Line...)
+	if src.PLogMetaData != nil {
+		lmd := *src.PLogMetaData
+		dst.PLogMetaData = &lmd
+	}
+	return dst
+}
+func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message {
+	t.Helper()
+	timeout := time.NewTimer(5 * time.Second)
+	defer timeout.Stop()
+	select {
+	case <-timeout.C:
+		t.Error("timed out waiting for message")
+		return nil
+	case err, open := <-lw.Err:
+		t.Errorf("unexpected receive on lw.Err: err=%v, open=%v", err, open)
+		return nil
+	case msg, open := <-lw.Msg:
+		if !open {
+			select {
+			case err, open := <-lw.Err:
+				t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open)
+				return nil
+			default:
+			}
+		}
+		return msg
+	}
+}
+
+func readAll(t *testing.T, lw *logger.LogWatcher) []*logger.Message {
+	t.Helper()
+	var msgs []*logger.Message
+	for {
+		m := readMessage(t, lw)
+		if m == nil {
+			return msgs
+		}
+		msgs = append(msgs, m)
+	}
+}