moby/daemon/logger/ring.go
Brian Goff 3f4fccb65f Use sync.Pool for logger Messages
This reduces allocs and bytes used per log entry significantly as well
as some improvement to time per log operation.

Each log driver, however, must put messages back in the pool once they
are finished with the message.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2017-02-01 13:52:37 -05:00

218 lines
4.8 KiB
Go

package logger
import (
"errors"
"sync"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
const (
defaultRingMaxSize = 1e6 // 1MB
)
// RingLogger is a ring buffer that implements the Logger interface.
// This is used when lossy logging is OK.
type RingLogger struct {
buffer *messageRing
l Logger
logInfo Info
closeFlag int32
}
type ringWithReader struct {
*RingLogger
}
func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
reader, ok := r.l.(LogReader)
if !ok {
// something is wrong if we get here
panic("expected log reader")
}
return reader.ReadLogs(cfg)
}
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
l := &RingLogger{
buffer: newRing(maxSize),
l: driver,
logInfo: logInfo,
}
go l.run()
return l
}
// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
// the passed in logger.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
if maxSize < 0 {
maxSize = defaultRingMaxSize
}
l := newRingLogger(driver, logInfo, maxSize)
if _, ok := driver.(LogReader); ok {
return &ringWithReader{l}
}
return l
}
// Log queues messages into the ring buffer
func (r *RingLogger) Log(msg *Message) error {
if r.closed() {
return errClosed
}
return r.buffer.Enqueue(msg)
}
// Name returns the name of the underlying logger
func (r *RingLogger) Name() string {
return r.l.Name()
}
func (r *RingLogger) closed() bool {
return atomic.LoadInt32(&r.closeFlag) == 1
}
func (r *RingLogger) setClosed() {
atomic.StoreInt32(&r.closeFlag, 1)
}
// Close closes the logger
func (r *RingLogger) Close() error {
r.setClosed()
r.buffer.Close()
// empty out the queue
var logErr bool
for _, msg := range r.buffer.Drain() {
if logErr {
// some error logging a previous message, so re-insert to message pool
// and assume log driver is hosed
PutMessage(msg)
continue
}
if err := r.l.Log(msg); err != nil {
logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l)
logErr = true
}
}
return r.l.Close()
}
// run consumes messages from the ring buffer and forwards them to the underling
// logger.
// This is run in a goroutine when the RingLogger is created
func (r *RingLogger) run() {
for {
if r.closed() {
return
}
msg, err := r.buffer.Dequeue()
if err != nil {
// buffer is closed
return
}
if err := r.l.Log(msg); err != nil {
logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l)
}
}
}
type messageRing struct {
mu sync.Mutex
// singals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
wait *sync.Cond
sizeBytes int64 // current buffer size
maxBytes int64 // max buffer size size
queue []*Message
closed bool
}
func newRing(maxBytes int64) *messageRing {
queueSize := 1000
if maxBytes == 0 || maxBytes == 1 {
// With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
// message long.
queueSize = 1
}
r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
r.wait = sync.NewCond(&r.mu)
return r
}
// Enqueue adds a message to the buffer queue
// If the message is too big for the buffer it drops the oldest messages to make room
// If there are no messages in the queue and the message is still too big, it adds the message anyway.
func (r *messageRing) Enqueue(m *Message) error {
mSize := int64(len(m.Line))
r.mu.Lock()
if r.closed {
r.mu.Unlock()
return errClosed
}
if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
r.wait.Signal()
r.mu.Unlock()
return nil
}
r.queue = append(r.queue, m)
r.sizeBytes += mSize
r.wait.Signal()
r.mu.Unlock()
return nil
}
// Dequeue pulls a message off the queue
// If there are no messages, it waits for one.
// If the buffer is closed, it will return immediately.
func (r *messageRing) Dequeue() (*Message, error) {
r.mu.Lock()
for len(r.queue) == 0 && !r.closed {
r.wait.Wait()
}
if r.closed {
r.mu.Unlock()
return nil, errClosed
}
msg := r.queue[0]
r.queue = r.queue[1:]
r.sizeBytes -= int64(len(msg.Line))
r.mu.Unlock()
return msg, nil
}
var errClosed = errors.New("closed")
// Close closes the buffer ensuring no new messages can be added.
// Any callers waiting to dequeue a message will be woken up.
func (r *messageRing) Close() {
r.mu.Lock()
if r.closed {
r.mu.Unlock()
return
}
r.closed = true
r.wait.Broadcast()
r.mu.Unlock()
return
}
// Drain drains all messages from the queue.
// This can be used after `Close()` to get any remaining messages that were in queue.
func (r *messageRing) Drain() []*Message {
r.mu.Lock()
ls := make([]*Message, 0, len(r.queue))
ls = append(ls, r.queue...)
r.sizeBytes = 0
r.queue = r.queue[:0]
r.mu.Unlock()
return ls
}