logfile_test.go 4.7 KB


  1. package loggerutils
import (
	"bufio"
	"context"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/tailfile"
	"gotest.tools/v3/assert"
)
  15. func TestTailFiles(t *testing.T) {
  16. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  17. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  18. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  19. files := []SizeReaderAt{s1, s2, s3}
  20. watcher := logger.NewLogWatcher()
  21. createDecoder := func(r io.Reader) func() (*logger.Message, error) {
  22. scanner := bufio.NewScanner(r)
  23. return func() (*logger.Message, error) {
  24. if !scanner.Scan() {
  25. return nil, scanner.Err()
  26. }
  27. // some comment
  28. return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
  29. }
  30. }
  31. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  32. return tailfile.NewTailReader(ctx, r, lines)
  33. }
  34. for desc, config := range map[string]logger.ReadConfig{} {
  35. t.Run(desc, func(t *testing.T) {
  36. started := make(chan struct{})
  37. go func() {
  38. close(started)
  39. tailFiles(files, watcher, createDecoder, tailReader, config)
  40. }()
  41. <-started
  42. })
  43. }
  44. config := logger.ReadConfig{Tail: 2}
  45. started := make(chan struct{})
  46. go func() {
  47. close(started)
  48. tailFiles(files, watcher, createDecoder, tailReader, config)
  49. }()
  50. <-started
  51. select {
  52. case <-time.After(60 * time.Second):
  53. t.Fatal("timeout waiting for tail line")
  54. case err := <-watcher.Err:
  55. assert.NilError(t, err)
  56. case msg := <-watcher.Msg:
  57. assert.Assert(t, msg != nil)
  58. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  59. }
  60. select {
  61. case <-time.After(60 * time.Second):
  62. t.Fatal("timeout waiting for tail line")
  63. case err := <-watcher.Err:
  64. assert.NilError(t, err)
  65. case msg := <-watcher.Msg:
  66. assert.Assert(t, msg != nil)
  67. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  68. }
  69. }
  70. func TestFollowLogsConsumerGone(t *testing.T) {
  71. lw := logger.NewLogWatcher()
  72. f, err := ioutil.TempFile("", t.Name())
  73. assert.NilError(t, err)
  74. defer func() {
  75. f.Close()
  76. os.Remove(f.Name())
  77. }()
  78. makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
  79. return func() (*logger.Message, error) {
  80. return &logger.Message{}, nil
  81. }
  82. }
  83. followLogsDone := make(chan struct{})
  84. var since, until time.Time
  85. go func() {
  86. followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
  87. close(followLogsDone)
  88. }()
  89. select {
  90. case <-lw.Msg:
  91. case err := <-lw.Err:
  92. assert.NilError(t, err)
  93. case <-followLogsDone:
  94. t.Fatal("follow logs finished unexpectedly")
  95. case <-time.After(10 * time.Second):
  96. t.Fatal("timeout waiting for log message")
  97. }
  98. lw.ConsumerGone()
  99. select {
  100. case <-followLogsDone:
  101. case <-time.After(20 * time.Second):
  102. t.Fatal("timeout waiting for followLogs() to finish")
  103. }
  104. }
  105. func TestFollowLogsProducerGone(t *testing.T) {
  106. lw := logger.NewLogWatcher()
  107. f, err := ioutil.TempFile("", t.Name())
  108. assert.NilError(t, err)
  109. defer os.Remove(f.Name())
  110. var sent, received, closed int
  111. makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
  112. return func() (*logger.Message, error) {
  113. if closed == 1 {
  114. closed++
  115. t.Logf("logDecode() closed after sending %d messages\n", sent)
  116. return nil, io.EOF
  117. } else if closed > 1 {
  118. t.Fatal("logDecode() called after closing!")
  119. return nil, io.EOF
  120. }
  121. sent++
  122. return &logger.Message{}, nil
  123. }
  124. }
  125. var since, until time.Time
  126. followLogsDone := make(chan struct{})
  127. go func() {
  128. followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
  129. close(followLogsDone)
  130. }()
  131. // read 1 message
  132. select {
  133. case <-lw.Msg:
  134. received++
  135. case err := <-lw.Err:
  136. assert.NilError(t, err)
  137. case <-followLogsDone:
  138. t.Fatal("followLogs() finished unexpectedly")
  139. case <-time.After(10 * time.Second):
  140. t.Fatal("timeout waiting for log message")
  141. }
  142. // "stop" the "container"
  143. closed = 1
  144. lw.ProducerGone()
  145. // should receive all the messages sent
  146. readDone := make(chan struct{})
  147. go func() {
  148. defer close(readDone)
  149. for {
  150. select {
  151. case <-lw.Msg:
  152. received++
  153. if received == sent {
  154. return
  155. }
  156. case err := <-lw.Err:
  157. assert.NilError(t, err)
  158. }
  159. }
  160. }()
  161. select {
  162. case <-readDone:
  163. case <-time.After(30 * time.Second):
  164. t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
  165. }
  166. t.Logf("messages sent: %d, received: %d", sent, received)
  167. // followLogs() should be done by now
  168. select {
  169. case <-followLogsDone:
  170. case <-time.After(30 * time.Second):
  171. t.Fatal("timeout waiting for followLogs() to finish")
  172. }
  173. select {
  174. case <-lw.WatchConsumerGone():
  175. t.Fatal("consumer should not have exited")
  176. default:
  177. }
  178. }