adapter_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package logger
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "runtime"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/api/types/plugins/logdriver"
  12. protoio "github.com/gogo/protobuf/io"
  13. )
  14. // mockLoggingPlugin implements the loggingPlugin interface for testing purposes
  15. // it only supports a single log stream
  16. type mockLoggingPlugin struct {
  17. inStream io.ReadCloser
  18. f *os.File
  19. closed chan struct{}
  20. t *testing.T
  21. }
  22. func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
  23. go func() {
  24. io.Copy(l.f, l.inStream)
  25. close(l.closed)
  26. }()
  27. return nil
  28. }
  29. func (l *mockLoggingPlugin) StopLogging(file string) error {
  30. l.inStream.Close()
  31. l.f.Close()
  32. os.Remove(l.f.Name())
  33. return nil
  34. }
  35. func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
  36. return Capability{ReadLogs: true}, nil
  37. }
  38. func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
  39. r, w := io.Pipe()
  40. f, err := os.Open(l.f.Name())
  41. if err != nil {
  42. return nil, err
  43. }
  44. go func() {
  45. defer f.Close()
  46. dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  47. enc := logdriver.NewLogEntryEncoder(w)
  48. for {
  49. select {
  50. case <-l.closed:
  51. w.Close()
  52. return
  53. default:
  54. }
  55. var msg logdriver.LogEntry
  56. if err := dec.ReadMsg(&msg); err != nil {
  57. if err == io.EOF {
  58. if !config.Follow {
  59. w.Close()
  60. return
  61. }
  62. dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  63. continue
  64. }
  65. l.t.Fatal(err)
  66. continue
  67. }
  68. if err := enc.Encode(&msg); err != nil {
  69. w.CloseWithError(err)
  70. return
  71. }
  72. }
  73. }()
  74. return r, nil
  75. }
  76. func newMockPluginAdapter(t *testing.T) Logger {
  77. r, w := io.Pipe()
  78. f, err := ioutil.TempFile("", "mock-plugin-adapter")
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. enc := logdriver.NewLogEntryEncoder(w)
  83. a := &pluginAdapterWithRead{
  84. &pluginAdapter{
  85. plugin: &mockLoggingPlugin{
  86. inStream: r,
  87. f: f,
  88. closed: make(chan struct{}),
  89. t: t,
  90. },
  91. stream: w,
  92. enc: enc,
  93. },
  94. }
  95. a.plugin.StartLogging("", Info{})
  96. return a
  97. }
  98. func TestAdapterReadLogs(t *testing.T) {
  99. l := newMockPluginAdapter(t)
  100. testMsg := []Message{
  101. {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
  102. {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
  103. }
  104. for _, msg := range testMsg {
  105. m := msg.copy()
  106. if err := l.Log(m); err != nil {
  107. t.Fatal(err)
  108. }
  109. }
  110. lr, ok := l.(LogReader)
  111. if !ok {
  112. t.Fatal("expected log reader")
  113. }
  114. lw := lr.ReadLogs(ReadConfig{})
  115. for _, x := range testMsg {
  116. select {
  117. case msg := <-lw.Msg:
  118. testMessageEqual(t, &x, msg)
  119. case <-time.After(10 * time.Millisecond):
  120. t.Fatal("timeout reading logs")
  121. }
  122. }
  123. select {
  124. case _, ok := <-lw.Msg:
  125. if ok {
  126. t.Fatal("expected message channel to be closed")
  127. }
  128. case <-time.After(10 * time.Second):
  129. t.Fatal("timeout waiting for message channel to close")
  130. }
  131. lw.Close()
  132. lw = lr.ReadLogs(ReadConfig{Follow: true})
  133. for _, x := range testMsg {
  134. select {
  135. case msg := <-lw.Msg:
  136. testMessageEqual(t, &x, msg)
  137. case <-time.After(10 * time.Second):
  138. t.Fatal("timeout reading logs")
  139. }
  140. }
  141. x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
  142. if err := l.Log(x.copy()); err != nil {
  143. t.Fatal(err)
  144. }
  145. select {
  146. case msg, ok := <-lw.Msg:
  147. if !ok {
  148. t.Fatal("message channel unexpectedly closed")
  149. }
  150. testMessageEqual(t, &x, msg)
  151. case <-time.After(10 * time.Second):
  152. t.Fatal("timeout reading logs")
  153. }
  154. l.Close()
  155. select {
  156. case msg, ok := <-lw.Msg:
  157. if ok {
  158. t.Fatal("expected message channel to be closed")
  159. }
  160. if msg != nil {
  161. t.Fatal("expected nil message")
  162. }
  163. case <-time.After(10 * time.Second):
  164. t.Fatal("timeout waiting for logger to close")
  165. }
  166. }
  167. func testMessageEqual(t *testing.T, a, b *Message) {
  168. _, _, n, _ := runtime.Caller(1)
  169. errFmt := "line %d: expected same messages:\nwant: %+v\nhave: %+v"
  170. if !bytes.Equal(a.Line, b.Line) {
  171. t.Fatalf(errFmt, n, *a, *b)
  172. }
  173. if a.Timestamp.UnixNano() != b.Timestamp.UnixNano() {
  174. t.Fatalf(errFmt, n, *a, *b)
  175. }
  176. if a.Source != b.Source {
  177. t.Fatalf(errFmt, n, *a, *b)
  178. }
  179. }