123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- package logger // import "github.com/docker/docker/daemon/logger"
- import (
- "context"
- "strconv"
- "testing"
- "time"
- )
- type mockLogger struct{ c chan *Message }
- func (l *mockLogger) Log(msg *Message) error {
- l.c <- msg
- return nil
- }
- func (l *mockLogger) Name() string {
- return "mock"
- }
- func (l *mockLogger) Close() error {
- return nil
- }
- func TestRingLogger(t *testing.T) {
- mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel
- ring := newRingLogger(mockLog, Info{}, 1)
- defer ring.setClosed()
- // this should never block
- ring.Log(&Message{Line: []byte("1")})
- ring.Log(&Message{Line: []byte("2")})
- ring.Log(&Message{Line: []byte("3")})
- select {
- case msg := <-mockLog.c:
- if string(msg.Line) != "1" {
- t.Fatalf("got unexpected msg: %q", string(msg.Line))
- }
- case <-time.After(100 * time.Millisecond):
- t.Fatal("timeout reading log message")
- }
- select {
- case msg := <-mockLog.c:
- t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line))
- default:
- }
- }
- func TestRingCap(t *testing.T) {
- r := newRing(5)
- for i := 0; i < 10; i++ {
- // queue messages with "0" to "10"
- // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer
- if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
- t.Fatal(err)
- }
- }
- // should have messages in the queue for "0" to "4"
- for i := 0; i < 5; i++ {
- m, err := r.Dequeue()
- if err != nil {
- t.Fatal(err)
- }
- if string(m.Line) != strconv.Itoa(i) {
- t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line))
- }
- }
- // queue a message that's bigger than the buffer cap
- if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil {
- t.Fatal(err)
- }
- // queue another message that's bigger than the buffer cap
- if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil {
- t.Fatal(err)
- }
- m, err := r.Dequeue()
- if err != nil {
- t.Fatal(err)
- }
- if string(m.Line) != "hello world" {
- t.Fatalf("got unexpected message: %s", string(m.Line))
- }
- if len(r.queue) != 0 {
- t.Fatalf("expected queue to be empty, got: %d", len(r.queue))
- }
- }
- func TestRingClose(t *testing.T) {
- r := newRing(1)
- if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil {
- t.Fatal(err)
- }
- r.Close()
- if err := r.Enqueue(&Message{}); err != errClosed {
- t.Fatalf("expected errClosed, got: %v", err)
- }
- if len(r.queue) != 1 {
- t.Fatal("expected empty queue")
- }
- if m, err := r.Dequeue(); err == nil || m != nil {
- t.Fatal("expected err on Dequeue after close")
- }
- ls := r.Drain()
- if len(ls) != 1 {
- t.Fatalf("expected one message: %v", ls)
- }
- if string(ls[0].Line) != "hello" {
- t.Fatalf("got unexpected message: %s", string(ls[0].Line))
- }
- }
- func TestRingDrain(t *testing.T) {
- r := newRing(5)
- for i := 0; i < 5; i++ {
- if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
- t.Fatal(err)
- }
- }
- ls := r.Drain()
- if len(ls) != 5 {
- t.Fatal("got unexpected length after drain")
- }
- for i := 0; i < 5; i++ {
- if string(ls[i].Line) != strconv.Itoa(i) {
- t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line))
- }
- }
- if r.sizeBytes != 0 {
- t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes)
- }
- ls = r.Drain()
- if len(ls) != 0 {
- t.Fatalf("expected 0 messages on 2nd drain: %v", ls)
- }
- }
- type nopLogger struct{}
- func (nopLogger) Name() string { return "nopLogger" }
- func (nopLogger) Close() error { return nil }
- func (nopLogger) Log(*Message) error { return nil }
- func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) {
- l := NewRingLogger(nopLogger{}, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) {
- started := make(chan struct{})
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- close(started)
- ticker := time.NewTicker(delay)
- for range ticker.C {
- select {
- case <-ctx.Done():
- ticker.Stop()
- return
- case <-c:
- }
- }
- }()
- <-started
- return cancel
- }
- func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(1*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(10*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(50*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(100*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(300*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
- func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) {
- mockLog := &mockLogger{make(chan *Message)}
- defer mockLog.Close()
- l := NewRingLogger(mockLog, Info{}, -1)
- msg := &Message{Line: []byte("hello humans and everyone else!")}
- b.SetBytes(int64(len(msg.Line)))
- cancel := consumeWithDelay(500*time.Millisecond, mockLog.c)
- defer cancel()
- for i := 0; i < b.N; i++ {
- if err := l.Log(msg); err != nil {
- b.Fatal(err)
- }
- }
- }
|