logfile_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "testing"
  12. "text/tabwriter"
  13. "time"
  14. "github.com/docker/docker/daemon/logger"
  15. "github.com/docker/docker/pkg/tailfile"
  16. "gotest.tools/v3/assert"
  17. "gotest.tools/v3/poll"
  18. )
  19. type testDecoder struct {
  20. rdr io.Reader
  21. scanner *bufio.Scanner
  22. resetCount int
  23. }
  24. func (d *testDecoder) Decode() (*logger.Message, error) {
  25. if d.scanner == nil {
  26. d.scanner = bufio.NewScanner(d.rdr)
  27. }
  28. if !d.scanner.Scan() {
  29. return nil, d.scanner.Err()
  30. }
  31. // some comment
  32. return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
  33. }
  34. func (d *testDecoder) Reset(rdr io.Reader) {
  35. d.rdr = rdr
  36. d.scanner = bufio.NewScanner(rdr)
  37. d.resetCount++
  38. }
  39. func (d *testDecoder) Close() {
  40. d.rdr = nil
  41. d.scanner = nil
  42. }
  43. func TestTailFiles(t *testing.T) {
  44. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  45. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  46. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  47. files := []SizeReaderAt{s1, s2, s3}
  48. watcher := logger.NewLogWatcher()
  49. defer watcher.ConsumerGone()
  50. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  51. return tailfile.NewTailReader(ctx, r, lines)
  52. }
  53. dec := &testDecoder{}
  54. for desc, config := range map[string]logger.ReadConfig{} {
  55. t.Run(desc, func(t *testing.T) {
  56. started := make(chan struct{})
  57. fwd := newForwarder(config)
  58. go func() {
  59. close(started)
  60. tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
  61. }()
  62. <-started
  63. })
  64. }
  65. config := logger.ReadConfig{Tail: 2}
  66. fwd := newForwarder(config)
  67. started := make(chan struct{})
  68. go func() {
  69. close(started)
  70. tailFiles(files, watcher, dec, tailReader, config.Tail, fwd)
  71. }()
  72. <-started
  73. select {
  74. case <-time.After(60 * time.Second):
  75. t.Fatal("timeout waiting for tail line")
  76. case err := <-watcher.Err:
  77. assert.NilError(t, err)
  78. case msg := <-watcher.Msg:
  79. assert.Assert(t, msg != nil)
  80. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  81. }
  82. select {
  83. case <-time.After(60 * time.Second):
  84. t.Fatal("timeout waiting for tail line")
  85. case err := <-watcher.Err:
  86. assert.NilError(t, err)
  87. case msg := <-watcher.Msg:
  88. assert.Assert(t, msg != nil)
  89. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  90. }
  91. }
  92. type dummyDecoder struct{}
  93. func (dummyDecoder) Decode() (*logger.Message, error) {
  94. return &logger.Message{}, nil
  95. }
  96. func (dummyDecoder) Close() {}
  97. func (dummyDecoder) Reset(io.Reader) {}
  98. func TestCheckCapacityAndRotate(t *testing.T) {
  99. dir := t.TempDir()
  100. logPath := filepath.Join(dir, "log")
  101. getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  102. return tailfile.NewTailReader(ctx, r, lines)
  103. }
  104. createDecoder := func(io.Reader) Decoder {
  105. return dummyDecoder{}
  106. }
  107. l, err := NewLogFile(
  108. logPath,
  109. 5, // capacity
  110. 3, // maxFiles
  111. true, // compress
  112. createDecoder,
  113. 0o600, // perms
  114. getTailReader,
  115. )
  116. assert.NilError(t, err)
  117. defer l.Close()
  118. ls := dirStringer{dir}
  119. timestamp := time.Time{}
  120. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
  121. _, err = os.Stat(logPath + ".1")
  122. assert.Assert(t, os.IsNotExist(err), ls)
  123. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
  124. poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  125. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
  126. poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  127. poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  128. t.Run("closed log file", func(t *testing.T) {
  129. // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
  130. // down the line.
  131. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
  132. l.f.Close()
  133. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
  134. assert.NilError(t, os.Remove(logPath+".2.gz"))
  135. })
  136. t.Run("with log reader", func(t *testing.T) {
  137. // Make sure rotate works with an active reader
  138. lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
  139. defer lw.ConsumerGone()
  140. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
  141. // make sure the log reader is primed
  142. waitForMsg(t, lw, 30*time.Second)
  143. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls)
  144. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls)
  145. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls)
  146. assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls)
  147. poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  148. })
  149. }
  150. func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
  151. t.Helper()
  152. timer := time.NewTimer(timeout)
  153. defer timer.Stop()
  154. select {
  155. case _, ok := <-lw.Msg:
  156. assert.Assert(t, ok, "log producer gone before log message arrived")
  157. case err := <-lw.Err:
  158. assert.NilError(t, err)
  159. case <-timer.C:
  160. t.Fatal("timeout waiting for log message")
  161. }
  162. }
  163. type dirStringer struct {
  164. d string
  165. }
  166. func (d dirStringer) String() string {
  167. ls, err := os.ReadDir(d.d)
  168. if err != nil {
  169. return ""
  170. }
  171. buf := bytes.NewBuffer(nil)
  172. tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
  173. buf.WriteString("\n")
  174. btw := bufio.NewWriter(tw)
  175. for _, entry := range ls {
  176. fi, err := entry.Info()
  177. if err != nil {
  178. return ""
  179. }
  180. btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
  181. }
  182. btw.Flush()
  183. tw.Flush()
  184. return buf.String()
  185. }
  186. func checkFileExists(name string) poll.Check {
  187. return func(t poll.LogT) poll.Result {
  188. _, err := os.Stat(name)
  189. switch {
  190. case err == nil:
  191. return poll.Success()
  192. case os.IsNotExist(err):
  193. return poll.Continue("waiting for %s to exist", name)
  194. default:
  195. t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
  196. return poll.Error(err)
  197. }
  198. }
  199. }