ring_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package logger // import "github.com/docker/docker/daemon/logger"
  2. import (
  3. "context"
  4. "strconv"
  5. "testing"
  6. "time"
  7. )
  8. type mockLogger struct{ c chan *Message }
  9. func (l *mockLogger) Log(msg *Message) error {
  10. l.c <- msg
  11. return nil
  12. }
  13. func (l *mockLogger) Name() string {
  14. return "mock"
  15. }
  16. func (l *mockLogger) Close() error {
  17. return nil
  18. }
  19. func TestRingLogger(t *testing.T) {
  20. mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel
  21. ring := newRingLogger(mockLog, Info{}, 1)
  22. defer ring.setClosed()
  23. // this should never block
  24. ring.Log(&Message{Line: []byte("1")})
  25. ring.Log(&Message{Line: []byte("2")})
  26. ring.Log(&Message{Line: []byte("3")})
  27. select {
  28. case msg := <-mockLog.c:
  29. if string(msg.Line) != "1" {
  30. t.Fatalf("got unexpected msg: %q", string(msg.Line))
  31. }
  32. case <-time.After(100 * time.Millisecond):
  33. t.Fatal("timeout reading log message")
  34. }
  35. select {
  36. case msg := <-mockLog.c:
  37. t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line))
  38. default:
  39. }
  40. }
  41. func TestRingCap(t *testing.T) {
  42. r := newRing(5)
  43. for i := 0; i < 10; i++ {
  44. // queue messages with "0" to "10"
  45. // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer
  46. if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
  47. t.Fatal(err)
  48. }
  49. }
  50. // should have messages in the queue for "0" to "4"
  51. for i := 0; i < 5; i++ {
  52. m, err := r.Dequeue()
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. if string(m.Line) != strconv.Itoa(i) {
  57. t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line))
  58. }
  59. }
  60. // queue a message that's bigger than the buffer cap
  61. if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil {
  62. t.Fatal(err)
  63. }
  64. // queue another message that's bigger than the buffer cap
  65. if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil {
  66. t.Fatal(err)
  67. }
  68. m, err := r.Dequeue()
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. if string(m.Line) != "hello world" {
  73. t.Fatalf("got unexpected message: %s", string(m.Line))
  74. }
  75. if len(r.queue) != 0 {
  76. t.Fatalf("expected queue to be empty, got: %d", len(r.queue))
  77. }
  78. }
  79. func TestRingClose(t *testing.T) {
  80. r := newRing(1)
  81. if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil {
  82. t.Fatal(err)
  83. }
  84. r.Close()
  85. if err := r.Enqueue(&Message{}); err != errClosed {
  86. t.Fatalf("expected errClosed, got: %v", err)
  87. }
  88. if len(r.queue) != 1 {
  89. t.Fatal("expected empty queue")
  90. }
  91. if m, err := r.Dequeue(); err == nil || m != nil {
  92. t.Fatal("expected err on Dequeue after close")
  93. }
  94. ls := r.Drain()
  95. if len(ls) != 1 {
  96. t.Fatalf("expected one message: %v", ls)
  97. }
  98. if string(ls[0].Line) != "hello" {
  99. t.Fatalf("got unexpected message: %s", string(ls[0].Line))
  100. }
  101. }
  102. func TestRingDrain(t *testing.T) {
  103. r := newRing(5)
  104. for i := 0; i < 5; i++ {
  105. if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
  106. t.Fatal(err)
  107. }
  108. }
  109. ls := r.Drain()
  110. if len(ls) != 5 {
  111. t.Fatal("got unexpected length after drain")
  112. }
  113. for i := 0; i < 5; i++ {
  114. if string(ls[i].Line) != strconv.Itoa(i) {
  115. t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line))
  116. }
  117. }
  118. if r.sizeBytes != 0 {
  119. t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes)
  120. }
  121. ls = r.Drain()
  122. if len(ls) != 0 {
  123. t.Fatalf("expected 0 messages on 2nd drain: %v", ls)
  124. }
  125. }
  126. type nopLogger struct{}
  127. func (nopLogger) Name() string { return "nopLogger" }
  128. func (nopLogger) Close() error { return nil }
  129. func (nopLogger) Log(*Message) error { return nil }
  130. func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) {
  131. mockLog := &mockLogger{make(chan *Message)}
  132. defer mockLog.Close()
  133. l := NewRingLogger(mockLog, Info{}, -1)
  134. msg := &Message{Line: []byte("hello humans and everyone else!")}
  135. b.SetBytes(int64(len(msg.Line)))
  136. for i := 0; i < b.N; i++ {
  137. if err := l.Log(msg); err != nil {
  138. b.Fatal(err)
  139. }
  140. }
  141. }
  142. func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) {
  143. l := NewRingLogger(nopLogger{}, Info{}, -1)
  144. msg := &Message{Line: []byte("hello humans and everyone else!")}
  145. b.SetBytes(int64(len(msg.Line)))
  146. for i := 0; i < b.N; i++ {
  147. if err := l.Log(msg); err != nil {
  148. b.Fatal(err)
  149. }
  150. }
  151. }
  152. func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) {
  153. started := make(chan struct{})
  154. ctx, cancel := context.WithCancel(context.Background())
  155. go func() {
  156. close(started)
  157. ticker := time.NewTicker(delay)
  158. for range ticker.C {
  159. select {
  160. case <-ctx.Done():
  161. ticker.Stop()
  162. return
  163. case <-c:
  164. }
  165. }
  166. }()
  167. <-started
  168. return cancel
  169. }
  170. func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) {
  171. mockLog := &mockLogger{make(chan *Message)}
  172. defer mockLog.Close()
  173. l := NewRingLogger(mockLog, Info{}, -1)
  174. msg := &Message{Line: []byte("hello humans and everyone else!")}
  175. b.SetBytes(int64(len(msg.Line)))
  176. cancel := consumeWithDelay(1*time.Millisecond, mockLog.c)
  177. defer cancel()
  178. for i := 0; i < b.N; i++ {
  179. if err := l.Log(msg); err != nil {
  180. b.Fatal(err)
  181. }
  182. }
  183. }
  184. func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) {
  185. mockLog := &mockLogger{make(chan *Message)}
  186. defer mockLog.Close()
  187. l := NewRingLogger(mockLog, Info{}, -1)
  188. msg := &Message{Line: []byte("hello humans and everyone else!")}
  189. b.SetBytes(int64(len(msg.Line)))
  190. cancel := consumeWithDelay(10*time.Millisecond, mockLog.c)
  191. defer cancel()
  192. for i := 0; i < b.N; i++ {
  193. if err := l.Log(msg); err != nil {
  194. b.Fatal(err)
  195. }
  196. }
  197. }
  198. func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) {
  199. mockLog := &mockLogger{make(chan *Message)}
  200. defer mockLog.Close()
  201. l := NewRingLogger(mockLog, Info{}, -1)
  202. msg := &Message{Line: []byte("hello humans and everyone else!")}
  203. b.SetBytes(int64(len(msg.Line)))
  204. cancel := consumeWithDelay(50*time.Millisecond, mockLog.c)
  205. defer cancel()
  206. for i := 0; i < b.N; i++ {
  207. if err := l.Log(msg); err != nil {
  208. b.Fatal(err)
  209. }
  210. }
  211. }
  212. func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) {
  213. mockLog := &mockLogger{make(chan *Message)}
  214. defer mockLog.Close()
  215. l := NewRingLogger(mockLog, Info{}, -1)
  216. msg := &Message{Line: []byte("hello humans and everyone else!")}
  217. b.SetBytes(int64(len(msg.Line)))
  218. cancel := consumeWithDelay(100*time.Millisecond, mockLog.c)
  219. defer cancel()
  220. for i := 0; i < b.N; i++ {
  221. if err := l.Log(msg); err != nil {
  222. b.Fatal(err)
  223. }
  224. }
  225. }
  226. func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) {
  227. mockLog := &mockLogger{make(chan *Message)}
  228. defer mockLog.Close()
  229. l := NewRingLogger(mockLog, Info{}, -1)
  230. msg := &Message{Line: []byte("hello humans and everyone else!")}
  231. b.SetBytes(int64(len(msg.Line)))
  232. cancel := consumeWithDelay(300*time.Millisecond, mockLog.c)
  233. defer cancel()
  234. for i := 0; i < b.N; i++ {
  235. if err := l.Log(msg); err != nil {
  236. b.Fatal(err)
  237. }
  238. }
  239. }
  240. func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) {
  241. mockLog := &mockLogger{make(chan *Message)}
  242. defer mockLog.Close()
  243. l := NewRingLogger(mockLog, Info{}, -1)
  244. msg := &Message{Line: []byte("hello humans and everyone else!")}
  245. b.SetBytes(int64(len(msg.Line)))
  246. cancel := consumeWithDelay(500*time.Millisecond, mockLog.c)
  247. defer cancel()
  248. for i := 0; i < b.N; i++ {
  249. if err := l.Log(msg); err != nil {
  250. b.Fatal(err)
  251. }
  252. }
  253. }