local_test.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package local
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "testing"
  12. "time"
  13. "github.com/docker/docker/api/types/backend"
  14. "github.com/docker/docker/api/types/plugins/logdriver"
  15. "github.com/docker/docker/daemon/logger"
  16. protoio "github.com/gogo/protobuf/io"
  17. "gotest.tools/v3/assert"
  18. is "gotest.tools/v3/assert/cmp"
  19. )
  20. func TestWriteLog(t *testing.T) {
  21. t.Parallel()
  22. dir, err := os.MkdirTemp("", t.Name())
  23. assert.NilError(t, err)
  24. defer os.RemoveAll(dir)
  25. logPath := filepath.Join(dir, "test.log")
  26. l, err := New(logger.Info{LogPath: logPath})
  27. assert.NilError(t, err)
  28. defer l.Close()
  29. m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("message 1")}
  30. m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("message 2"), PLogMetaData: &backend.PartialLogMetaData{Last: true, ID: "0001", Ordinal: 1}}
  31. m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("message 3")}
  32. // copy the log message because the underying log writer resets the log message and returns it to a buffer pool
  33. err = l.Log(copyLogMessage(&m1))
  34. assert.NilError(t, err)
  35. err = l.Log(copyLogMessage(&m2))
  36. assert.NilError(t, err)
  37. err = l.Log(copyLogMessage(&m3))
  38. assert.NilError(t, err)
  39. f, err := os.Open(logPath)
  40. assert.NilError(t, err)
  41. defer f.Close()
  42. dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  43. var (
  44. proto logdriver.LogEntry
  45. testProto logdriver.LogEntry
  46. partial logdriver.PartialLogEntryMetadata
  47. )
  48. lenBuf := make([]byte, encodeBinaryLen)
  49. seekMsgLen := func() {
  50. io.ReadFull(f, lenBuf)
  51. }
  52. err = dec.ReadMsg(&proto)
  53. assert.NilError(t, err)
  54. messageToProto(&m1, &testProto, &partial)
  55. assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
  56. seekMsgLen()
  57. err = dec.ReadMsg(&proto)
  58. assert.NilError(t, err)
  59. messageToProto(&m2, &testProto, &partial)
  60. assert.Check(t, is.DeepEqual(testProto, proto))
  61. seekMsgLen()
  62. err = dec.ReadMsg(&proto)
  63. assert.NilError(t, err)
  64. messageToProto(&m3, &testProto, &partial)
  65. assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
  66. }
  67. func TestReadLog(t *testing.T) {
  68. t.Parallel()
  69. dir, err := os.MkdirTemp("", t.Name())
  70. assert.NilError(t, err)
  71. defer os.RemoveAll(dir)
  72. logPath := filepath.Join(dir, "test.log")
  73. l, err := New(logger.Info{LogPath: logPath})
  74. assert.NilError(t, err)
  75. defer l.Close()
  76. m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
  77. m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
  78. longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
  79. m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
  80. m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}
  81. // copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
  82. err = l.Log(copyLogMessage(&m1))
  83. assert.NilError(t, err)
  84. err = l.Log(copyLogMessage(&m2))
  85. assert.NilError(t, err)
  86. err = l.Log(copyLogMessage(&m3))
  87. assert.NilError(t, err)
  88. err = l.Log(copyLogMessage(&m4))
  89. assert.NilError(t, err)
  90. lr := l.(logger.LogReader)
  91. testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
  92. t.Helper()
  93. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  94. defer cancel()
  95. select {
  96. case <-ctx.Done():
  97. assert.Assert(t, ctx.Err())
  98. case err := <-lw.Err:
  99. assert.NilError(t, err)
  100. case msg, open := <-lw.Msg:
  101. if !open {
  102. select {
  103. case err := <-lw.Err:
  104. assert.NilError(t, err)
  105. default:
  106. assert.Assert(t, m == nil)
  107. return
  108. }
  109. }
  110. assert.Assert(t, m != nil)
  111. if m.PLogMetaData == nil {
  112. // a `\n` is appended on read to make this work with the existing API's when the message is not a partial.
  113. // make sure it's the last entry in the line, and then truncate it for the deep equal below.
  114. assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
  115. msg.Line = msg.Line[:len(msg.Line)-1]
  116. }
  117. assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
  118. }
  119. }
  120. t.Run("tail exact", func(t *testing.T) {
  121. lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})
  122. testMessage(t, lw, &m1)
  123. testMessage(t, lw, &m2)
  124. testMessage(t, lw, &m3)
  125. testMessage(t, lw, &m4)
  126. testMessage(t, lw, nil) // no more messages
  127. })
  128. t.Run("tail less than available", func(t *testing.T) {
  129. lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
  130. testMessage(t, lw, &m3)
  131. testMessage(t, lw, &m4)
  132. testMessage(t, lw, nil) // no more messages
  133. })
  134. t.Run("tail more than available", func(t *testing.T) {
  135. lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
  136. testMessage(t, lw, &m1)
  137. testMessage(t, lw, &m2)
  138. testMessage(t, lw, &m3)
  139. testMessage(t, lw, &m4)
  140. testMessage(t, lw, nil) // no more messages
  141. })
  142. }
  143. func BenchmarkLogWrite(b *testing.B) {
  144. f, err := os.CreateTemp("", b.Name())
  145. assert.Assert(b, err)
  146. defer os.Remove(f.Name())
  147. f.Close()
  148. local, err := New(logger.Info{LogPath: f.Name()})
  149. assert.Assert(b, err)
  150. defer local.Close()
  151. t := time.Now().UTC()
  152. for _, data := range [][]byte{
  153. []byte(""),
  154. []byte("a short string"),
  155. bytes.Repeat([]byte("a long string"), 100),
  156. bytes.Repeat([]byte("a really long string"), 10000),
  157. } {
  158. b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
  159. entry := &logdriver.LogEntry{Line: data, Source: "stdout", TimeNano: t.UnixNano()}
  160. b.SetBytes(int64(entry.Size() + encodeBinaryLen + encodeBinaryLen))
  161. b.ResetTimer()
  162. for i := 0; i < b.N; i++ {
  163. msg := logger.NewMessage()
  164. msg.Line = data
  165. msg.Timestamp = t
  166. msg.Source = "stdout"
  167. if err := local.Log(msg); err != nil {
  168. b.Fatal(err)
  169. }
  170. }
  171. })
  172. }
  173. }
  174. func copyLogMessage(src *logger.Message) *logger.Message {
  175. dst := logger.NewMessage()
  176. dst.Source = src.Source
  177. dst.Timestamp = src.Timestamp
  178. dst.Attrs = src.Attrs
  179. dst.Err = src.Err
  180. dst.Line = append(dst.Line, src.Line...)
  181. if src.PLogMetaData != nil {
  182. lmd := *src.PLogMetaData
  183. dst.PLogMetaData = &lmd
  184. }
  185. return dst
  186. }