logfile_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package loggerutils
  2. import (
  3. "bufio"
  4. "context"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/pkg/pubsub"
  13. "github.com/docker/docker/pkg/tailfile"
  14. "gotest.tools/assert"
  15. )
  16. func TestTailFiles(t *testing.T) {
  17. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  18. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  19. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  20. files := []SizeReaderAt{s1, s2, s3}
  21. watcher := logger.NewLogWatcher()
  22. createDecoder := func(r io.Reader) func() (*logger.Message, error) {
  23. scanner := bufio.NewScanner(r)
  24. return func() (*logger.Message, error) {
  25. if !scanner.Scan() {
  26. return nil, scanner.Err()
  27. }
  28. // some comment
  29. return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
  30. }
  31. }
  32. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  33. return tailfile.NewTailReader(ctx, r, lines)
  34. }
  35. for desc, config := range map[string]logger.ReadConfig{} {
  36. t.Run(desc, func(t *testing.T) {
  37. started := make(chan struct{})
  38. go func() {
  39. close(started)
  40. tailFiles(files, watcher, createDecoder, tailReader, config)
  41. }()
  42. <-started
  43. })
  44. }
  45. config := logger.ReadConfig{Tail: 2}
  46. started := make(chan struct{})
  47. go func() {
  48. close(started)
  49. tailFiles(files, watcher, createDecoder, tailReader, config)
  50. }()
  51. <-started
  52. select {
  53. case <-time.After(60 * time.Second):
  54. t.Fatal("timeout waiting for tail line")
  55. case err := <-watcher.Err:
  56. assert.NilError(t, err)
  57. case msg := <-watcher.Msg:
  58. assert.Assert(t, msg != nil)
  59. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  60. }
  61. select {
  62. case <-time.After(60 * time.Second):
  63. t.Fatal("timeout waiting for tail line")
  64. case err := <-watcher.Err:
  65. assert.NilError(t, err)
  66. case msg := <-watcher.Msg:
  67. assert.Assert(t, msg != nil)
  68. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  69. }
  70. }
  71. func TestFollowLogsConsumerGone(t *testing.T) {
  72. lw := logger.NewLogWatcher()
  73. f, err := ioutil.TempFile("", t.Name())
  74. assert.NilError(t, err)
  75. defer func() {
  76. f.Close()
  77. os.Remove(f.Name())
  78. }()
  79. makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
  80. return func() (*logger.Message, error) {
  81. return &logger.Message{}, nil
  82. }
  83. }
  84. followLogsDone := make(chan struct{})
  85. var since, until time.Time
  86. go func() {
  87. followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
  88. close(followLogsDone)
  89. }()
  90. select {
  91. case <-lw.Msg:
  92. case err := <-lw.Err:
  93. assert.NilError(t, err)
  94. case <-followLogsDone:
  95. t.Fatal("follow logs finished unexpectedly")
  96. case <-time.After(10 * time.Second):
  97. t.Fatal("timeout waiting for log message")
  98. }
  99. lw.ConsumerGone()
  100. select {
  101. case <-followLogsDone:
  102. case <-time.After(20 * time.Second):
  103. t.Fatal("timeout waiting for followLogs() to finish")
  104. }
  105. }
  106. func TestFollowLogsProducerGone(t *testing.T) {
  107. lw := logger.NewLogWatcher()
  108. f, err := ioutil.TempFile("", t.Name())
  109. assert.NilError(t, err)
  110. defer os.Remove(f.Name())
  111. var sent, received, closed int
  112. makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
  113. return func() (*logger.Message, error) {
  114. if closed == 1 {
  115. closed++
  116. t.Logf("logDecode() closed after sending %d messages\n", sent)
  117. return nil, io.EOF
  118. } else if closed > 1 {
  119. t.Fatal("logDecode() called after closing!")
  120. return nil, io.EOF
  121. }
  122. sent++
  123. return &logger.Message{}, nil
  124. }
  125. }
  126. var since, until time.Time
  127. followLogsDone := make(chan struct{})
  128. go func() {
  129. followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
  130. close(followLogsDone)
  131. }()
  132. // read 1 message
  133. select {
  134. case <-lw.Msg:
  135. received++
  136. case err := <-lw.Err:
  137. assert.NilError(t, err)
  138. case <-followLogsDone:
  139. t.Fatal("followLogs() finished unexpectedly")
  140. case <-time.After(10 * time.Second):
  141. t.Fatal("timeout waiting for log message")
  142. }
  143. // "stop" the "container"
  144. closed = 1
  145. lw.ProducerGone()
  146. // should receive all the messages sent
  147. readDone := make(chan struct{})
  148. go func() {
  149. defer close(readDone)
  150. for {
  151. select {
  152. case <-lw.Msg:
  153. received++
  154. if received == sent {
  155. return
  156. }
  157. case err := <-lw.Err:
  158. assert.NilError(t, err)
  159. }
  160. }
  161. }()
  162. select {
  163. case <-readDone:
  164. case <-time.After(30 * time.Second):
  165. t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
  166. }
  167. t.Logf("messages sent: %d, received: %d", sent, received)
  168. // followLogs() should be done by now
  169. select {
  170. case <-followLogsDone:
  171. case <-time.After(30 * time.Second):
  172. t.Fatal("timeout waiting for followLogs() to finish")
  173. }
  174. select {
  175. case <-lw.WatchConsumerGone():
  176. t.Fatal("consumer should not have exited")
  177. default:
  178. }
  179. }
  180. func TestCheckCapacityAndRotate(t *testing.T) {
  181. dir, err := ioutil.TempDir("", t.Name())
  182. assert.NilError(t, err)
  183. defer os.RemoveAll(dir)
  184. f, err := ioutil.TempFile(dir, "log")
  185. assert.NilError(t, err)
  186. l := &LogFile{
  187. f: f,
  188. capacity: 5,
  189. maxFiles: 3,
  190. compress: true,
  191. notifyRotate: pubsub.NewPublisher(0, 1),
  192. perms: 0600,
  193. marshal: func(msg *logger.Message) ([]byte, error) {
  194. return msg.Line, nil
  195. },
  196. }
  197. defer l.Close()
  198. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  199. dStringer := dirStringer{dir}
  200. _, err = os.Stat(f.Name() + ".1")
  201. assert.Assert(t, os.IsNotExist(err), dStringer)
  202. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  203. _, err = os.Stat(f.Name() + ".1")
  204. assert.NilError(t, err, dStringer)
  205. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  206. _, err = os.Stat(f.Name() + ".1")
  207. assert.NilError(t, err, dStringer)
  208. _, err = os.Stat(f.Name() + ".2.gz")
  209. assert.NilError(t, err, dStringer)
  210. // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
  211. // down the line.
  212. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
  213. l.f.Close()
  214. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  215. }
  216. type dirStringer struct {
  217. d string
  218. }
  219. func (d dirStringer) String() string {
  220. ls, err := ioutil.ReadDir(d.d)
  221. if err != nil {
  222. return ""
  223. }
  224. var s strings.Builder
  225. s.WriteString("\n")
  226. for _, fi := range ls {
  227. s.WriteString(fi.Name() + "\n")
  228. }
  229. return s.String()
  230. }