copier_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package logger
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "sync"
  10. "testing"
  11. "time"
  12. )
  13. type TestLoggerJSON struct {
  14. *json.Encoder
  15. mu sync.Mutex
  16. delay time.Duration
  17. }
  18. func (l *TestLoggerJSON) Log(m *Message) error {
  19. if l.delay > 0 {
  20. time.Sleep(l.delay)
  21. }
  22. l.mu.Lock()
  23. defer l.mu.Unlock()
  24. return l.Encode(m)
  25. }
  26. func (l *TestLoggerJSON) Close() error { return nil }
  27. func (l *TestLoggerJSON) Name() string { return "json" }
  28. func TestCopier(t *testing.T) {
  29. stdoutLine := "Line that thinks that it is log line from docker stdout"
  30. stderrLine := "Line that thinks that it is log line from docker stderr"
  31. stdoutTrailingLine := "stdout trailing line"
  32. stderrTrailingLine := "stderr trailing line"
  33. var stdout bytes.Buffer
  34. var stderr bytes.Buffer
  35. for i := 0; i < 30; i++ {
  36. if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
  37. t.Fatal(err)
  38. }
  39. if _, err := stderr.WriteString(stderrLine + "\n"); err != nil {
  40. t.Fatal(err)
  41. }
  42. }
  43. // Test remaining lines without line-endings
  44. if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
  45. t.Fatal(err)
  46. }
  47. if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
  48. t.Fatal(err)
  49. }
  50. var jsonBuf bytes.Buffer
  51. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
  52. c := NewCopier(
  53. map[string]io.Reader{
  54. "stdout": &stdout,
  55. "stderr": &stderr,
  56. },
  57. jsonLog)
  58. c.Run()
  59. wait := make(chan struct{})
  60. go func() {
  61. c.Wait()
  62. close(wait)
  63. }()
  64. select {
  65. case <-time.After(1 * time.Second):
  66. t.Fatal("Copier failed to do its work in 1 second")
  67. case <-wait:
  68. }
  69. dec := json.NewDecoder(&jsonBuf)
  70. for {
  71. var msg Message
  72. if err := dec.Decode(&msg); err != nil {
  73. if err == io.EOF {
  74. break
  75. }
  76. t.Fatal(err)
  77. }
  78. if msg.Source != "stdout" && msg.Source != "stderr" {
  79. t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
  80. }
  81. if msg.Source == "stdout" {
  82. if string(msg.Line) != stdoutLine && string(msg.Line) != stdoutTrailingLine {
  83. t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stdoutLine, stdoutTrailingLine)
  84. }
  85. }
  86. if msg.Source == "stderr" {
  87. if string(msg.Line) != stderrLine && string(msg.Line) != stderrTrailingLine {
  88. t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stderrLine, stderrTrailingLine)
  89. }
  90. }
  91. }
  92. }
  93. // TestCopierLongLines tests long lines without line breaks
  94. func TestCopierLongLines(t *testing.T) {
  95. // Long lines (should be split at "bufSize")
  96. const bufSize = 16 * 1024
  97. stdoutLongLine := strings.Repeat("a", bufSize)
  98. stderrLongLine := strings.Repeat("b", bufSize)
  99. stdoutTrailingLine := "stdout trailing line"
  100. stderrTrailingLine := "stderr trailing line"
  101. var stdout bytes.Buffer
  102. var stderr bytes.Buffer
  103. for i := 0; i < 3; i++ {
  104. if _, err := stdout.WriteString(stdoutLongLine); err != nil {
  105. t.Fatal(err)
  106. }
  107. if _, err := stderr.WriteString(stderrLongLine); err != nil {
  108. t.Fatal(err)
  109. }
  110. }
  111. if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
  112. t.Fatal(err)
  113. }
  114. if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
  115. t.Fatal(err)
  116. }
  117. var jsonBuf bytes.Buffer
  118. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
  119. c := NewCopier(
  120. map[string]io.Reader{
  121. "stdout": &stdout,
  122. "stderr": &stderr,
  123. },
  124. jsonLog)
  125. c.Run()
  126. wait := make(chan struct{})
  127. go func() {
  128. c.Wait()
  129. close(wait)
  130. }()
  131. select {
  132. case <-time.After(1 * time.Second):
  133. t.Fatal("Copier failed to do its work in 1 second")
  134. case <-wait:
  135. }
  136. dec := json.NewDecoder(&jsonBuf)
  137. for {
  138. var msg Message
  139. if err := dec.Decode(&msg); err != nil {
  140. if err == io.EOF {
  141. break
  142. }
  143. t.Fatal(err)
  144. }
  145. if msg.Source != "stdout" && msg.Source != "stderr" {
  146. t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
  147. }
  148. if msg.Source == "stdout" {
  149. if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
  150. t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
  151. }
  152. }
  153. if msg.Source == "stderr" {
  154. if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
  155. t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
  156. }
  157. }
  158. }
  159. }
  160. func TestCopierSlow(t *testing.T) {
  161. stdoutLine := "Line that thinks that it is log line from docker stdout"
  162. var stdout bytes.Buffer
  163. for i := 0; i < 30; i++ {
  164. if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
  165. t.Fatal(err)
  166. }
  167. }
  168. var jsonBuf bytes.Buffer
  169. //encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
  170. jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
  171. c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
  172. c.Run()
  173. wait := make(chan struct{})
  174. go func() {
  175. c.Wait()
  176. close(wait)
  177. }()
  178. <-time.After(150 * time.Millisecond)
  179. c.Close()
  180. select {
  181. case <-time.After(200 * time.Millisecond):
  182. t.Fatal("failed to exit in time after the copier is closed")
  183. case <-wait:
  184. }
  185. }
  186. type BenchmarkLoggerDummy struct {
  187. }
  188. func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }
  189. func (l *BenchmarkLoggerDummy) Close() error { return nil }
  190. func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }
  191. func BenchmarkCopier64(b *testing.B) {
  192. benchmarkCopier(b, 1<<6)
  193. }
  194. func BenchmarkCopier128(b *testing.B) {
  195. benchmarkCopier(b, 1<<7)
  196. }
  197. func BenchmarkCopier256(b *testing.B) {
  198. benchmarkCopier(b, 1<<8)
  199. }
  200. func BenchmarkCopier512(b *testing.B) {
  201. benchmarkCopier(b, 1<<9)
  202. }
  203. func BenchmarkCopier1K(b *testing.B) {
  204. benchmarkCopier(b, 1<<10)
  205. }
  206. func BenchmarkCopier2K(b *testing.B) {
  207. benchmarkCopier(b, 1<<11)
  208. }
  209. func BenchmarkCopier4K(b *testing.B) {
  210. benchmarkCopier(b, 1<<12)
  211. }
  212. func BenchmarkCopier8K(b *testing.B) {
  213. benchmarkCopier(b, 1<<13)
  214. }
  215. func BenchmarkCopier16K(b *testing.B) {
  216. benchmarkCopier(b, 1<<14)
  217. }
  218. func BenchmarkCopier32K(b *testing.B) {
  219. benchmarkCopier(b, 1<<15)
  220. }
  221. func BenchmarkCopier64K(b *testing.B) {
  222. benchmarkCopier(b, 1<<16)
  223. }
  224. func BenchmarkCopier128K(b *testing.B) {
  225. benchmarkCopier(b, 1<<17)
  226. }
  227. func BenchmarkCopier256K(b *testing.B) {
  228. benchmarkCopier(b, 1<<18)
  229. }
  230. func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
  231. r, w, err := os.Pipe()
  232. if err != nil {
  233. b.Fatal(err)
  234. return nil
  235. }
  236. go func() {
  237. for i := 0; i < iterations; i++ {
  238. time.Sleep(delay)
  239. if n, err := w.Write(buf); err != nil || n != len(buf) {
  240. if err != nil {
  241. b.Fatal(err)
  242. }
  243. b.Fatal(fmt.Errorf("short write"))
  244. }
  245. }
  246. w.Close()
  247. }()
  248. return r
  249. }
  250. func benchmarkCopier(b *testing.B, length int) {
  251. b.StopTimer()
  252. buf := []byte{'A'}
  253. for len(buf) < length {
  254. buf = append(buf, buf...)
  255. }
  256. buf = append(buf[:length-1], []byte{'\n'}...)
  257. b.StartTimer()
  258. for i := 0; i < b.N; i++ {
  259. c := NewCopier(
  260. map[string]io.Reader{
  261. "buffer": piped(b, 10, time.Nanosecond, buf),
  262. },
  263. &BenchmarkLoggerDummy{})
  264. c.Run()
  265. c.Wait()
  266. c.Close()
  267. }
  268. }