logfile_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync/atomic"
  12. "testing"
  13. "text/tabwriter"
  14. "time"
  15. "github.com/docker/docker/daemon/logger"
  16. "github.com/docker/docker/pkg/pubsub"
  17. "github.com/docker/docker/pkg/tailfile"
  18. "gotest.tools/v3/assert"
  19. "gotest.tools/v3/poll"
  20. )
  21. type testDecoder struct {
  22. rdr io.Reader
  23. scanner *bufio.Scanner
  24. resetCount int
  25. }
  26. func (d *testDecoder) Decode() (*logger.Message, error) {
  27. if d.scanner == nil {
  28. d.scanner = bufio.NewScanner(d.rdr)
  29. }
  30. if !d.scanner.Scan() {
  31. return nil, d.scanner.Err()
  32. }
  33. // some comment
  34. return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
  35. }
  36. func (d *testDecoder) Reset(rdr io.Reader) {
  37. d.rdr = rdr
  38. d.scanner = bufio.NewScanner(rdr)
  39. d.resetCount++
  40. }
  41. func (d *testDecoder) Close() {
  42. d.rdr = nil
  43. d.scanner = nil
  44. }
  45. func TestTailFiles(t *testing.T) {
  46. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  47. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  48. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  49. files := []SizeReaderAt{s1, s2, s3}
  50. watcher := logger.NewLogWatcher()
  51. defer watcher.ConsumerGone()
  52. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  53. return tailfile.NewTailReader(ctx, r, lines)
  54. }
  55. dec := &testDecoder{}
  56. for desc, config := range map[string]logger.ReadConfig{} {
  57. t.Run(desc, func(t *testing.T) {
  58. started := make(chan struct{})
  59. go func() {
  60. close(started)
  61. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  62. }()
  63. <-started
  64. })
  65. }
  66. config := logger.ReadConfig{Tail: 2}
  67. started := make(chan struct{})
  68. go func() {
  69. close(started)
  70. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  71. }()
  72. <-started
  73. select {
  74. case <-time.After(60 * time.Second):
  75. t.Fatal("timeout waiting for tail line")
  76. case err := <-watcher.Err:
  77. assert.NilError(t, err)
  78. case msg := <-watcher.Msg:
  79. assert.Assert(t, msg != nil)
  80. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  81. }
  82. select {
  83. case <-time.After(60 * time.Second):
  84. t.Fatal("timeout waiting for tail line")
  85. case err := <-watcher.Err:
  86. assert.NilError(t, err)
  87. case msg := <-watcher.Msg:
  88. assert.Assert(t, msg != nil)
  89. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  90. }
  91. }
  92. type dummyDecoder struct{}
  93. func (dummyDecoder) Decode() (*logger.Message, error) {
  94. return &logger.Message{}, nil
  95. }
  96. func (dummyDecoder) Close() {}
  97. func (dummyDecoder) Reset(io.Reader) {}
  98. func TestFollowLogsConsumerGone(t *testing.T) {
  99. lw := logger.NewLogWatcher()
  100. f, err := os.CreateTemp("", t.Name())
  101. assert.NilError(t, err)
  102. defer func() {
  103. f.Close()
  104. os.Remove(f.Name())
  105. }()
  106. dec := dummyDecoder{}
  107. followLogsDone := make(chan struct{})
  108. var since, until time.Time
  109. go func() {
  110. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  111. close(followLogsDone)
  112. }()
  113. select {
  114. case <-lw.Msg:
  115. case err := <-lw.Err:
  116. assert.NilError(t, err)
  117. case <-followLogsDone:
  118. t.Fatal("follow logs finished unexpectedly")
  119. case <-time.After(10 * time.Second):
  120. t.Fatal("timeout waiting for log message")
  121. }
  122. lw.ConsumerGone()
  123. select {
  124. case <-followLogsDone:
  125. case <-time.After(20 * time.Second):
  126. t.Fatal("timeout waiting for followLogs() to finish")
  127. }
  128. }
  129. type dummyWrapper struct {
  130. dummyDecoder
  131. fn func() error
  132. }
  133. func (d *dummyWrapper) Decode() (*logger.Message, error) {
  134. if err := d.fn(); err != nil {
  135. return nil, err
  136. }
  137. return d.dummyDecoder.Decode()
  138. }
  139. func TestFollowLogsProducerGone(t *testing.T) {
  140. lw := logger.NewLogWatcher()
  141. defer lw.ConsumerGone()
  142. f, err := os.CreateTemp("", t.Name())
  143. assert.NilError(t, err)
  144. defer os.Remove(f.Name())
  145. var sent, received, closed int32
  146. dec := &dummyWrapper{fn: func() error {
  147. switch atomic.LoadInt32(&closed) {
  148. case 0:
  149. atomic.AddInt32(&sent, 1)
  150. return nil
  151. case 1:
  152. atomic.AddInt32(&closed, 1)
  153. t.Logf("logDecode() closed after sending %d messages\n", sent)
  154. return io.EOF
  155. default:
  156. t.Fatal("logDecode() called after closing!")
  157. return io.EOF
  158. }
  159. }}
  160. var since, until time.Time
  161. followLogsDone := make(chan struct{})
  162. go func() {
  163. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  164. close(followLogsDone)
  165. }()
  166. // read 1 message
  167. select {
  168. case <-lw.Msg:
  169. received++
  170. case err := <-lw.Err:
  171. assert.NilError(t, err)
  172. case <-followLogsDone:
  173. t.Fatal("followLogs() finished unexpectedly")
  174. case <-time.After(10 * time.Second):
  175. t.Fatal("timeout waiting for log message")
  176. }
  177. // "stop" the "container"
  178. atomic.StoreInt32(&closed, 1)
  179. lw.ProducerGone()
  180. // should receive all the messages sent
  181. readDone := make(chan struct{})
  182. go func() {
  183. defer close(readDone)
  184. for {
  185. select {
  186. case <-lw.Msg:
  187. received++
  188. if received == atomic.LoadInt32(&sent) {
  189. return
  190. }
  191. case err := <-lw.Err:
  192. assert.NilError(t, err)
  193. }
  194. }
  195. }()
  196. select {
  197. case <-readDone:
  198. case <-time.After(30 * time.Second):
  199. t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
  200. }
  201. t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
  202. // followLogs() should be done by now
  203. select {
  204. case <-followLogsDone:
  205. case <-time.After(30 * time.Second):
  206. t.Fatal("timeout waiting for followLogs() to finish")
  207. }
  208. select {
  209. case <-lw.WatchConsumerGone():
  210. t.Fatal("consumer should not have exited")
  211. default:
  212. }
  213. }
  214. func TestCheckCapacityAndRotate(t *testing.T) {
  215. dir, err := os.MkdirTemp("", t.Name())
  216. assert.NilError(t, err)
  217. defer os.RemoveAll(dir)
  218. f, err := os.CreateTemp(dir, "log")
  219. assert.NilError(t, err)
  220. l := &LogFile{
  221. f: f,
  222. capacity: 5,
  223. maxFiles: 3,
  224. compress: true,
  225. notifyReaders: pubsub.NewPublisher(0, 1),
  226. perms: 0600,
  227. filesRefCounter: refCounter{counter: make(map[string]int)},
  228. getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  229. return tailfile.NewTailReader(ctx, r, lines)
  230. },
  231. createDecoder: func(io.Reader) Decoder {
  232. return dummyDecoder{}
  233. },
  234. marshal: func(msg *logger.Message) ([]byte, error) {
  235. return msg.Line, nil
  236. },
  237. }
  238. defer l.Close()
  239. ls := dirStringer{dir}
  240. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  241. _, err = os.Stat(f.Name() + ".1")
  242. assert.Assert(t, os.IsNotExist(err), ls)
  243. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  244. poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  245. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  246. poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  247. poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  248. t.Run("closed log file", func(t *testing.T) {
  249. // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
  250. // down the line.
  251. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
  252. l.f.Close()
  253. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  254. assert.NilError(t, os.Remove(f.Name()+".2.gz"))
  255. })
  256. t.Run("with log reader", func(t *testing.T) {
  257. // Make sure rotate works with an active reader
  258. lw := logger.NewLogWatcher()
  259. defer lw.ConsumerGone()
  260. go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
  261. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
  262. // make sure the log reader is primed
  263. waitForMsg(t, lw, 30*time.Second)
  264. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
  265. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
  266. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
  267. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
  268. poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  269. })
  270. }
  271. func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
  272. t.Helper()
  273. timer := time.NewTimer(timeout)
  274. defer timer.Stop()
  275. select {
  276. case <-lw.Msg:
  277. case <-lw.WatchProducerGone():
  278. t.Fatal("log producer gone before log message arrived")
  279. case err := <-lw.Err:
  280. assert.NilError(t, err)
  281. case <-timer.C:
  282. t.Fatal("timeout waiting for log message")
  283. }
  284. }
  // dirStringer renders a listing of directory d on demand. The tests pass it
  // as a message argument to assertions so the listing can accompany a failure.
  type dirStringer struct {
  	d string
  }

  // String returns a newline followed by one tab-aligned row per directory
  // entry: name, mode, size in bytes and modification time.
  //
  // The result is diagnostic output only, so problems are described inside the
  // returned text rather than collapsing the whole listing to an empty string:
  // a directory that cannot be read yields a single explanatory line, and an
  // entry whose metadata cannot be fetched gets a row carrying the error while
  // the remaining entries are still listed.
  func (d dirStringer) String() string {
  	ls, err := os.ReadDir(d.d)
  	if err != nil {
  		return fmt.Sprintf("\n<cannot list %s: %v>\n", d.d, err)
  	}

  	var buf bytes.Buffer
  	buf.WriteString("\n")

  	// tabwriter buffers its cells until Flush, so it can be written to
  	// directly; no additional bufio layer is required.
  	tw := tabwriter.NewWriter(&buf, 1, 8, 1, '\t', 0)
  	for _, entry := range ls {
  		fi, err := entry.Info()
  		if err != nil {
  			// The entry may have disappeared between ReadDir and Info.
  			fmt.Fprintf(tw, "%s\t<%v>\n", entry.Name(), err)
  			continue
  		}
  		fmt.Fprintf(tw, "%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime())
  	}
  	tw.Flush()
  	return buf.String()
  }
  308. func checkFileExists(name string) poll.Check {
  309. return func(t poll.LogT) poll.Result {
  310. _, err := os.Stat(name)
  311. switch {
  312. case err == nil:
  313. return poll.Success()
  314. case os.IsNotExist(err):
  315. return poll.Continue("waiting for %s to exist", name)
  316. default:
  317. t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
  318. return poll.Error(err)
  319. }
  320. }
  321. }