copier_test.go 8.1 KB


  1. package logger
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "sync"
  10. "testing"
  11. "time"
  12. )
  13. type TestLoggerJSON struct {
  14. *json.Encoder
  15. mu sync.Mutex
  16. delay time.Duration
  17. }
  18. func (l *TestLoggerJSON) Log(m *Message) error {
  19. if l.delay > 0 {
  20. time.Sleep(l.delay)
  21. }
  22. l.mu.Lock()
  23. defer l.mu.Unlock()
  24. return l.Encode(m)
  25. }
  26. func (l *TestLoggerJSON) Close() error { return nil }
  27. func (l *TestLoggerJSON) Name() string { return "json" }
  28. type TestSizedLoggerJSON struct {
  29. *json.Encoder
  30. mu sync.Mutex
  31. }
  32. func (l *TestSizedLoggerJSON) Log(m *Message) error {
  33. l.mu.Lock()
  34. defer l.mu.Unlock()
  35. return l.Encode(m)
  36. }
  37. func (*TestSizedLoggerJSON) Close() error { return nil }
  38. func (*TestSizedLoggerJSON) Name() string { return "sized-json" }
  39. func (*TestSizedLoggerJSON) BufSize() int {
  40. return 32 * 1024
  41. }
  42. func TestCopier(t *testing.T) {
  43. stdoutLine := "Line that thinks that it is log line from docker stdout"
  44. stderrLine := "Line that thinks that it is log line from docker stderr"
  45. stdoutTrailingLine := "stdout trailing line"
  46. stderrTrailingLine := "stderr trailing line"
  47. var stdout bytes.Buffer
  48. var stderr bytes.Buffer
  49. for i := 0; i < 30; i++ {
  50. if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
  51. t.Fatal(err)
  52. }
  53. if _, err := stderr.WriteString(stderrLine + "\n"); err != nil {
  54. t.Fatal(err)
  55. }
  56. }
  57. // Test remaining lines without line-endings
  58. if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
  59. t.Fatal(err)
  60. }
  61. if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
  62. t.Fatal(err)
  63. }
  64. var jsonBuf bytes.Buffer
  65. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
  66. c := NewCopier(
  67. map[string]io.Reader{
  68. "stdout": &stdout,
  69. "stderr": &stderr,
  70. },
  71. jsonLog)
  72. c.Run()
  73. wait := make(chan struct{})
  74. go func() {
  75. c.Wait()
  76. close(wait)
  77. }()
  78. select {
  79. case <-time.After(1 * time.Second):
  80. t.Fatal("Copier failed to do its work in 1 second")
  81. case <-wait:
  82. }
  83. dec := json.NewDecoder(&jsonBuf)
  84. for {
  85. var msg Message
  86. if err := dec.Decode(&msg); err != nil {
  87. if err == io.EOF {
  88. break
  89. }
  90. t.Fatal(err)
  91. }
  92. if msg.Source != "stdout" && msg.Source != "stderr" {
  93. t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
  94. }
  95. if msg.Source == "stdout" {
  96. if string(msg.Line) != stdoutLine && string(msg.Line) != stdoutTrailingLine {
  97. t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stdoutLine, stdoutTrailingLine)
  98. }
  99. }
  100. if msg.Source == "stderr" {
  101. if string(msg.Line) != stderrLine && string(msg.Line) != stderrTrailingLine {
  102. t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stderrLine, stderrTrailingLine)
  103. }
  104. }
  105. }
  106. }
  107. // TestCopierLongLines tests long lines without line breaks
  108. func TestCopierLongLines(t *testing.T) {
  109. // Long lines (should be split at "defaultBufSize")
  110. stdoutLongLine := strings.Repeat("a", defaultBufSize)
  111. stderrLongLine := strings.Repeat("b", defaultBufSize)
  112. stdoutTrailingLine := "stdout trailing line"
  113. stderrTrailingLine := "stderr trailing line"
  114. var stdout bytes.Buffer
  115. var stderr bytes.Buffer
  116. for i := 0; i < 3; i++ {
  117. if _, err := stdout.WriteString(stdoutLongLine); err != nil {
  118. t.Fatal(err)
  119. }
  120. if _, err := stderr.WriteString(stderrLongLine); err != nil {
  121. t.Fatal(err)
  122. }
  123. }
  124. if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
  125. t.Fatal(err)
  126. }
  127. if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
  128. t.Fatal(err)
  129. }
  130. var jsonBuf bytes.Buffer
  131. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
  132. c := NewCopier(
  133. map[string]io.Reader{
  134. "stdout": &stdout,
  135. "stderr": &stderr,
  136. },
  137. jsonLog)
  138. c.Run()
  139. wait := make(chan struct{})
  140. go func() {
  141. c.Wait()
  142. close(wait)
  143. }()
  144. select {
  145. case <-time.After(1 * time.Second):
  146. t.Fatal("Copier failed to do its work in 1 second")
  147. case <-wait:
  148. }
  149. dec := json.NewDecoder(&jsonBuf)
  150. for {
  151. var msg Message
  152. if err := dec.Decode(&msg); err != nil {
  153. if err == io.EOF {
  154. break
  155. }
  156. t.Fatal(err)
  157. }
  158. if msg.Source != "stdout" && msg.Source != "stderr" {
  159. t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
  160. }
  161. if msg.Source == "stdout" {
  162. if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
  163. t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
  164. }
  165. }
  166. if msg.Source == "stderr" {
  167. if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
  168. t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
  169. }
  170. }
  171. }
  172. }
  173. func TestCopierSlow(t *testing.T) {
  174. stdoutLine := "Line that thinks that it is log line from docker stdout"
  175. var stdout bytes.Buffer
  176. for i := 0; i < 30; i++ {
  177. if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
  178. t.Fatal(err)
  179. }
  180. }
  181. var jsonBuf bytes.Buffer
  182. //encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
  183. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
  184. c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
  185. c.Run()
  186. wait := make(chan struct{})
  187. go func() {
  188. c.Wait()
  189. close(wait)
  190. }()
  191. <-time.After(150 * time.Millisecond)
  192. c.Close()
  193. select {
  194. case <-time.After(200 * time.Millisecond):
  195. t.Fatal("failed to exit in time after the copier is closed")
  196. case <-wait:
  197. }
  198. }
  199. func TestCopierWithSized(t *testing.T) {
  200. var jsonBuf bytes.Buffer
  201. expectedMsgs := 2
  202. sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
  203. logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs))
  204. c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
  205. c.Run()
  206. // Wait for Copier to finish writing to the buffered logger.
  207. c.Wait()
  208. c.Close()
  209. recvdMsgs := 0
  210. dec := json.NewDecoder(&jsonBuf)
  211. for {
  212. var msg Message
  213. if err := dec.Decode(&msg); err != nil {
  214. if err == io.EOF {
  215. break
  216. }
  217. t.Fatal(err)
  218. }
  219. if msg.Source != "stdout" {
  220. t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout")
  221. }
  222. if len(msg.Line) != sizedLogger.BufSize() {
  223. t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line))
  224. }
  225. recvdMsgs++
  226. }
  227. if recvdMsgs != expectedMsgs {
  228. t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs)
  229. }
  230. }
  231. type BenchmarkLoggerDummy struct {
  232. }
  233. func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }
  234. func (l *BenchmarkLoggerDummy) Close() error { return nil }
  235. func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }
  236. func BenchmarkCopier64(b *testing.B) {
  237. benchmarkCopier(b, 1<<6)
  238. }
  239. func BenchmarkCopier128(b *testing.B) {
  240. benchmarkCopier(b, 1<<7)
  241. }
  242. func BenchmarkCopier256(b *testing.B) {
  243. benchmarkCopier(b, 1<<8)
  244. }
  245. func BenchmarkCopier512(b *testing.B) {
  246. benchmarkCopier(b, 1<<9)
  247. }
  248. func BenchmarkCopier1K(b *testing.B) {
  249. benchmarkCopier(b, 1<<10)
  250. }
  251. func BenchmarkCopier2K(b *testing.B) {
  252. benchmarkCopier(b, 1<<11)
  253. }
  254. func BenchmarkCopier4K(b *testing.B) {
  255. benchmarkCopier(b, 1<<12)
  256. }
  257. func BenchmarkCopier8K(b *testing.B) {
  258. benchmarkCopier(b, 1<<13)
  259. }
  260. func BenchmarkCopier16K(b *testing.B) {
  261. benchmarkCopier(b, 1<<14)
  262. }
  263. func BenchmarkCopier32K(b *testing.B) {
  264. benchmarkCopier(b, 1<<15)
  265. }
  266. func BenchmarkCopier64K(b *testing.B) {
  267. benchmarkCopier(b, 1<<16)
  268. }
  269. func BenchmarkCopier128K(b *testing.B) {
  270. benchmarkCopier(b, 1<<17)
  271. }
  272. func BenchmarkCopier256K(b *testing.B) {
  273. benchmarkCopier(b, 1<<18)
  274. }
  275. func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
  276. r, w, err := os.Pipe()
  277. if err != nil {
  278. b.Fatal(err)
  279. return nil
  280. }
  281. go func() {
  282. for i := 0; i < iterations; i++ {
  283. time.Sleep(delay)
  284. if n, err := w.Write(buf); err != nil || n != len(buf) {
  285. if err != nil {
  286. b.Fatal(err)
  287. }
  288. b.Fatal(fmt.Errorf("short write"))
  289. }
  290. }
  291. w.Close()
  292. }()
  293. return r
  294. }
  295. func benchmarkCopier(b *testing.B, length int) {
  296. b.StopTimer()
  297. buf := []byte{'A'}
  298. for len(buf) < length {
  299. buf = append(buf, buf...)
  300. }
  301. buf = append(buf[:length-1], []byte{'\n'}...)
  302. b.StartTimer()
  303. for i := 0; i < b.N; i++ {
  304. c := NewCopier(
  305. map[string]io.Reader{
  306. "buffer": piped(b, 10, time.Nanosecond, buf),
  307. },
  308. &BenchmarkLoggerDummy{})
  309. c.Run()
  310. c.Wait()
  311. c.Close()
  312. }
  313. }