adapter_test.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package logger
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "testing"
  8. "time"
  9. "github.com/docker/docker/api/types/plugins/logdriver"
  10. protoio "github.com/gogo/protobuf/io"
  11. "github.com/stretchr/testify/assert"
  12. )
  // mockLoggingPlugin implements the loggingPlugin interface for testing purposes.
  // It only supports a single log stream.
  type mockLoggingPlugin struct {
  	// inStream is the stream StartLogging copies from; StopLogging closes it.
  	inStream io.ReadCloser
  	// f is the file the stream is copied into. ReadLogs reopens it by name,
  	// and StopLogging closes and removes it.
  	f *os.File
  	// closed is closed by the StartLogging goroutine once copying has
  	// finished; the ReadLogs goroutine polls it to know when to stop.
  	closed chan struct{}
  	// t is used by ReadLogs to report unexpected decode errors.
  	t *testing.T
  }
  21. func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
  22. go func() {
  23. io.Copy(l.f, l.inStream)
  24. close(l.closed)
  25. }()
  26. return nil
  27. }
  28. func (l *mockLoggingPlugin) StopLogging(file string) error {
  29. l.inStream.Close()
  30. l.f.Close()
  31. os.Remove(l.f.Name())
  32. return nil
  33. }
  34. func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
  35. return Capability{ReadLogs: true}, nil
  36. }
  37. func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
  38. r, w := io.Pipe()
  39. f, err := os.Open(l.f.Name())
  40. if err != nil {
  41. return nil, err
  42. }
  43. go func() {
  44. defer f.Close()
  45. dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  46. enc := logdriver.NewLogEntryEncoder(w)
  47. for {
  48. select {
  49. case <-l.closed:
  50. w.Close()
  51. return
  52. default:
  53. }
  54. var msg logdriver.LogEntry
  55. if err := dec.ReadMsg(&msg); err != nil {
  56. if err == io.EOF {
  57. if !config.Follow {
  58. w.Close()
  59. return
  60. }
  61. dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  62. continue
  63. }
  64. l.t.Fatal(err)
  65. continue
  66. }
  67. if err := enc.Encode(&msg); err != nil {
  68. w.CloseWithError(err)
  69. return
  70. }
  71. }
  72. }()
  73. return r, nil
  74. }
  75. func newMockPluginAdapter(t *testing.T) Logger {
  76. r, w := io.Pipe()
  77. f, err := ioutil.TempFile("", "mock-plugin-adapter")
  78. assert.NoError(t, err)
  79. enc := logdriver.NewLogEntryEncoder(w)
  80. a := &pluginAdapterWithRead{
  81. &pluginAdapter{
  82. plugin: &mockLoggingPlugin{
  83. inStream: r,
  84. f: f,
  85. closed: make(chan struct{}),
  86. t: t,
  87. },
  88. stream: w,
  89. enc: enc,
  90. },
  91. }
  92. a.plugin.StartLogging("", Info{})
  93. return a
  94. }
  95. func TestAdapterReadLogs(t *testing.T) {
  96. l := newMockPluginAdapter(t)
  97. testMsg := []Message{
  98. {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
  99. {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
  100. }
  101. for _, msg := range testMsg {
  102. m := msg.copy()
  103. assert.NoError(t, l.Log(m))
  104. }
  105. lr, ok := l.(LogReader)
  106. assert.NotNil(t, ok)
  107. lw := lr.ReadLogs(ReadConfig{})
  108. for _, x := range testMsg {
  109. select {
  110. case msg := <-lw.Msg:
  111. testMessageEqual(t, &x, msg)
  112. case <-time.After(10 * time.Second):
  113. t.Fatal("timeout reading logs")
  114. }
  115. }
  116. select {
  117. case _, ok := <-lw.Msg:
  118. assert.False(t, ok, "expected message channel to be closed")
  119. case <-time.After(10 * time.Second):
  120. t.Fatal("timeout waiting for message channel to close")
  121. }
  122. lw.Close()
  123. lw = lr.ReadLogs(ReadConfig{Follow: true})
  124. for _, x := range testMsg {
  125. select {
  126. case msg := <-lw.Msg:
  127. testMessageEqual(t, &x, msg)
  128. case <-time.After(10 * time.Second):
  129. t.Fatal("timeout reading logs")
  130. }
  131. }
  132. x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
  133. assert.NoError(t, l.Log(x.copy()))
  134. select {
  135. case msg, ok := <-lw.Msg:
  136. assert.NotNil(t, ok, "message channel unexpectedly closed")
  137. testMessageEqual(t, &x, msg)
  138. case <-time.After(10 * time.Second):
  139. t.Fatal("timeout reading logs")
  140. }
  141. l.Close()
  142. select {
  143. case msg, ok := <-lw.Msg:
  144. assert.False(t, ok, "expected message channel to be closed")
  145. assert.Nil(t, msg)
  146. case <-time.After(10 * time.Second):
  147. t.Fatal("timeout waiting for logger to close")
  148. }
  149. }
  150. func testMessageEqual(t *testing.T, a, b *Message) {
  151. assert.Equal(t, a.Line, b.Line)
  152. assert.Equal(t, a.Timestamp.UnixNano(), b.Timestamp.UnixNano())
  153. assert.Equal(t, a.Source, b.Source)
  154. }