logfile_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "testing"
  15. "text/tabwriter"
  16. "time"
  17. "github.com/docker/docker/daemon/logger"
  18. "github.com/docker/docker/pkg/pubsub"
  19. "github.com/docker/docker/pkg/tailfile"
  20. "gotest.tools/v3/assert"
  21. "gotest.tools/v3/poll"
  22. )
  23. type testDecoder struct {
  24. rdr io.Reader
  25. scanner *bufio.Scanner
  26. }
  27. func (d *testDecoder) Decode() (*logger.Message, error) {
  28. if d.scanner == nil {
  29. d.scanner = bufio.NewScanner(d.rdr)
  30. }
  31. if !d.scanner.Scan() {
  32. return nil, d.scanner.Err()
  33. }
  34. // some comment
  35. return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
  36. }
  37. func (d *testDecoder) Reset(rdr io.Reader) {
  38. d.rdr = rdr
  39. d.scanner = bufio.NewScanner(rdr)
  40. }
  41. func (d *testDecoder) Close() {
  42. d.rdr = nil
  43. d.scanner = nil
  44. }
  45. func TestTailFiles(t *testing.T) {
  46. s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
  47. s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
  48. s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
  49. files := []SizeReaderAt{s1, s2, s3}
  50. watcher := logger.NewLogWatcher()
  51. defer watcher.ConsumerGone()
  52. tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  53. return tailfile.NewTailReader(ctx, r, lines)
  54. }
  55. dec := &testDecoder{}
  56. for desc, config := range map[string]logger.ReadConfig{} {
  57. t.Run(desc, func(t *testing.T) {
  58. started := make(chan struct{})
  59. go func() {
  60. close(started)
  61. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  62. }()
  63. <-started
  64. })
  65. }
  66. config := logger.ReadConfig{Tail: 2}
  67. started := make(chan struct{})
  68. go func() {
  69. close(started)
  70. tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
  71. }()
  72. <-started
  73. select {
  74. case <-time.After(60 * time.Second):
  75. t.Fatal("timeout waiting for tail line")
  76. case err := <-watcher.Err:
  77. assert.NilError(t, err)
  78. case msg := <-watcher.Msg:
  79. assert.Assert(t, msg != nil)
  80. assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
  81. }
  82. select {
  83. case <-time.After(60 * time.Second):
  84. t.Fatal("timeout waiting for tail line")
  85. case err := <-watcher.Err:
  86. assert.NilError(t, err)
  87. case msg := <-watcher.Msg:
  88. assert.Assert(t, msg != nil)
  89. assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
  90. }
  91. }
  92. type dummyDecoder struct{}
  93. func (dummyDecoder) Decode() (*logger.Message, error) {
  94. return &logger.Message{}, nil
  95. }
  96. func (dummyDecoder) Close() {}
  97. func (dummyDecoder) Reset(io.Reader) {}
  98. func TestFollowLogsConsumerGone(t *testing.T) {
  99. lw := logger.NewLogWatcher()
  100. f, err := os.CreateTemp("", t.Name())
  101. assert.NilError(t, err)
  102. defer func() {
  103. f.Close()
  104. os.Remove(f.Name())
  105. }()
  106. dec := dummyDecoder{}
  107. followLogsDone := make(chan struct{})
  108. var since, until time.Time
  109. go func() {
  110. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  111. close(followLogsDone)
  112. }()
  113. select {
  114. case <-lw.Msg:
  115. case err := <-lw.Err:
  116. assert.NilError(t, err)
  117. case <-followLogsDone:
  118. t.Fatal("follow logs finished unexpectedly")
  119. case <-time.After(10 * time.Second):
  120. t.Fatal("timeout waiting for log message")
  121. }
  122. lw.ConsumerGone()
  123. select {
  124. case <-followLogsDone:
  125. case <-time.After(20 * time.Second):
  126. t.Fatal("timeout waiting for followLogs() to finish")
  127. }
  128. }
  129. type dummyWrapper struct {
  130. dummyDecoder
  131. fn func() error
  132. }
  133. func (d *dummyWrapper) Decode() (*logger.Message, error) {
  134. if err := d.fn(); err != nil {
  135. return nil, err
  136. }
  137. return d.dummyDecoder.Decode()
  138. }
  139. func TestFollowLogsProducerGone(t *testing.T) {
  140. lw := logger.NewLogWatcher()
  141. defer lw.ConsumerGone()
  142. f, err := os.CreateTemp("", t.Name())
  143. assert.NilError(t, err)
  144. defer os.Remove(f.Name())
  145. var sent, received, closed int32
  146. dec := &dummyWrapper{fn: func() error {
  147. switch atomic.LoadInt32(&closed) {
  148. case 0:
  149. atomic.AddInt32(&sent, 1)
  150. return nil
  151. case 1:
  152. atomic.AddInt32(&closed, 1)
  153. t.Logf("logDecode() closed after sending %d messages\n", sent)
  154. return io.EOF
  155. default:
  156. t.Fatal("logDecode() called after closing!")
  157. return io.EOF
  158. }
  159. }}
  160. var since, until time.Time
  161. followLogsDone := make(chan struct{})
  162. go func() {
  163. followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
  164. close(followLogsDone)
  165. }()
  166. // read 1 message
  167. select {
  168. case <-lw.Msg:
  169. received++
  170. case err := <-lw.Err:
  171. assert.NilError(t, err)
  172. case <-followLogsDone:
  173. t.Fatal("followLogs() finished unexpectedly")
  174. case <-time.After(10 * time.Second):
  175. t.Fatal("timeout waiting for log message")
  176. }
  177. // "stop" the "container"
  178. atomic.StoreInt32(&closed, 1)
  179. lw.ProducerGone()
  180. // should receive all the messages sent
  181. readDone := make(chan struct{})
  182. go func() {
  183. defer close(readDone)
  184. for {
  185. select {
  186. case <-lw.Msg:
  187. received++
  188. if received == atomic.LoadInt32(&sent) {
  189. return
  190. }
  191. case err := <-lw.Err:
  192. assert.NilError(t, err)
  193. }
  194. }
  195. }()
  196. select {
  197. case <-readDone:
  198. case <-time.After(30 * time.Second):
  199. t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
  200. }
  201. t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
  202. // followLogs() should be done by now
  203. select {
  204. case <-followLogsDone:
  205. case <-time.After(30 * time.Second):
  206. t.Fatal("timeout waiting for followLogs() to finish")
  207. }
  208. select {
  209. case <-lw.WatchConsumerGone():
  210. t.Fatal("consumer should not have exited")
  211. default:
  212. }
  213. }
  214. type lineDecoder struct {
  215. r *bufio.Reader
  216. resetCount int
  217. }
  218. func (d *lineDecoder) Decode() (*logger.Message, error) {
  219. line, err := d.r.ReadString('\n')
  220. if err != nil {
  221. return nil, err
  222. }
  223. m := logger.NewMessage()
  224. m.Line = []byte(line)
  225. return m, nil
  226. }
  227. func (d *lineDecoder) Reset(r io.Reader) {
  228. d.r = bufio.NewReader(r)
  229. d.resetCount++
  230. }
  231. func (d *lineDecoder) Close() {
  232. }
  233. func TestFollowLogsHandleDecodeErr(t *testing.T) {
  234. lw := logger.NewLogWatcher()
  235. defer lw.ConsumerGone()
  236. fw, err := os.CreateTemp("", t.Name())
  237. assert.NilError(t, err)
  238. defer os.Remove(fw.Name())
  239. fr, err := os.Open(fw.Name())
  240. assert.NilError(t, err)
  241. dec := &lineDecoder{}
  242. dec.Reset(fr)
  243. var since, until time.Time
  244. rotate := make(chan interface{})
  245. evict := make(chan interface{})
  246. var wg sync.WaitGroup
  247. wg.Add(1)
  248. go func() {
  249. defer wg.Done()
  250. followLogs(fr, lw, rotate, evict, dec, since, until)
  251. }()
  252. sendReceive := func(f io.Writer, message string) {
  253. _, err = f.Write([]byte(message))
  254. assert.NilError(t, err)
  255. m := <-lw.Msg
  256. assert.Equal(t, message, string(m.Line))
  257. }
  258. sendReceive(fw, "log1\n")
  259. sendReceive(fw, "log2\n")
  260. ft, err := os.OpenFile(fw.Name(), os.O_WRONLY|os.O_TRUNC, 0600)
  261. assert.NilError(t, err)
  262. sendReceive(ft, "log3\n")
  263. evict <- errors.New("stop followLogs")
  264. wg.Wait()
  265. // followLogs calls Reset() in the beginning,
  266. // each 3 writes result Reset(), then handleDecodeErr() calles Reset().
  267. assert.Equal(t, 5, dec.resetCount)
  268. }
  269. func TestCheckCapacityAndRotate(t *testing.T) {
  270. dir, err := os.MkdirTemp("", t.Name())
  271. assert.NilError(t, err)
  272. defer os.RemoveAll(dir)
  273. f, err := os.CreateTemp(dir, "log")
  274. assert.NilError(t, err)
  275. l := &LogFile{
  276. f: f,
  277. capacity: 5,
  278. maxFiles: 3,
  279. compress: true,
  280. notifyReaders: pubsub.NewPublisher(0, 1),
  281. perms: 0600,
  282. filesRefCounter: refCounter{counter: make(map[string]int)},
  283. getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  284. return tailfile.NewTailReader(ctx, r, lines)
  285. },
  286. createDecoder: func(io.Reader) Decoder {
  287. return dummyDecoder{}
  288. },
  289. marshal: func(msg *logger.Message) ([]byte, error) {
  290. return msg.Line, nil
  291. },
  292. }
  293. defer l.Close()
  294. ls := dirStringer{dir}
  295. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  296. _, err = os.Stat(f.Name() + ".1")
  297. assert.Assert(t, os.IsNotExist(err), ls)
  298. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  299. poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  300. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  301. poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  302. poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  303. t.Run("closed log file", func(t *testing.T) {
  304. // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
  305. // down the line.
  306. // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
  307. l.f.Close()
  308. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
  309. assert.NilError(t, os.Remove(f.Name()+".2.gz"))
  310. })
  311. t.Run("with log reader", func(t *testing.T) {
  312. // Make sure rotate works with an active reader
  313. lw := logger.NewLogWatcher()
  314. defer lw.ConsumerGone()
  315. go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
  316. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
  317. // make sure the log reader is primed
  318. waitForMsg(t, lw, 30*time.Second)
  319. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
  320. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
  321. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
  322. assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
  323. poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
  324. })
  325. }
  326. func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
  327. t.Helper()
  328. timer := time.NewTimer(timeout)
  329. defer timer.Stop()
  330. select {
  331. case <-lw.Msg:
  332. case <-lw.WatchProducerGone():
  333. t.Fatal("log producer gone before log message arrived")
  334. case err := <-lw.Err:
  335. assert.NilError(t, err)
  336. case <-timer.C:
  337. t.Fatal("timeout waiting for log message")
  338. }
  339. }
  340. type dirStringer struct {
  341. d string
  342. }
  343. func (d dirStringer) String() string {
  344. ls, err := os.ReadDir(d.d)
  345. if err != nil {
  346. return ""
  347. }
  348. buf := bytes.NewBuffer(nil)
  349. tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
  350. buf.WriteString("\n")
  351. btw := bufio.NewWriter(tw)
  352. for _, entry := range ls {
  353. fi, err := entry.Info()
  354. if err != nil {
  355. return ""
  356. }
  357. btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
  358. }
  359. btw.Flush()
  360. tw.Flush()
  361. return buf.String()
  362. }
  363. func checkFileExists(name string) poll.Check {
  364. return func(t poll.LogT) poll.Result {
  365. _, err := os.Stat(name)
  366. switch {
  367. case err == nil:
  368. return poll.Success()
  369. case os.IsNotExist(err):
  370. return poll.Continue("waiting for %s to exist", name)
  371. default:
  372. t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
  373. return poll.Error(err)
  374. }
  375. }
  376. }