logfile_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync/atomic"
  12. "testing"
  13. "text/tabwriter"
  14. "time"
  15. "github.com/docker/docker/daemon/logger"
  16. "github.com/docker/docker/pkg/tailfile"
  17. "gotest.tools/v3/assert"
  18. "gotest.tools/v3/poll"
  19. )
  20. type testDecoder struct {
  21. rdr io.Reader
  22. scanner *bufio.Scanner
  23. resetCount int
  24. }
  25. func (d *testDecoder) Decode() (*logger.Message, error) {
  26. if d.scanner == nil {
  27. d.scanner = bufio.NewScanner(d.rdr)
  28. }
  29. if !d.scanner.Scan() {
  30. return nil, d.scanner.Err()
  31. }
  32. // some comment
  33. return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
  34. }
  35. func (d *testDecoder) Reset(rdr io.Reader) {
  36. d.rdr = rdr
  37. d.scanner = bufio.NewScanner(rdr)
  38. d.resetCount++
  39. }
  40. func (d *testDecoder) Close() {
  41. d.rdr = nil
  42. d.scanner = nil
  43. }
  44. func TestTailFiles(t *testing.T) {
  45. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  46. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  47. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  48. files := []SizeReaderAt{s1, s2, s3}
  49. watcher := logger.NewLogWatcher()
  50. defer watcher.ConsumerGone()
  51. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  52. return tailfile.NewTailReader(ctx, r, lines)
  53. }
  54. dec := &testDecoder{}
  55. for desc, config := range map[string]logger.ReadConfig{} {
  56. t.Run(desc, func(t *testing.T) {
  57. started := make(chan struct{})
  58. go func() {
  59. close(started)
  60. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  61. }()
  62. <-started
  63. })
  64. }
  65. config := logger.ReadConfig{Tail: 2}
  66. started := make(chan struct{})
  67. go func() {
  68. close(started)
  69. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  70. }()
  71. <-started
  72. select {
  73. case <-time.After(60 * time.Second):
  74. t.Fatal("timeout waiting for tail line")
  75. case err := <-watcher.Err:
  76. assert.NilError(t, err)
  77. case msg := <-watcher.Msg:
  78. assert.Assert(t, msg != nil)
  79. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  80. }
  81. select {
  82. case <-time.After(60 * time.Second):
  83. t.Fatal("timeout waiting for tail line")
  84. case err := <-watcher.Err:
  85. assert.NilError(t, err)
  86. case msg := <-watcher.Msg:
  87. assert.Assert(t, msg != nil)
  88. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  89. }
  90. }
  91. type dummyDecoder struct{}
  92. func (dummyDecoder) Decode() (*logger.Message, error) {
  93. return &logger.Message{}, nil
  94. }
  95. func (dummyDecoder) Close() {}
  96. func (dummyDecoder) Reset(io.Reader) {}
  97. func TestFollowLogsConsumerGone(t *testing.T) {
  98. lw := logger.NewLogWatcher()
  99. f, err := os.CreateTemp("", t.Name())
  100. assert.NilError(t, err)
  101. defer func() {
  102. f.Close()
  103. os.Remove(f.Name())
  104. }()
  105. dec := dummyDecoder{}
  106. followLogsDone := make(chan struct{})
  107. var since, until time.Time
  108. go func() {
  109. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  110. close(followLogsDone)
  111. }()
  112. select {
  113. case <-lw.Msg:
  114. case err := <-lw.Err:
  115. assert.NilError(t, err)
  116. case <-followLogsDone:
  117. t.Fatal("follow logs finished unexpectedly")
  118. case <-time.After(10 * time.Second):
  119. t.Fatal("timeout waiting for log message")
  120. }
  121. lw.ConsumerGone()
  122. select {
  123. case <-followLogsDone:
  124. case <-time.After(20 * time.Second):
  125. t.Fatal("timeout waiting for followLogs() to finish")
  126. }
  127. }
  128. type dummyWrapper struct {
  129. dummyDecoder
  130. fn func() error
  131. }
  132. func (d *dummyWrapper) Decode() (*logger.Message, error) {
  133. if err := d.fn(); err != nil {
  134. return nil, err
  135. }
  136. return d.dummyDecoder.Decode()
  137. }
  138. func TestFollowLogsProducerGone(t *testing.T) {
  139. lw := logger.NewLogWatcher()
  140. defer lw.ConsumerGone()
  141. f, err := os.CreateTemp("", t.Name())
  142. assert.NilError(t, err)
  143. defer os.Remove(f.Name())
  144. var sent, received, closed int32
  145. dec := &dummyWrapper{fn: func() error {
  146. switch atomic.LoadInt32(&closed) {
  147. case 0:
  148. atomic.AddInt32(&sent, 1)
  149. return nil
  150. case 1:
  151. atomic.AddInt32(&closed, 1)
  152. t.Logf("logDecode() closed after sending %d messages\n", sent)
  153. return io.EOF
  154. default:
  155. t.Fatal("logDecode() called after closing!")
  156. return io.EOF
  157. }
  158. }}
  159. var since, until time.Time
  160. followLogsDone := make(chan struct{})
  161. go func() {
  162. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  163. close(followLogsDone)
  164. }()
  165. // read 1 message
  166. select {
  167. case <-lw.Msg:
  168. received++
  169. case err := <-lw.Err:
  170. assert.NilError(t, err)
  171. case <-followLogsDone:
  172. t.Fatal("followLogs() finished unexpectedly")
  173. case <-time.After(10 * time.Second):
  174. t.Fatal("timeout waiting for log message")
  175. }
  176. // "stop" the "container"
  177. atomic.StoreInt32(&closed, 1)
  178. lw.ProducerGone()
  179. // should receive all the messages sent
  180. readDone := make(chan struct{})
  181. go func() {
  182. defer close(readDone)
  183. for {
  184. select {
  185. case <-lw.Msg:
  186. received++
  187. if received == atomic.LoadInt32(&sent) {
  188. return
  189. }
  190. case err := <-lw.Err:
  191. assert.NilError(t, err)
  192. }
  193. }
  194. }()
  195. select {
  196. case <-readDone:
  197. case <-time.After(30 * time.Second):
  198. t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
  199. }
  200. t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
  201. // followLogs() should be done by now
  202. select {
  203. case <-followLogsDone:
  204. case <-time.After(30 * time.Second):
  205. t.Fatal("timeout waiting for followLogs() to finish")
  206. }
  207. select {
  208. case <-lw.WatchConsumerGone():
  209. t.Fatal("consumer should not have exited")
  210. default:
  211. }
  212. }
  213. func TestCheckCapacityAndRotate(t *testing.T) {
  214. dir := t.TempDir()
  215. logPath := filepath.Join(dir, "log")
  216. getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  217. return tailfile.NewTailReader(ctx, r, lines)
  218. }
  219. createDecoder := func(io.Reader) Decoder {
  220. return dummyDecoder{}
  221. }
  222. marshal := func(msg *logger.Message) ([]byte, error) {
  223. return msg.Line, nil
  224. }
  225. l, err := NewLogFile(
  226. logPath,
  227. 5, // capacity
  228. 3, // maxFiles
  229. true, // compress
  230. marshal,
  231. createDecoder,
  232. 0600, // perms
  233. getTailReader,
  234. )
  235. assert.NilError(t, err)
  236. defer l.Close()
  237. ls := dirStringer{dir}
  238. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  239. _, err = os.Stat(logPath + ".1")
  240. assert.Assert(t, os.IsNotExist(err), ls)
  241. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  242. poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  243. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  244. poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  245. poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  246. t.Run("closed log file", func(t *testing.T) {
  247. // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
  248. // down the line.
  249. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
  250. l.f.Close()
  251. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  252. assert.NilError(t, os.Remove(logPath+".2.gz"))
  253. })
  254. t.Run("with log reader", func(t *testing.T) {
  255. // Make sure rotate works with an active reader
  256. lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
  257. defer lw.ConsumerGone()
  258. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
  259. // make sure the log reader is primed
  260. waitForMsg(t, lw, 30*time.Second)
  261. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
  262. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
  263. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
  264. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
  265. poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  266. })
  267. }
  268. func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
  269. t.Helper()
  270. timer := time.NewTimer(timeout)
  271. defer timer.Stop()
  272. select {
  273. case <-lw.Msg:
  274. case <-lw.WatchProducerGone():
  275. t.Fatal("log producer gone before log message arrived")
  276. case err := <-lw.Err:
  277. assert.NilError(t, err)
  278. case <-timer.C:
  279. t.Fatal("timeout waiting for log message")
  280. }
  281. }
  282. type dirStringer struct {
  283. d string
  284. }
  285. func (d dirStringer) String() string {
  286. ls, err := os.ReadDir(d.d)
  287. if err != nil {
  288. return ""
  289. }
  290. buf := bytes.NewBuffer(nil)
  291. tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
  292. buf.WriteString("\n")
  293. btw := bufio.NewWriter(tw)
  294. for _, entry := range ls {
  295. fi, err := entry.Info()
  296. if err != nil {
  297. return ""
  298. }
  299. btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
  300. }
  301. btw.Flush()
  302. tw.Flush()
  303. return buf.String()
  304. }
  305. func checkFileExists(name string) poll.Check {
  306. return func(t poll.LogT) poll.Result {
  307. _, err := os.Stat(name)
  308. switch {
  309. case err == nil:
  310. return poll.Success()
  311. case os.IsNotExist(err):
  312. return poll.Continue("waiting for %s to exist", name)
  313. default:
  314. t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
  315. return poll.Error(err)
  316. }
  317. }
  318. }