123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- package logger // import "github.com/docker/docker/daemon/logger"
- import (
- "encoding/binary"
- "io"
- "sync"
- "testing"
- "time"
- "github.com/docker/docker/api/types/plugins/logdriver"
- protoio "github.com/gogo/protobuf/io"
- "gotest.tools/v3/assert"
- is "gotest.tools/v3/assert/cmp"
- )
- // mockLoggingPlugin implements the loggingPlugin interface for testing purposes
- // it only supports a single log stream
- type mockLoggingPlugin struct {
- io.WriteCloser
- inStream io.Reader
- logs []*logdriver.LogEntry
- c *sync.Cond
- err error
- }
- func newMockLoggingPlugin() *mockLoggingPlugin {
- r, w := io.Pipe()
- return &mockLoggingPlugin{
- WriteCloser: w,
- inStream: r,
- logs: []*logdriver.LogEntry{},
- c: sync.NewCond(new(sync.Mutex)),
- }
- }
- func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
- go func() {
- dec := protoio.NewUint32DelimitedReader(l.inStream, binary.BigEndian, 1e6)
- for {
- var msg logdriver.LogEntry
- if err := dec.ReadMsg(&msg); err != nil {
- l.c.L.Lock()
- if l.err == nil {
- l.err = err
- }
- l.c.L.Unlock()
- l.c.Broadcast()
- return
- }
- l.c.L.Lock()
- l.logs = append(l.logs, &msg)
- l.c.L.Unlock()
- l.c.Broadcast()
- }
- }()
- return nil
- }
- func (l *mockLoggingPlugin) StopLogging(file string) error {
- l.c.L.Lock()
- if l.err == nil {
- l.err = io.EOF
- }
- l.c.L.Unlock()
- l.c.Broadcast()
- return nil
- }
- func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
- return Capability{ReadLogs: true}, nil
- }
- func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
- r, w := io.Pipe()
- go func() {
- var idx int
- enc := logdriver.NewLogEntryEncoder(w)
- l.c.L.Lock()
- defer l.c.L.Unlock()
- for {
- if l.err != nil {
- w.Close()
- return
- }
- if idx >= len(l.logs) {
- if !config.Follow {
- w.Close()
- return
- }
- l.c.Wait()
- continue
- }
- if err := enc.Encode(l.logs[idx]); err != nil {
- w.CloseWithError(err)
- return
- }
- idx++
- }
- }()
- return r, nil
- }
- func (l *mockLoggingPlugin) waitLen(i int) {
- l.c.L.Lock()
- defer l.c.L.Unlock()
- for len(l.logs) < i {
- l.c.Wait()
- }
- }
- func (l *mockLoggingPlugin) check(t *testing.T) {
- if l.err != nil && l.err != io.EOF {
- t.Fatal(l.err)
- }
- }
- func newMockPluginAdapter(plugin *mockLoggingPlugin) Logger {
- enc := logdriver.NewLogEntryEncoder(plugin)
- a := &pluginAdapterWithRead{
- &pluginAdapter{
- plugin: plugin,
- stream: plugin,
- enc: enc,
- },
- }
- a.plugin.StartLogging("", Info{})
- return a
- }
- func TestAdapterReadLogs(t *testing.T) {
- plugin := newMockLoggingPlugin()
- l := newMockPluginAdapter(plugin)
- testMsg := []Message{
- {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
- {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
- }
- for _, msg := range testMsg {
- m := msg.copy()
- assert.Check(t, l.Log(m))
- }
- // Wait until messages are read into plugin
- plugin.waitLen(len(testMsg))
- lr, ok := l.(LogReader)
- assert.Check(t, ok, "Logger does not implement LogReader")
- lw := lr.ReadLogs(ReadConfig{})
- for _, x := range testMsg {
- select {
- case msg := <-lw.Msg:
- testMessageEqual(t, &x, msg)
- case <-time.After(10 * time.Second):
- t.Fatal("timeout reading logs")
- }
- }
- select {
- case _, ok := <-lw.Msg:
- assert.Check(t, !ok, "expected message channel to be closed")
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for message channel to close")
- }
- lw.ConsumerGone()
- lw = lr.ReadLogs(ReadConfig{Follow: true})
- for _, x := range testMsg {
- select {
- case msg := <-lw.Msg:
- testMessageEqual(t, &x, msg)
- case <-time.After(10 * time.Second):
- t.Fatal("timeout reading logs")
- }
- }
- x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
- assert.Check(t, l.Log(x.copy()))
- select {
- case msg, ok := <-lw.Msg:
- assert.Check(t, ok, "message channel unexpectedly closed")
- testMessageEqual(t, &x, msg)
- case <-time.After(10 * time.Second):
- t.Fatal("timeout reading logs")
- }
- l.Close()
- select {
- case msg, ok := <-lw.Msg:
- assert.Check(t, !ok, "expected message channel to be closed")
- assert.Check(t, is.Nil(msg))
- case <-time.After(10 * time.Second):
- t.Fatal("timeout waiting for logger to close")
- }
- plugin.check(t)
- }
- func testMessageEqual(t *testing.T, a, b *Message) {
- assert.Check(t, is.DeepEqual(a.Line, b.Line))
- assert.Check(t, is.DeepEqual(a.Timestamp.UnixNano(), b.Timestamp.UnixNano()))
- assert.Check(t, is.Equal(a.Source, b.Source))
- }
|