local_test.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package local
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "bytes"
  11. "fmt"
  12. "strings"
  13. "io"
  14. "github.com/docker/docker/api/types/backend"
  15. "github.com/docker/docker/api/types/plugins/logdriver"
  16. "github.com/docker/docker/daemon/logger"
  17. protoio "github.com/gogo/protobuf/io"
  18. "gotest.tools/v3/assert"
  19. is "gotest.tools/v3/assert/cmp"
  20. )
  21. func TestWriteLog(t *testing.T) {
  22. t.Parallel()
  23. dir, err := ioutil.TempDir("", t.Name())
  24. assert.NilError(t, err)
  25. defer os.RemoveAll(dir)
  26. logPath := filepath.Join(dir, "test.log")
  27. l, err := New(logger.Info{LogPath: logPath})
  28. assert.NilError(t, err)
  29. defer l.Close()
  30. m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("message 1")}
  31. m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("message 2"), PLogMetaData: &backend.PartialLogMetaData{Last: true, ID: "0001", Ordinal: 1}}
  32. m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("message 3")}
  33. // copy the log message because the underying log writer resets the log message and returns it to a buffer pool
  34. err = l.Log(copyLogMessage(&m1))
  35. assert.NilError(t, err)
  36. err = l.Log(copyLogMessage(&m2))
  37. assert.NilError(t, err)
  38. err = l.Log(copyLogMessage(&m3))
  39. assert.NilError(t, err)
  40. f, err := os.Open(logPath)
  41. assert.NilError(t, err)
  42. defer f.Close()
  43. dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
  44. var (
  45. proto logdriver.LogEntry
  46. testProto logdriver.LogEntry
  47. partial logdriver.PartialLogEntryMetadata
  48. )
  49. lenBuf := make([]byte, encodeBinaryLen)
  50. seekMsgLen := func() {
  51. io.ReadFull(f, lenBuf)
  52. }
  53. err = dec.ReadMsg(&proto)
  54. assert.NilError(t, err)
  55. messageToProto(&m1, &testProto, &partial)
  56. assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
  57. seekMsgLen()
  58. err = dec.ReadMsg(&proto)
  59. assert.NilError(t, err)
  60. messageToProto(&m2, &testProto, &partial)
  61. assert.Check(t, is.DeepEqual(testProto, proto))
  62. seekMsgLen()
  63. err = dec.ReadMsg(&proto)
  64. assert.NilError(t, err)
  65. messageToProto(&m3, &testProto, &partial)
  66. assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto)
  67. }
  68. func TestReadLog(t *testing.T) {
  69. t.Parallel()
  70. dir, err := ioutil.TempDir("", t.Name())
  71. assert.NilError(t, err)
  72. defer os.RemoveAll(dir)
  73. logPath := filepath.Join(dir, "test.log")
  74. l, err := New(logger.Info{LogPath: logPath})
  75. assert.NilError(t, err)
  76. defer l.Close()
  77. m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
  78. m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
  79. longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
  80. m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
  81. m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}
  82. // copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
  83. err = l.Log(copyLogMessage(&m1))
  84. assert.NilError(t, err)
  85. err = l.Log(copyLogMessage(&m2))
  86. assert.NilError(t, err)
  87. err = l.Log(copyLogMessage(&m3))
  88. assert.NilError(t, err)
  89. err = l.Log(copyLogMessage(&m4))
  90. assert.NilError(t, err)
  91. lr := l.(logger.LogReader)
  92. testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
  93. t.Helper()
  94. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  95. defer cancel()
  96. select {
  97. case <-ctx.Done():
  98. assert.Assert(t, ctx.Err())
  99. case err := <-lw.Err:
  100. assert.NilError(t, err)
  101. case msg, open := <-lw.Msg:
  102. if !open {
  103. select {
  104. case err := <-lw.Err:
  105. assert.NilError(t, err)
  106. default:
  107. assert.Assert(t, m == nil)
  108. return
  109. }
  110. }
  111. assert.Assert(t, m != nil)
  112. if m.PLogMetaData == nil {
  113. // a `\n` is appended on read to make this work with the existing API's when the message is not a partial.
  114. // make sure it's the last entry in the line, and then truncate it for the deep equal below.
  115. assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
  116. msg.Line = msg.Line[:len(msg.Line)-1]
  117. }
  118. assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
  119. }
  120. }
  121. t.Run("tail exact", func(t *testing.T) {
  122. lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})
  123. testMessage(t, lw, &m1)
  124. testMessage(t, lw, &m2)
  125. testMessage(t, lw, &m3)
  126. testMessage(t, lw, &m4)
  127. testMessage(t, lw, nil) // no more messages
  128. })
  129. t.Run("tail less than available", func(t *testing.T) {
  130. lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
  131. testMessage(t, lw, &m3)
  132. testMessage(t, lw, &m4)
  133. testMessage(t, lw, nil) // no more messages
  134. })
  135. t.Run("tail more than available", func(t *testing.T) {
  136. lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
  137. testMessage(t, lw, &m1)
  138. testMessage(t, lw, &m2)
  139. testMessage(t, lw, &m3)
  140. testMessage(t, lw, &m4)
  141. testMessage(t, lw, nil) // no more messages
  142. })
  143. }
  144. func BenchmarkLogWrite(b *testing.B) {
  145. f, err := ioutil.TempFile("", b.Name())
  146. assert.Assert(b, err)
  147. defer os.Remove(f.Name())
  148. f.Close()
  149. local, err := New(logger.Info{LogPath: f.Name()})
  150. assert.Assert(b, err)
  151. defer local.Close()
  152. t := time.Now().UTC()
  153. for _, data := range [][]byte{
  154. []byte(""),
  155. []byte("a short string"),
  156. bytes.Repeat([]byte("a long string"), 100),
  157. bytes.Repeat([]byte("a really long string"), 10000),
  158. } {
  159. b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
  160. entry := &logdriver.LogEntry{Line: data, Source: "stdout", TimeNano: t.UnixNano()}
  161. b.SetBytes(int64(entry.Size() + encodeBinaryLen + encodeBinaryLen))
  162. b.ResetTimer()
  163. for i := 0; i < b.N; i++ {
  164. msg := logger.NewMessage()
  165. msg.Line = data
  166. msg.Timestamp = t
  167. msg.Source = "stdout"
  168. if err := local.Log(msg); err != nil {
  169. b.Fatal(err)
  170. }
  171. }
  172. })
  173. }
  174. }
  175. func copyLogMessage(src *logger.Message) *logger.Message {
  176. dst := logger.NewMessage()
  177. dst.Source = src.Source
  178. dst.Timestamp = src.Timestamp
  179. dst.Attrs = src.Attrs
  180. dst.Err = src.Err
  181. dst.Line = append(dst.Line, src.Line...)
  182. if src.PLogMetaData != nil {
  183. dst.PLogMetaData = &(*src.PLogMetaData)
  184. }
  185. return dst
  186. }