adapter_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/docker/docker/api/types/plugins/logdriver"
  9. protoio "github.com/gogo/protobuf/io"
  10. "gotest.tools/v3/assert"
  11. is "gotest.tools/v3/assert/cmp"
  12. )
  13. // mockLoggingPlugin implements the loggingPlugin interface for testing purposes
  14. // it only supports a single log stream
  15. type mockLoggingPlugin struct {
  16. io.WriteCloser
  17. inStream io.Reader
  18. logs []*logdriver.LogEntry
  19. c *sync.Cond
  20. err error
  21. }
  22. func newMockLoggingPlugin() *mockLoggingPlugin {
  23. r, w := io.Pipe()
  24. return &mockLoggingPlugin{
  25. WriteCloser: w,
  26. inStream: r,
  27. logs: []*logdriver.LogEntry{},
  28. c: sync.NewCond(new(sync.Mutex)),
  29. }
  30. }
  31. func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
  32. go func() {
  33. dec := protoio.NewUint32DelimitedReader(l.inStream, binary.BigEndian, 1e6)
  34. for {
  35. var msg logdriver.LogEntry
  36. if err := dec.ReadMsg(&msg); err != nil {
  37. l.c.L.Lock()
  38. if l.err == nil {
  39. l.err = err
  40. }
  41. l.c.L.Unlock()
  42. l.c.Broadcast()
  43. return
  44. }
  45. l.c.L.Lock()
  46. l.logs = append(l.logs, &msg)
  47. l.c.L.Unlock()
  48. l.c.Broadcast()
  49. }
  50. }()
  51. return nil
  52. }
  53. func (l *mockLoggingPlugin) StopLogging(file string) error {
  54. l.c.L.Lock()
  55. if l.err == nil {
  56. l.err = io.EOF
  57. }
  58. l.c.L.Unlock()
  59. l.c.Broadcast()
  60. return nil
  61. }
  62. func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
  63. return Capability{ReadLogs: true}, nil
  64. }
  65. func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
  66. r, w := io.Pipe()
  67. go func() {
  68. var idx int
  69. enc := logdriver.NewLogEntryEncoder(w)
  70. l.c.L.Lock()
  71. defer l.c.L.Unlock()
  72. for {
  73. if l.err != nil {
  74. w.Close()
  75. return
  76. }
  77. if idx >= len(l.logs) {
  78. if !config.Follow {
  79. w.Close()
  80. return
  81. }
  82. l.c.Wait()
  83. continue
  84. }
  85. if err := enc.Encode(l.logs[idx]); err != nil {
  86. w.CloseWithError(err)
  87. return
  88. }
  89. idx++
  90. }
  91. }()
  92. return r, nil
  93. }
  94. func (l *mockLoggingPlugin) waitLen(i int) {
  95. l.c.L.Lock()
  96. defer l.c.L.Unlock()
  97. for len(l.logs) < i {
  98. l.c.Wait()
  99. }
  100. }
  101. func (l *mockLoggingPlugin) check(t *testing.T) {
  102. if l.err != nil && l.err != io.EOF {
  103. t.Fatal(l.err)
  104. }
  105. }
  106. func newMockPluginAdapter(plugin *mockLoggingPlugin) Logger {
  107. enc := logdriver.NewLogEntryEncoder(plugin)
  108. a := &pluginAdapterWithRead{
  109. &pluginAdapter{
  110. plugin: plugin,
  111. stream: plugin,
  112. enc: enc,
  113. },
  114. }
  115. a.plugin.StartLogging("", Info{})
  116. return a
  117. }
  118. func TestAdapterReadLogs(t *testing.T) {
  119. plugin := newMockLoggingPlugin()
  120. l := newMockPluginAdapter(plugin)
  121. testMsg := []Message{
  122. {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
  123. {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
  124. }
  125. for _, msg := range testMsg {
  126. m := msg.copy()
  127. assert.Check(t, l.Log(m))
  128. }
  129. // Wait until messages are read into plugin
  130. plugin.waitLen(len(testMsg))
  131. lr, ok := l.(LogReader)
  132. assert.Check(t, ok, "Logger does not implement LogReader")
  133. lw := lr.ReadLogs(ReadConfig{})
  134. for _, x := range testMsg {
  135. select {
  136. case msg := <-lw.Msg:
  137. testMessageEqual(t, &x, msg)
  138. case <-time.After(10 * time.Second):
  139. t.Fatal("timeout reading logs")
  140. }
  141. }
  142. select {
  143. case _, ok := <-lw.Msg:
  144. assert.Check(t, !ok, "expected message channel to be closed")
  145. case <-time.After(10 * time.Second):
  146. t.Fatal("timeout waiting for message channel to close")
  147. }
  148. lw.ConsumerGone()
  149. lw = lr.ReadLogs(ReadConfig{Follow: true})
  150. for _, x := range testMsg {
  151. select {
  152. case msg := <-lw.Msg:
  153. testMessageEqual(t, &x, msg)
  154. case <-time.After(10 * time.Second):
  155. t.Fatal("timeout reading logs")
  156. }
  157. }
  158. x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
  159. assert.Check(t, l.Log(x.copy()))
  160. select {
  161. case msg, ok := <-lw.Msg:
  162. assert.Check(t, ok, "message channel unexpectedly closed")
  163. testMessageEqual(t, &x, msg)
  164. case <-time.After(10 * time.Second):
  165. t.Fatal("timeout reading logs")
  166. }
  167. l.Close()
  168. select {
  169. case msg, ok := <-lw.Msg:
  170. assert.Check(t, !ok, "expected message channel to be closed")
  171. assert.Check(t, is.Nil(msg))
  172. case <-time.After(10 * time.Second):
  173. t.Fatal("timeout waiting for logger to close")
  174. }
  175. plugin.check(t)
  176. }
  177. func testMessageEqual(t *testing.T, a, b *Message) {
  178. assert.Check(t, is.DeepEqual(a.Line, b.Line))
  179. assert.Check(t, is.DeepEqual(a.Timestamp.UnixNano(), b.Timestamp.UnixNano()))
  180. assert.Check(t, is.Equal(a.Source, b.Source))
  181. }