diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index 8c19b543d6..7d7f839f36 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -8,6 +8,7 @@ import ( const ( defaultRingMaxSize = 1e6 // 1MB + defaultQueueSize = 1000 ) // RingLogger is a ring buffer that implements the Logger interface. @@ -36,8 +37,11 @@ func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { } func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { + if maxSize < 0 { + maxSize = defaultRingMaxSize + } l := &RingLogger{ - buffer: newRing(maxSize), + buffer: newRing(maxSize, defaultQueueSize), l: driver, logInfo: logInfo, } @@ -49,9 +53,6 @@ func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { // NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping // the passed in logger. func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { - if maxSize < 0 { - maxSize = defaultRingMaxSize - } l := newRingLogger(driver, logInfo, maxSize) if _, ok := driver.(LogReader); ok { return &ringWithReader{l} @@ -141,21 +142,41 @@ type messageRing struct { maxBytes int64 // max buffer size queue []*Message closed bool + + // head: index of the next message to dequeue from + // tail: index of the next message to enqueue to + // count: number of messages in the queue + head, tail, count int + + // tracks the number of times the queue has been grown + // Used to determine if we can shrink the queue back down on dequeue + growCount int + + dropped int64 } -func newRing(maxBytes int64) *messageRing { - queueSize := 1000 - if maxBytes == 0 || maxBytes == 1 { - // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1 - // message long. - queueSize = 1 +func newRing(maxBytes int64, queueSize int) *messageRing { + if queueSize == 0 { + queueSize = defaultQueueSize } - r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes} + r := &messageRing{queue: make([]*Message, queueSize), maxBytes: maxBytes} r.wait = sync.NewCond(&r.mu) return r } +func (r *messageRing) isEmpty() bool { + return r.head == r.tail && !r.isFull() +} + +func (r *messageRing) isFull() bool { + return r.count == cap(r.queue) +} + +func (r *messageRing) next(i int) int { + return (i + 1) % cap(r.queue) +} + // Enqueue adds a message to the buffer queue // If the message is too big for the buffer it drops the new message. // If there are no messages in the queue and the message is still too big, it adds the message anyway. @@ -167,14 +188,31 @@ func (r *messageRing) Enqueue(m *Message) error { r.mu.Unlock() return errClosed } - if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 { - r.wait.Signal() - r.mu.Unlock() - return nil + + // drop old messages until there is enough space for the new message + // This is not accounting for slots in the queue, just the size of the messages + for mSize+r.sizeBytes > r.maxBytes && !r.isEmpty() { + r.dequeue() + r.dropped++ } - r.queue = append(r.queue, m) + if r.isFull() { + // We've run out of space to store messages in the circular buffer even though we have not reached our byte limit. + // This means we need to resize the queue to make room for more messages. + // + r.growCount++ + buf := make([]*Message, len(r.queue)*2) + r.copyTo(buf) + r.queue = buf + r.head = 0 + r.tail = r.count + } + + r.queue[r.tail] = m + r.tail = r.next(r.tail) r.sizeBytes += mSize + r.count++ + r.wait.Signal() r.mu.Unlock() return nil @@ -185,7 +223,7 @@ func (r *messageRing) Enqueue(m *Message) error { // If the buffer is closed, it will return immediately. func (r *messageRing) Dequeue() (*Message, error) { r.mu.Lock() - for len(r.queue) == 0 && !r.closed { + for r.isEmpty() && !r.closed { r.wait.Wait() } @@ -194,13 +232,33 @@ func (r *messageRing) Dequeue() (*Message, error) { return nil, errClosed } - msg := r.queue[0] - r.queue = r.queue[1:] - r.sizeBytes -= int64(len(msg.Line)) + msg := r.dequeue() r.mu.Unlock() return msg, nil } +// callers must validate that there is at least one message in the queue +// callers must hold the lock +func (r *messageRing) dequeue() *Message { + msg := r.queue[r.head] + r.queue[r.head] = nil + r.head = r.next(r.head) + r.sizeBytes -= int64(len(msg.Line)) + r.count-- + + if r.growCount > 0 && r.count <= cap(r.queue)/4 { + // We've shrunk the queue enough that we can resize it back down. + // This is to prevent the queue from growing too large and never shrinking back down. + buf := make([]*Message, cap(r.queue)/2) + r.copyTo(buf) + r.queue = buf + r.head = 0 + r.tail = r.count + r.growCount-- + } + return msg +} + var errClosed = errors.New("closed") // Close closes the buffer ensuring no new messages can be added. @@ -217,14 +275,55 @@ func (r *messageRing) Close() { r.mu.Unlock() } +func (r *messageRing) Len() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.count +} + +func (r *messageRing) copyTo(buf []*Message) { + if r.isEmpty() { + return + } + + // Here we have a circular buffer that we need to copy to the passed in slice. + // The assumption here is that `ls` is at least as large enough to hold all the messages in the queue. + // It maybe larger than the queue, but it must not be smaller. + + // +---+---+---+---+---+---+---+---+---+---+ + // | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + // +---+---+---+---+---+---+---+---+---+---+ + // ^ ^ + // | | + // head tail + + // The above example is a simplified view of the queue. The head is at index 0 (beginning) and the tail is at index 9 (end). + // In reality this could happen at any point in the queue. The head could be at index 5 and the tail at index 4. + + // Copy from head marker of the slice + // Since this is a circular buffer we need to account for the wrap around. + if r.head < r.tail { + copy(buf, r.queue[r.head:r.tail]) + } else { + n := copy(buf, r.queue[r.head:]) + copy(buf[n:], r.queue[:r.tail]) + } + +} + // Drain drains all messages from the queue. // This can be used after `Close()` to get any remaining messages that were in queue. func (r *messageRing) Drain() []*Message { r.mu.Lock() - ls := make([]*Message, 0, len(r.queue)) - ls = append(ls, r.queue...) + ls := make([]*Message, r.count) + + r.copyTo(ls) + r.sizeBytes = 0 - r.queue = r.queue[:0] + r.head = 0 + r.tail = 0 + r.count = 0 + r.mu.Unlock() return ls } diff --git a/daemon/logger/ring_test.go b/daemon/logger/ring_test.go index eab2446234..84ec09399f 100644 --- a/daemon/logger/ring_test.go +++ b/daemon/logger/ring_test.go @@ -5,6 +5,9 @@ import ( "strconv" "testing" "time" + + "gotest.tools/v3/assert" + is "gotest.tools/v3/assert/cmp" ) type mockLogger struct{ c chan *Message } @@ -34,9 +37,7 @@ func TestRingLogger(t *testing.T) { select { case msg := <-mockLog.c: - if string(msg.Line) != "1" { - t.Fatalf("got unexpected msg: %q", string(msg.Line)) - } + assert.Equal(t, string(msg.Line), "3") case <-time.After(100 * time.Millisecond): t.Fatal("timeout reading log message") } @@ -46,102 +47,77 @@ func TestRingLogger(t *testing.T) { t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line)) default: } + + assert.Equal(t, ring.buffer.dropped, int64(2)) } func TestRingCap(t *testing.T) { - r := newRing(5) + r := newRing(5, 0) for i := 0; i < 10; i++ { // queue messages with "0" to "10" - // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer - if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { - t.Fatal(err) - } + // the "0" to "4" messages should be dropped since we only allow 5 bytes in the buffer + err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}) + assert.NilError(t, err) } // should have messages in the queue for "0" to "4" - for i := 0; i < 5; i++ { + for i := 5; i < 10; i++ { m, err := r.Dequeue() - if err != nil { - t.Fatal(err) - } - if string(m.Line) != strconv.Itoa(i) { - t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line)) - } + assert.NilError(t, err) + assert.Equal(t, string(m.Line), strconv.Itoa(i)) } // queue a message that's bigger than the buffer cap - if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil { - t.Fatal(err) - } + + err := r.Enqueue(&Message{Line: []byte("hello world")}) + assert.NilError(t, err) // queue another message that's bigger than the buffer cap - if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil { - t.Fatal(err) - } + err = r.Enqueue(&Message{Line: []byte("eat a banana")}) + assert.NilError(t, err) m, err := r.Dequeue() - if err != nil { - t.Fatal(err) - } - if string(m.Line) != "hello world" { - t.Fatalf("got unexpected message: %s", string(m.Line)) - } - if len(r.queue) != 0 { - t.Fatalf("expected queue to be empty, got: %d", len(r.queue)) - } + assert.NilError(t, err) + assert.Equal(t, string(m.Line), "eat a banana") + assert.Equal(t, r.count, 0) } func TestRingClose(t *testing.T) { - r := newRing(1) - if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil { - t.Fatal(err) - } + r := newRing(1, 0) + err := r.Enqueue(&Message{Line: []byte("hello")}) + assert.NilError(t, err) + r.Close() - if err := r.Enqueue(&Message{}); err != errClosed { - t.Fatalf("expected errClosed, got: %v", err) - } - if len(r.queue) != 1 { - t.Fatal("expected empty queue") - } - if m, err := r.Dequeue(); err == nil || m != nil { - t.Fatal("expected err on Dequeue after close") - } + + assert.ErrorIs(t, r.Enqueue(&Message{}), errClosed) + assert.Equal(t, r.count, 1) + + m, err := r.Dequeue() + assert.ErrorIs(t, err, errClosed) + assert.Assert(t, is.Nil(m)) ls := r.Drain() - if len(ls) != 1 { - t.Fatalf("expected one message: %v", ls) - } - if string(ls[0].Line) != "hello" { - t.Fatalf("got unexpected message: %s", string(ls[0].Line)) - } + assert.Assert(t, is.Len(ls, 1)) + assert.Equal(t, string(ls[0].Line), "hello") } func TestRingDrain(t *testing.T) { - r := newRing(5) + r := newRing(5, 0) for i := 0; i < 5; i++ { - if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { - t.Fatal(err) - } + err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}) + assert.NilError(t, err) } ls := r.Drain() - if len(ls) != 5 { - t.Fatal("got unexpected length after drain") - } + assert.Assert(t, is.Len(ls, 5)) for i := 0; i < 5; i++ { - if string(ls[i].Line) != strconv.Itoa(i) { - t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line)) - } - } - if r.sizeBytes != 0 { - t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes) + assert.Check(t, is.Equal(string(ls[i].Line), strconv.Itoa(i))) } + assert.Check(t, is.Equal(r.count, 0)) ls = r.Drain() - if len(ls) != 0 { - t.Fatalf("expected 0 messages on 2nd drain: %v", ls) - } + assert.Assert(t, is.Len(ls, 0)) } type nopLogger struct{}