This commit is contained in:
Brian Goff 2024-04-19 21:33:32 -07:00 committed by GitHub
commit 8c34f8f687
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 163 additions and 88 deletions

View file

@ -8,6 +8,7 @@ import (
const (
defaultRingMaxSize = 1e6 // 1MB
defaultQueueSize = 1000
)
// RingLogger is a ring buffer that implements the Logger interface.
@ -36,8 +37,11 @@ func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
}
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
if maxSize < 0 {
maxSize = defaultRingMaxSize
}
l := &RingLogger{
buffer: newRing(maxSize),
buffer: newRing(maxSize, defaultQueueSize),
l: driver,
logInfo: logInfo,
}
@ -49,9 +53,6 @@ func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
// the passed in logger.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
if maxSize < 0 {
maxSize = defaultRingMaxSize
}
l := newRingLogger(driver, logInfo, maxSize)
if _, ok := driver.(LogReader); ok {
return &ringWithReader{l}
@ -141,21 +142,41 @@ type messageRing struct {
maxBytes int64 // max buffer size
queue []*Message
closed bool
// head: index of the next message to dequeue from
// tail: index of the next message to enqueue to
// count: number of messages in the queue
head, tail, count int
// tracks the number of times the queue has been grown
// Used to determine if we can shrink the queue back down on dequeue
growCount int
dropped int64
}
func newRing(maxBytes int64) *messageRing {
queueSize := 1000
if maxBytes == 0 || maxBytes == 1 {
// With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
// message long.
queueSize = 1
func newRing(maxBytes int64, queueSize int) *messageRing {
if queueSize == 0 {
queueSize = defaultQueueSize
}
r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
r := &messageRing{queue: make([]*Message, queueSize), maxBytes: maxBytes}
r.wait = sync.NewCond(&r.mu)
return r
}
func (r *messageRing) isEmpty() bool {
return r.head == r.tail && !r.isFull()
}
func (r *messageRing) isFull() bool {
return r.count == cap(r.queue)
}
func (r *messageRing) next(i int) int {
return (i + 1) % cap(r.queue)
}
// Enqueue adds a message to the buffer queue
// If the message is too big for the buffer it drops the new message.
// If there are no messages in the queue and the message is still too big, it adds the message anyway.
@ -167,14 +188,31 @@ func (r *messageRing) Enqueue(m *Message) error {
r.mu.Unlock()
return errClosed
}
if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
r.wait.Signal()
r.mu.Unlock()
return nil
// drop old messages until there is enough space for the new message
// This is not accounting for slots in the queue, just the size of the messages
for mSize+r.sizeBytes > r.maxBytes && !r.isEmpty() {
r.dequeue()
r.dropped++
}
r.queue = append(r.queue, m)
if r.isFull() {
// We've run out of space to store messages in the circular buffer even though we have not reached our byte limit.
// This means we need to resize the queue to make room for more messages.
//
r.growCount++
buf := make([]*Message, len(r.queue)*2)
r.copyTo(buf)
r.queue = buf
r.head = 0
r.tail = r.count
}
r.queue[r.tail] = m
r.tail = r.next(r.tail)
r.sizeBytes += mSize
r.count++
r.wait.Signal()
r.mu.Unlock()
return nil
@ -185,7 +223,7 @@ func (r *messageRing) Enqueue(m *Message) error {
// If the buffer is closed, it will return immediately.
func (r *messageRing) Dequeue() (*Message, error) {
r.mu.Lock()
for len(r.queue) == 0 && !r.closed {
for r.isEmpty() && !r.closed {
r.wait.Wait()
}
@ -194,13 +232,33 @@ func (r *messageRing) Dequeue() (*Message, error) {
return nil, errClosed
}
msg := r.queue[0]
r.queue = r.queue[1:]
r.sizeBytes -= int64(len(msg.Line))
msg := r.dequeue()
r.mu.Unlock()
return msg, nil
}
// callers must validate that there is at least one message in the queue
// callers must hold the lock
func (r *messageRing) dequeue() *Message {
msg := r.queue[r.head]
r.queue[r.head] = nil
r.head = r.next(r.head)
r.sizeBytes -= int64(len(msg.Line))
r.count--
if r.growCount > 0 && r.count <= cap(r.queue)/4 {
// We've shrunk the queue enough that we can resize it back down.
// This is to prevent the queue from growing too large and never shrinking back down.
buf := make([]*Message, cap(r.queue)/2)
r.copyTo(buf)
r.queue = buf
r.head = 0
r.tail = r.count
r.growCount--
}
return msg
}
var errClosed = errors.New("closed")
// Close closes the buffer ensuring no new messages can be added.
@ -217,14 +275,55 @@ func (r *messageRing) Close() {
r.mu.Unlock()
}
func (r *messageRing) Len() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.count
}
func (r *messageRing) copyTo(buf []*Message) {
if r.isEmpty() {
return
}
// Here we have a circular buffer that we need to copy to the passed in slice.
// The assumption here is that `ls` is at least as large enough to hold all the messages in the queue.
// It maybe larger than the queue, but it must not be smaller.
// +---+---+---+---+---+---+---+---+---+---+
// | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
// +---+---+---+---+---+---+---+---+---+---+
// ^ ^
// | |
// head tail
// The above example is a simplified view of the queue. The head is at index 0 (beginning) and the tail is at index 9 (end).
// In reality this could happen at any point in the queue. The head could be at index 5 and the tail at index 4.
// Copy from head marker of the slice
// Since this is a circular buffer we need to account for the wrap around.
if r.head < r.tail {
copy(buf, r.queue[r.head:r.tail])
} else {
n := copy(buf, r.queue[r.head:])
copy(buf[n:], r.queue[:r.tail])
}
}
// Drain drains all messages from the queue.
// This can be used after `Close()` to get any remaining messages that were in queue.
func (r *messageRing) Drain() []*Message {
r.mu.Lock()
ls := make([]*Message, 0, len(r.queue))
ls = append(ls, r.queue...)
ls := make([]*Message, r.count)
r.copyTo(ls)
r.sizeBytes = 0
r.queue = r.queue[:0]
r.head = 0
r.tail = 0
r.count = 0
r.mu.Unlock()
return ls
}

View file

@ -5,6 +5,9 @@ import (
"strconv"
"testing"
"time"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
type mockLogger struct{ c chan *Message }
@ -34,9 +37,7 @@ func TestRingLogger(t *testing.T) {
select {
case msg := <-mockLog.c:
if string(msg.Line) != "1" {
t.Fatalf("got unexpected msg: %q", string(msg.Line))
}
assert.Equal(t, string(msg.Line), "3")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout reading log message")
}
@ -46,102 +47,77 @@ func TestRingLogger(t *testing.T) {
t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line))
default:
}
assert.Equal(t, ring.buffer.dropped, int64(2))
}
func TestRingCap(t *testing.T) {
r := newRing(5)
r := newRing(5, 0)
for i := 0; i < 10; i++ {
// queue messages with "0" to "10"
// the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer
if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
t.Fatal(err)
}
// the "0" to "4" messages should be dropped since we only allow 5 bytes in the buffer
err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))})
assert.NilError(t, err)
}
// should have messages in the queue for "0" to "4"
for i := 0; i < 5; i++ {
for i := 5; i < 10; i++ {
m, err := r.Dequeue()
if err != nil {
t.Fatal(err)
}
if string(m.Line) != strconv.Itoa(i) {
t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line))
}
assert.NilError(t, err)
assert.Equal(t, string(m.Line), strconv.Itoa(i))
}
// queue a message that's bigger than the buffer cap
if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil {
t.Fatal(err)
}
err := r.Enqueue(&Message{Line: []byte("hello world")})
assert.NilError(t, err)
// queue another message that's bigger than the buffer cap
if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil {
t.Fatal(err)
}
err = r.Enqueue(&Message{Line: []byte("eat a banana")})
assert.NilError(t, err)
m, err := r.Dequeue()
if err != nil {
t.Fatal(err)
}
if string(m.Line) != "hello world" {
t.Fatalf("got unexpected message: %s", string(m.Line))
}
if len(r.queue) != 0 {
t.Fatalf("expected queue to be empty, got: %d", len(r.queue))
}
assert.NilError(t, err)
assert.Equal(t, string(m.Line), "eat a banana")
assert.Equal(t, r.count, 0)
}
func TestRingClose(t *testing.T) {
r := newRing(1)
if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil {
t.Fatal(err)
}
r := newRing(1, 0)
err := r.Enqueue(&Message{Line: []byte("hello")})
assert.NilError(t, err)
r.Close()
if err := r.Enqueue(&Message{}); err != errClosed {
t.Fatalf("expected errClosed, got: %v", err)
}
if len(r.queue) != 1 {
t.Fatal("expected empty queue")
}
if m, err := r.Dequeue(); err == nil || m != nil {
t.Fatal("expected err on Dequeue after close")
}
assert.ErrorIs(t, r.Enqueue(&Message{}), errClosed)
assert.Equal(t, r.count, 1)
m, err := r.Dequeue()
assert.ErrorIs(t, err, errClosed)
assert.Assert(t, is.Nil(m))
ls := r.Drain()
if len(ls) != 1 {
t.Fatalf("expected one message: %v", ls)
}
if string(ls[0].Line) != "hello" {
t.Fatalf("got unexpected message: %s", string(ls[0].Line))
}
assert.Assert(t, is.Len(ls, 1))
assert.Equal(t, string(ls[0].Line), "hello")
}
func TestRingDrain(t *testing.T) {
r := newRing(5)
r := newRing(5, 0)
for i := 0; i < 5; i++ {
if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
t.Fatal(err)
}
err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))})
assert.NilError(t, err)
}
ls := r.Drain()
if len(ls) != 5 {
t.Fatal("got unexpected length after drain")
}
assert.Assert(t, is.Len(ls, 5))
for i := 0; i < 5; i++ {
if string(ls[i].Line) != strconv.Itoa(i) {
t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line))
}
}
if r.sizeBytes != 0 {
t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes)
assert.Check(t, is.Equal(string(ls[i].Line), strconv.Itoa(i)))
}
assert.Check(t, is.Equal(r.count, 0))
ls = r.Drain()
if len(ls) != 0 {
t.Fatalf("expected 0 messages on 2nd drain: %v", ls)
}
assert.Assert(t, is.Len(ls, 0))
}
type nopLogger struct{}