logreader.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"
  2. import (
  3. "runtime"
  4. "strings"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/google/go-cmp/cmp"
  9. "github.com/google/go-cmp/cmp/cmpopts"
  10. "gotest.tools/v3/assert"
  11. "gotest.tools/v3/assert/opt"
  12. "github.com/docker/docker/api/types/backend"
  13. "github.com/docker/docker/daemon/logger"
  14. )
  15. type syncer interface {
  16. // Sync commits the current logs to stable storage such that the most
  17. // recently-logged message can be immediately read back by a LogReader.
  18. Sync() error
  19. }
  20. func syncLogger(t *testing.T, l logger.Logger) {
  21. if sl, ok := l.(syncer); ok {
  22. assert.NilError(t, sl.Sync())
  23. }
  24. }
  25. // Reader tests that a logger.LogReader implementation behaves as it should.
  26. type Reader struct {
  27. // Factory returns a function which constructs loggers for the container
  28. // specified in info. Each call to the returned function must yield a
  29. // distinct logger instance which can read back logs written by earlier
  30. // instances.
  31. Factory func(*testing.T, logger.Info) func(*testing.T) logger.Logger
  32. }
  33. var compareLog cmp.Options = []cmp.Option{
  34. // Not all log drivers can round-trip timestamps at full nanosecond
  35. // precision.
  36. opt.TimeWithThreshold(time.Millisecond),
  37. // The json-log driver does not round-trip PLogMetaData and API users do
  38. // not expect it.
  39. cmpopts.IgnoreFields(logger.Message{}, "PLogMetaData"),
  40. cmp.Transformer("string", func(b []byte) string { return string(b) }),
  41. }
  42. // TestTail tests the behavior of the LogReader's tail implementation.
  43. func (tr Reader) TestTail(t *testing.T) {
  44. t.Run("Live", func(t *testing.T) { tr.testTail(t, true) })
  45. t.Run("LiveEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, true) })
  46. t.Run("Stopped", func(t *testing.T) { tr.testTail(t, false) })
  47. t.Run("StoppedEmpty", func(t *testing.T) { tr.testTailEmptyLogs(t, false) })
  48. }
  49. func makeTestMessages() []*logger.Message {
  50. return []*logger.Message{
  51. {Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")},
  52. {Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{ID: "aaaaaaaa", Ordinal: 1, Last: true}},
  53. {Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("to be..."), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 1}},
  54. {Source: "stderr", Timestamp: time.Now().Add(-1 * 15 * time.Minute), Line: []byte("continued"), PLogMetaData: &backend.PartialLogMetaData{ID: "bbbbbbbb", Ordinal: 2, Last: true}},
  55. {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("a really long message " + strings.Repeat("a", 4096))},
  56. {Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")},
  57. {Source: "stdout", Timestamp: time.Now().Add(-1 * 90 * time.Minute), Line: []byte("someone adjusted the clock")},
  58. }
  59. }
  60. func (tr Reader) testTail(t *testing.T, live bool) {
  61. t.Parallel()
  62. factory := tr.Factory(t, logger.Info{
  63. ContainerID: "tailtest0000",
  64. ContainerName: "logtail",
  65. })
  66. l := factory(t)
  67. if live {
  68. defer func() { assert.NilError(t, l.Close()) }()
  69. }
  70. mm := makeTestMessages()
  71. expected := logMessages(t, l, mm)
  72. if !live {
  73. // Simulate reading from a stopped container.
  74. assert.NilError(t, l.Close())
  75. l = factory(t)
  76. defer func() { assert.NilError(t, l.Close()) }()
  77. }
  78. lr := l.(logger.LogReader)
  79. t.Run("Exact", func(t *testing.T) {
  80. t.Parallel()
  81. lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)})
  82. defer lw.ConsumerGone()
  83. assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
  84. })
  85. t.Run("LessThanAvailable", func(t *testing.T) {
  86. t.Parallel()
  87. lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
  88. defer lw.ConsumerGone()
  89. assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog)
  90. })
  91. t.Run("MoreThanAvailable", func(t *testing.T) {
  92. t.Parallel()
  93. lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
  94. defer lw.ConsumerGone()
  95. assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
  96. })
  97. t.Run("All", func(t *testing.T) {
  98. t.Parallel()
  99. lw := lr.ReadLogs(logger.ReadConfig{Tail: -1})
  100. defer lw.ConsumerGone()
  101. assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
  102. })
  103. t.Run("Since", func(t *testing.T) {
  104. t.Parallel()
  105. lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
  106. defer lw.ConsumerGone()
  107. assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
  108. })
  109. t.Run("MoreThanSince", func(t *testing.T) {
  110. t.Parallel()
  111. lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
  112. defer lw.ConsumerGone()
  113. assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
  114. })
  115. t.Run("LessThanSince", func(t *testing.T) {
  116. t.Parallel()
  117. lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
  118. defer lw.ConsumerGone()
  119. assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog)
  120. })
  121. t.Run("Until", func(t *testing.T) {
  122. t.Parallel()
  123. lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
  124. defer lw.ConsumerGone()
  125. assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog)
  126. })
  127. t.Run("SinceAndUntil", func(t *testing.T) {
  128. t.Parallel()
  129. lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
  130. defer lw.ConsumerGone()
  131. assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog)
  132. })
  133. }
  134. func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
  135. t.Parallel()
  136. factory := tr.Factory(t, logger.Info{
  137. ContainerID: "tailemptytest",
  138. ContainerName: "logtail",
  139. })
  140. l := factory(t)
  141. if !live {
  142. assert.NilError(t, l.Close())
  143. l = factory(t)
  144. }
  145. defer func() { assert.NilError(t, l.Close()) }()
  146. for _, tt := range []struct {
  147. name string
  148. cfg logger.ReadConfig
  149. }{
  150. {name: "Zero", cfg: logger.ReadConfig{}},
  151. {name: "All", cfg: logger.ReadConfig{Tail: -1}},
  152. {name: "Tail", cfg: logger.ReadConfig{Tail: 42}},
  153. {name: "Since", cfg: logger.ReadConfig{Since: time.Unix(1, 0)}},
  154. {name: "Until", cfg: logger.ReadConfig{Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
  155. {name: "SinceAndUntil", cfg: logger.ReadConfig{Since: time.Unix(1, 0), Until: time.Date(2100, time.January, 1, 1, 1, 1, 0, time.UTC)}},
  156. } {
  157. tt := tt
  158. t.Run(tt.name, func(t *testing.T) {
  159. t.Parallel()
  160. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{})
  161. defer lw.ConsumerGone()
  162. assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty())
  163. })
  164. }
  165. }
  166. // TestFollow tests the LogReader's follow implementation.
  167. //
  168. // The LogReader is expected to be able to follow an arbitrary number of
  169. // messages at a high rate with no dropped messages.
  170. func (tr Reader) TestFollow(t *testing.T) {
  171. // Reader sends all logs and closes after logger is closed
  172. // - Starting from empty log (like run)
  173. t.Run("FromEmptyLog", func(t *testing.T) {
  174. t.Parallel()
  175. l := tr.Factory(t, logger.Info{
  176. ContainerID: "followstart0",
  177. ContainerName: "logloglog",
  178. })(t)
  179. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
  180. defer lw.ConsumerGone()
  181. doneReading := make(chan struct{})
  182. var logs []*logger.Message
  183. go func() {
  184. defer close(doneReading)
  185. logs = readAll(t, lw)
  186. }()
  187. mm := makeTestMessages()
  188. expected := logMessages(t, l, mm)
  189. assert.NilError(t, l.Close())
  190. <-doneReading
  191. assert.DeepEqual(t, logs, expected, compareLog)
  192. })
  193. t.Run("AttachMidStream", func(t *testing.T) {
  194. t.Parallel()
  195. l := tr.Factory(t, logger.Info{
  196. ContainerID: "followmiddle",
  197. ContainerName: "logloglog",
  198. })(t)
  199. mm := makeTestMessages()
  200. expected := logMessages(t, l, mm[0:1])
  201. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
  202. defer lw.ConsumerGone()
  203. doneReading := make(chan struct{})
  204. var logs []*logger.Message
  205. go func() {
  206. defer close(doneReading)
  207. logs = readAll(t, lw)
  208. }()
  209. expected = append(expected, logMessages(t, l, mm[1:])...)
  210. assert.NilError(t, l.Close())
  211. <-doneReading
  212. assert.DeepEqual(t, logs, expected, compareLog)
  213. })
  214. t.Run("Since", func(t *testing.T) {
  215. t.Parallel()
  216. l := tr.Factory(t, logger.Info{
  217. ContainerID: "followsince0",
  218. ContainerName: "logloglog",
  219. })(t)
  220. mm := makeTestMessages()
  221. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
  222. defer lw.ConsumerGone()
  223. doneReading := make(chan struct{})
  224. var logs []*logger.Message
  225. go func() {
  226. defer close(doneReading)
  227. logs = readAll(t, lw)
  228. }()
  229. expected := logMessages(t, l, mm)[2:]
  230. assert.NilError(t, l.Close())
  231. <-doneReading
  232. assert.DeepEqual(t, logs, expected, compareLog)
  233. })
  234. t.Run("Until", func(t *testing.T) {
  235. t.Parallel()
  236. l := tr.Factory(t, logger.Info{
  237. ContainerID: "followuntil0",
  238. ContainerName: "logloglog",
  239. })(t)
  240. mm := makeTestMessages()
  241. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
  242. defer lw.ConsumerGone()
  243. doneReading := make(chan struct{})
  244. var logs []*logger.Message
  245. go func() {
  246. defer close(doneReading)
  247. logs = readAll(t, lw)
  248. }()
  249. expected := logMessages(t, l, mm)[:2]
  250. defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
  251. <-doneReading
  252. assert.DeepEqual(t, logs, expected, compareLog)
  253. })
  254. t.Run("SinceAndUntil", func(t *testing.T) {
  255. t.Parallel()
  256. l := tr.Factory(t, logger.Info{
  257. ContainerID: "followbounded",
  258. ContainerName: "logloglog",
  259. })(t)
  260. mm := makeTestMessages()
  261. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
  262. defer lw.ConsumerGone()
  263. doneReading := make(chan struct{})
  264. var logs []*logger.Message
  265. go func() {
  266. defer close(doneReading)
  267. logs = readAll(t, lw)
  268. }()
  269. expected := logMessages(t, l, mm)[1:2]
  270. defer assert.NilError(t, l.Close()) // Reading should end before the logger is closed.
  271. <-doneReading
  272. assert.DeepEqual(t, logs, expected, compareLog)
  273. })
  274. t.Run("Tail=0", func(t *testing.T) {
  275. t.Parallel()
  276. l := tr.Factory(t, logger.Info{
  277. ContainerID: "followtail00",
  278. ContainerName: "logloglog",
  279. })(t)
  280. mm := makeTestMessages()
  281. logMessages(t, l, mm[0:2])
  282. syncLogger(t, l)
  283. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true})
  284. defer lw.ConsumerGone()
  285. doneReading := make(chan struct{})
  286. var logs []*logger.Message
  287. go func() {
  288. defer close(doneReading)
  289. logs = readAll(t, lw)
  290. }()
  291. expected := logMessages(t, l, mm[2:])
  292. assert.NilError(t, l.Close())
  293. <-doneReading
  294. assert.DeepEqual(t, logs, expected, compareLog)
  295. })
  296. t.Run("Tail>0", func(t *testing.T) {
  297. t.Parallel()
  298. l := tr.Factory(t, logger.Info{
  299. ContainerID: "followtail00",
  300. ContainerName: "logloglog",
  301. })(t)
  302. mm := makeTestMessages()
  303. expected := logMessages(t, l, mm[0:2])[1:]
  304. syncLogger(t, l)
  305. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true})
  306. defer lw.ConsumerGone()
  307. doneReading := make(chan struct{})
  308. var logs []*logger.Message
  309. go func() {
  310. defer close(doneReading)
  311. logs = readAll(t, lw)
  312. }()
  313. expected = append(expected, logMessages(t, l, mm[2:])...)
  314. assert.NilError(t, l.Close())
  315. <-doneReading
  316. assert.DeepEqual(t, logs, expected, compareLog)
  317. })
  318. t.Run("MultipleStarts", func(t *testing.T) {
  319. t.Parallel()
  320. factory := tr.Factory(t, logger.Info{
  321. ContainerID: "startrestart",
  322. ContainerName: "startmeup",
  323. })
  324. mm := makeTestMessages()
  325. l := factory(t)
  326. expected := logMessages(t, l, mm[:3])
  327. assert.NilError(t, l.Close())
  328. l = factory(t)
  329. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
  330. defer lw.ConsumerGone()
  331. doneReading := make(chan struct{})
  332. var logs []*logger.Message
  333. go func() {
  334. defer close(doneReading)
  335. logs = readAll(t, lw)
  336. }()
  337. expected = append(expected, logMessages(t, l, mm[3:])...)
  338. assert.NilError(t, l.Close())
  339. <-doneReading
  340. assert.DeepEqual(t, logs, expected, compareLog)
  341. })
  342. t.Run("Concurrent", tr.TestConcurrent)
  343. }
  344. // TestConcurrent tests the Logger and its LogReader implementation for
  345. // race conditions when logging from multiple goroutines concurrently.
  346. func (tr Reader) TestConcurrent(t *testing.T) {
  347. t.Parallel()
  348. l := tr.Factory(t, logger.Info{
  349. ContainerID: "logconcurrent0",
  350. ContainerName: "logconcurrent123",
  351. })(t)
  352. // Split test messages
  353. stderrMessages := []*logger.Message{}
  354. stdoutMessages := []*logger.Message{}
  355. for _, m := range makeTestMessages() {
  356. if m.Source == "stdout" {
  357. stdoutMessages = append(stdoutMessages, m)
  358. } else if m.Source == "stderr" {
  359. stderrMessages = append(stderrMessages, m)
  360. }
  361. }
  362. // Follow all logs
  363. lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Follow: true, Tail: -1})
  364. defer lw.ConsumerGone()
  365. // Log concurrently from two sources and close log
  366. wg := &sync.WaitGroup{}
  367. logAll := func(msgs []*logger.Message) {
  368. defer wg.Done()
  369. for _, m := range msgs {
  370. l.Log(copyLogMessage(m))
  371. }
  372. }
  373. closed := make(chan struct{})
  374. wg.Add(2)
  375. go logAll(stdoutMessages)
  376. go logAll(stderrMessages)
  377. go func() {
  378. defer close(closed)
  379. defer l.Close()
  380. wg.Wait()
  381. }()
  382. // Check if the message count, order and content is equal to what was logged
  383. for {
  384. l := readMessage(t, lw)
  385. if l == nil {
  386. break
  387. }
  388. var messages *[]*logger.Message
  389. if l.Source == "stdout" {
  390. messages = &stdoutMessages
  391. } else if l.Source == "stderr" {
  392. messages = &stderrMessages
  393. } else {
  394. t.Fatalf("Corrupted message.Source = %q", l.Source)
  395. }
  396. expectedMsg := transformToExpected((*messages)[0])
  397. assert.DeepEqual(t, *expectedMsg, *l, compareLog)
  398. *messages = (*messages)[1:]
  399. }
  400. assert.Equal(t, len(stdoutMessages), 0)
  401. assert.Equal(t, len(stderrMessages), 0)
  402. // Make sure log gets closed before we return
  403. // so the temporary dir can be deleted
  404. <-closed
  405. }
  406. // logMessages logs messages to l and returns a slice of messages as would be
  407. // expected to be read back. The message values are not modified and the
  408. // returned slice of messages are deep-copied.
  409. func logMessages(t *testing.T, l logger.Logger, messages []*logger.Message) []*logger.Message {
  410. t.Helper()
  411. var expected []*logger.Message
  412. for _, m := range messages {
  413. // Copy the log message because the underlying log writer resets
  414. // the log message and returns it to a buffer pool.
  415. assert.NilError(t, l.Log(copyLogMessage(m)))
  416. runtime.Gosched()
  417. expect := transformToExpected(m)
  418. expected = append(expected, expect)
  419. }
  420. return expected
  421. }
  422. // Existing API consumers expect a newline to be appended to
  423. // messages other than nonterminal partials as that matches the
  424. // existing behavior of the json-file log driver.
  425. func transformToExpected(m *logger.Message) *logger.Message {
  426. // Copy the log message again so as not to mutate the input.
  427. copy := copyLogMessage(m)
  428. if m.PLogMetaData == nil || m.PLogMetaData.Last {
  429. copy.Line = append(copy.Line, '\n')
  430. }
  431. return copy
  432. }
  433. func copyLogMessage(src *logger.Message) *logger.Message {
  434. dst := logger.NewMessage()
  435. dst.Source = src.Source
  436. dst.Timestamp = src.Timestamp
  437. dst.Attrs = src.Attrs
  438. dst.Err = src.Err
  439. dst.Line = append(dst.Line, src.Line...)
  440. if src.PLogMetaData != nil {
  441. lmd := *src.PLogMetaData
  442. dst.PLogMetaData = &lmd
  443. }
  444. return dst
  445. }
  446. func readMessage(t *testing.T, lw *logger.LogWatcher) *logger.Message {
  447. t.Helper()
  448. timeout := time.NewTimer(5 * time.Second)
  449. defer timeout.Stop()
  450. select {
  451. case <-timeout.C:
  452. t.Error("timed out waiting for message")
  453. return nil
  454. case err, open := <-lw.Err:
  455. t.Errorf("unexpected receive on lw.Err: err=%v, open=%v", err, open)
  456. return nil
  457. case msg, open := <-lw.Msg:
  458. if !open {
  459. select {
  460. case err, open := <-lw.Err:
  461. t.Errorf("unexpected receive on lw.Err with closed lw.Msg: err=%v, open=%v", err, open)
  462. default:
  463. }
  464. return nil
  465. }
  466. assert.Assert(t, msg != nil)
  467. t.Logf("[%v] %s: %s", msg.Timestamp, msg.Source, msg.Line)
  468. return msg
  469. }
  470. }
  471. func readAll(t *testing.T, lw *logger.LogWatcher) []*logger.Message {
  472. t.Helper()
  473. var msgs []*logger.Message
  474. for {
  475. m := readMessage(t, lw)
  476. if m == nil {
  477. return msgs
  478. }
  479. msgs = append(msgs, m)
  480. }
  481. }