Przeglądaj źródła

Implement optional ring buffer for container logs

This allows the user to set a logging mode to "blocking" (default), or
"non-blocking", which uses the ring buffer as a proxy to the real log
driver.

This allows a container to never be blocked on stdio at the cost of
dropping log messages.

Introduces 2 new log-opts that works for all drivers, `log-mode` and
`log-size`. `log-mode` takes a  value of "blocking", or "non-blocking"
I chose not to implement this as a bool since it is difficult to
determine if the mode was set to false vs just not set... especially
difficult when merging the default daemon config with the container config.
`log-size` takes a size string, e.g. `2MB`, which sets the max size
of the ring buffer. When the max size is reached, it will start
dropping log messages.

```
BenchmarkRingLoggerThroughputNoReceiver-8           	2000000000	        36.2 ns/op	 856.35 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputWithReceiverDelay0-8   	300000000	       156 ns/op	 198.48 MB/s	      32 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay1-8        	2000000000	        36.1 ns/op	 857.80 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay10-8       	1000000000	        36.2 ns/op	 856.53 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay50-8       	2000000000	        34.7 ns/op	 894.65 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay100-8      	2000000000	        35.1 ns/op	 883.91 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay300-8      	1000000000	        35.9 ns/op	 863.90 MB/s	       0 B/op	       0 allocs/op
BenchmarkRingLoggerThroughputConsumeDelay500-8      	2000000000	        35.8 ns/op	 866.88 MB/s	       0 B/op	       0 allocs/op
```

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 8 lat temu
rodzic
commit
054abff3b6

+ 11 - 0
api/types/container/host_config.go

@@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool {
 	return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount
 }
 
+// LogMode is a type to define the available modes for logging
+// These modes affect how logs are handled when log messages start piling up.
+type LogMode string
+
+// Available logging modes
+const (
+	LogModeUnset            = ""
+	LogModeBlocking LogMode = "blocking"
+	LogModeNonBlock LogMode = "non-blocking"
+)
+
 // LogConfig represents the logging configuration of the container.
 type LogConfig struct {
 	Type   string

+ 19 - 2
container/container.go

@@ -37,6 +37,7 @@ import (
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/volume"
 	"github.com/docker/go-connections/nat"
+	"github.com/docker/go-units"
 	"github.com/docker/libnetwork"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/options"
@@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string {
 // StartLogger starts a new logger driver for the container.
 func (container *Container) StartLogger() (logger.Logger, error) {
 	cfg := container.HostConfig.LogConfig
-	c, err := logger.GetLogDriver(cfg.Type)
+	initDriver, err := logger.GetLogDriver(cfg.Type)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get logging factory: %v", err)
 	}
@@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) {
 			return nil, err
 		}
 	}
-	return c(info)
+
+	l, err := initDriver(info)
+	if err != nil {
+		return nil, err
+	}
+
+	if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock {
+		bufferSize := int64(-1)
+		if s, exists := cfg.Config["max-buffer-size"]; exists {
+			bufferSize, err = units.RAMInBytes(s)
+			if err != nil {
+				return nil, err
+			}
+		}
+		l = logger.NewRingLogger(l, info, bufferSize)
+	}
+	return l, nil
 }
 
 // GetProcessLabel returns the process label for the container.

+ 1 - 1
daemon/logger/awslogs/cloudwatchlogs.go

@@ -204,7 +204,7 @@ func (l *logStream) Log(msg *logger.Message) error {
 	defer l.lock.RUnlock()
 	if !l.closed {
 		// buffer up the data, making sure to copy the Line data
-		l.messages <- logger.CopyMessage(msg)
+		l.messages <- msg
 	}
 	return nil
 }

+ 13 - 6
daemon/logger/copier.go

@@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 	buf := make([]byte, bufSize)
 	n := 0
 	eof := false
-	msg := &Message{Source: name}
 
 	for {
 		select {
@@ -78,13 +77,16 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 			// Break up the data that we've buffered up into lines, and log each in turn.
 			p := 0
 			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
-				msg.Line = buf[p : p+q]
-				msg.Timestamp = time.Now().UTC()
-				msg.Partial = false
 				select {
 				case <-c.closed:
 					return
 				default:
+					msg := &Message{
+						Source:    name,
+						Timestamp: time.Now().UTC(),
+					}
+					msg.Line = append(msg.Line, buf[p:p+q]...)
+
 					if logErr := c.dst.Log(msg); logErr != nil {
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 					}
@@ -96,9 +98,14 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 			// noting that it's a partial log line.
 			if eof || (p == 0 && n == len(buf)) {
 				if p < n {
-					msg.Line = buf[p:n]
-					msg.Timestamp = time.Now().UTC()
+					msg := &Message{
+						Source:    name,
+						Timestamp: time.Now().UTC(),
+						Partial:   true,
+					}
+					msg.Line = append(msg.Line, buf[p:n]...)
 					msg.Partial = true
+
 					if logErr := c.dst.Log(msg); logErr != nil {
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 					}

+ 32 - 1
daemon/logger/factory.go

@@ -3,6 +3,10 @@ package logger
 import (
 	"fmt"
 	"sync"
+
+	containertypes "github.com/docker/docker/api/types/container"
+	units "github.com/docker/go-units"
+	"github.com/pkg/errors"
 )
 
 // Creator builds a logging driver instance with given context.
@@ -85,6 +89,11 @@ func GetLogDriver(name string) (Creator, error) {
 	return factory.get(name)
 }
 
+var builtInLogOpts = map[string]bool{
+	"mode":            true,
+	"max-buffer-size": true,
+}
+
 // ValidateLogOpts checks the options for the given log driver. The
 // options supported are specific to the LogDriver implementation.
 func ValidateLogOpts(name string, cfg map[string]string) error {
@@ -92,13 +101,35 @@ func ValidateLogOpts(name string, cfg map[string]string) error {
 		return nil
 	}
 
+	switch containertypes.LogMode(cfg["mode"]) {
+	case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset:
+	default:
+		return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"])
+	}
+
+	if s, ok := cfg["max-buffer-size"]; ok {
+		if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock {
+			return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock)
+		}
+		if _, err := units.RAMInBytes(s); err != nil {
+			return errors.Wrap(err, "error parsing option max-buffer-size")
+		}
+	}
+
 	if !factory.driverRegistered(name) {
 		return fmt.Errorf("logger: no log driver named '%s' is registered", name)
 	}
 
+	filteredOpts := make(map[string]string, len(builtInLogOpts))
+	for k, v := range cfg {
+		if !builtInLogOpts[k] {
+			filteredOpts[k] = v
+		}
+	}
+
 	validator := factory.getLogOptValidator(name)
 	if validator != nil {
-		return validator(cfg)
+		return validator(filteredOpts)
 	}
 	return nil
 }

+ 0 - 18
daemon/logger/logger.go

@@ -37,24 +37,6 @@ type Message struct {
 	Partial   bool
 }
 
-// CopyMessage creates a copy of the passed-in Message which will remain
-// unchanged if the original is changed.  Log drivers which buffer Messages
-// rather than dispatching them during their Log() method should use this
-// function to obtain a Message whose Line member's contents won't change.
-func CopyMessage(msg *Message) *Message {
-	m := new(Message)
-	m.Line = make([]byte, len(msg.Line))
-	copy(m.Line, msg.Line)
-	m.Source = msg.Source
-	m.Timestamp = msg.Timestamp
-	m.Partial = msg.Partial
-	m.Attrs = make(LogAttributes)
-	for k, v := range msg.Attrs {
-		m.Attrs[k] = v
-	}
-	return m
-}
-
 // LogAttributes is used to hold the extra attributes available in the log message
 // Primarily used for converting the map type to string and sorting.
 type LogAttributes map[string]string

+ 0 - 26
daemon/logger/logger_test.go

@@ -1,26 +0,0 @@
-package logger
-
-import (
-	"reflect"
-	"testing"
-	"time"
-)
-
-func TestCopyMessage(t *testing.T) {
-	msg := &Message{
-		Line:      []byte("test line."),
-		Source:    "stdout",
-		Timestamp: time.Now(),
-		Attrs: LogAttributes{
-			"key1": "val1",
-			"key2": "val2",
-			"key3": "val3",
-		},
-		Partial: true,
-	}
-
-	m := CopyMessage(msg)
-	if !reflect.DeepEqual(m, msg) {
-		t.Fatalf("CopyMessage failed to copy message")
-	}
-}

+ 210 - 0
daemon/logger/ring.go

@@ -0,0 +1,210 @@
+package logger
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+
+	"github.com/Sirupsen/logrus"
+)
+
+const (
+	defaultRingMaxSize = 1e6 // 1MB
+)
+
+// RingLogger is a ring buffer that implements the Logger interface.
+// This is used when lossy logging is OK.
+type RingLogger struct {
+	buffer    *messageRing
+	l         Logger
+	logInfo   Info
+	closeFlag int32
+}
+
+type ringWithReader struct {
+	*RingLogger
+}
+
+func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
+	reader, ok := r.l.(LogReader)
+	if !ok {
+		// something is wrong if we get here
+		panic("expected log reader")
+	}
+	return reader.ReadLogs(cfg)
+}
+
+func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
+	l := &RingLogger{
+		buffer:  newRing(maxSize),
+		l:       driver,
+		logInfo: logInfo,
+	}
+	go l.run()
+	return l
+}
+
+// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
+// the passed in logger.
+func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
+	if maxSize < 0 {
+		maxSize = defaultRingMaxSize
+	}
+	l := newRingLogger(driver, logInfo, maxSize)
+	if _, ok := driver.(LogReader); ok {
+		return &ringWithReader{l}
+	}
+	return l
+}
+
+// Log queues messages into the ring buffer
+func (r *RingLogger) Log(msg *Message) error {
+	if r.closed() {
+		return errClosed
+	}
+	return r.buffer.Enqueue(msg)
+}
+
+// Name returns the name of the underlying logger
+func (r *RingLogger) Name() string {
+	return r.l.Name()
+}
+
+func (r *RingLogger) closed() bool {
+	return atomic.LoadInt32(&r.closeFlag) == 1
+}
+
+func (r *RingLogger) setClosed() {
+	atomic.StoreInt32(&r.closeFlag, 1)
+}
+
+// Close closes the logger
+func (r *RingLogger) Close() error {
+	r.setClosed()
+	r.buffer.Close()
+	// empty out the queue
+	for _, msg := range r.buffer.Drain() {
+		if err := r.l.Log(msg); err != nil {
+			logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l)
+			break
+		}
+	}
+	return r.l.Close()
+}
+
+// run consumes messages from the ring buffer and forwards them to the underling
+// logger.
+// This is run in a goroutine when the RingLogger is created
+func (r *RingLogger) run() {
+	for {
+		if r.closed() {
+			return
+		}
+		msg, err := r.buffer.Dequeue()
+		if err != nil {
+			// buffer is closed
+			return
+		}
+		if err := r.l.Log(msg); err != nil {
+			logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l)
+		}
+	}
+}
+
+type messageRing struct {
+	mu sync.Mutex
+	// singals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
+	wait *sync.Cond
+
+	sizeBytes int64 // current buffer size
+	maxBytes  int64 // max buffer size size
+	queue     []*Message
+	closed    bool
+}
+
+func newRing(maxBytes int64) *messageRing {
+	queueSize := 1000
+	if maxBytes == 0 || maxBytes == 1 {
+		// With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
+		// message long.
+		queueSize = 1
+	}
+
+	r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
+	r.wait = sync.NewCond(&r.mu)
+	return r
+}
+
+// Enqueue adds a message to the buffer queue
+// If the message is too big for the buffer it drops the oldest messages to make room
+// If there are no messages in the queue and the message is still too big, it adds the message anyway.
+func (r *messageRing) Enqueue(m *Message) error {
+	mSize := int64(len(m.Line))
+
+	r.mu.Lock()
+	if r.closed {
+		r.mu.Unlock()
+		return errClosed
+	}
+	if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
+		r.wait.Signal()
+		r.mu.Unlock()
+		return nil
+	}
+
+	r.queue = append(r.queue, m)
+	r.sizeBytes += mSize
+	r.wait.Signal()
+	r.mu.Unlock()
+	return nil
+}
+
+// Dequeue pulls a message off the queue
+// If there are no messages, it waits for one.
+// If the buffer is closed, it will return immediately.
+func (r *messageRing) Dequeue() (*Message, error) {
+	r.mu.Lock()
+	for len(r.queue) == 0 && !r.closed {
+		r.wait.Wait()
+	}
+
+	if r.closed {
+		r.mu.Unlock()
+		return nil, errClosed
+	}
+
+	msg := r.queue[0]
+	r.queue = r.queue[1:]
+	r.sizeBytes -= int64(len(msg.Line))
+	r.mu.Unlock()
+	return msg, nil
+}
+
+var errClosed = errors.New("closed")
+
+// Close closes the buffer ensuring no new messages can be added.
+// Any callers waiting to dequeue a message will be woken up.
+func (r *messageRing) Close() {
+	r.mu.Lock()
+	if r.closed {
+		r.mu.Unlock()
+		return
+	}
+
+	r.closed = true
+	r.wait.Broadcast()
+	r.mu.Unlock()
+	return
+}
+
+// Drain drains all messages from the queue.
+// This can be used after `Close()` to get any remaining messages that were in queue.
+func (r *messageRing) Drain() []*Message {
+	r.mu.Lock()
+	ls := make([]*Message, 0, len(r.queue))
+	ls = append(ls, r.queue...)
+	r.sizeBytes = 0
+	r.queue = r.queue[:0]
+	r.mu.Unlock()
+	return ls
+}

+ 299 - 0
daemon/logger/ring_test.go

@@ -0,0 +1,299 @@
+package logger
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+)
+
+type mockLogger struct{ c chan *Message }
+
+func (l *mockLogger) Log(msg *Message) error {
+	l.c <- msg
+	return nil
+}
+
+func (l *mockLogger) Name() string {
+	return "mock"
+}
+
+func (l *mockLogger) Close() error {
+	return nil
+}
+
+func TestRingLogger(t *testing.T) {
+	mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel
+	ring := newRingLogger(mockLog, Info{}, 1)
+	defer ring.setClosed()
+
+	// this should never block
+	ring.Log(&Message{Line: []byte("1")})
+	ring.Log(&Message{Line: []byte("2")})
+	ring.Log(&Message{Line: []byte("3")})
+
+	select {
+	case msg := <-mockLog.c:
+		if string(msg.Line) != "1" {
+			t.Fatalf("got unexpected msg: %q", string(msg.Line))
+		}
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("timeout reading log message")
+	}
+
+	select {
+	case msg := <-mockLog.c:
+		t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line))
+	default:
+	}
+}
+
+func TestRingCap(t *testing.T) {
+	r := newRing(5)
+	for i := 0; i < 10; i++ {
+		// queue messages with "0" to "10"
+		// the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer
+		if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// should have messages in the queue for "5" to "10"
+	for i := 0; i < 5; i++ {
+		m, err := r.Dequeue()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if string(m.Line) != strconv.Itoa(i) {
+			t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line))
+		}
+	}
+
+	// queue a message that's bigger than the buffer cap
+	if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil {
+		t.Fatal(err)
+	}
+
+	// queue another message that's bigger than the buffer cap
+	if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil {
+		t.Fatal(err)
+	}
+
+	m, err := r.Dequeue()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(m.Line) != "hello world" {
+		t.Fatalf("got unexpected message: %s", string(m.Line))
+	}
+	if len(r.queue) != 0 {
+		t.Fatalf("expected queue to be empty, got: %d", len(r.queue))
+	}
+}
+
+func TestRingClose(t *testing.T) {
+	r := newRing(1)
+	if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil {
+		t.Fatal(err)
+	}
+	r.Close()
+	if err := r.Enqueue(&Message{}); err != errClosed {
+		t.Fatalf("expected errClosed, got: %v", err)
+	}
+	if len(r.queue) != 1 {
+		t.Fatal("expected empty queue")
+	}
+	if m, err := r.Dequeue(); err == nil || m != nil {
+		t.Fatal("exepcted err on Dequeue after close")
+	}
+
+	ls := r.Drain()
+	if len(ls) != 1 {
+		t.Fatalf("expected one message: %v", ls)
+	}
+	if string(ls[0].Line) != "hello" {
+		t.Fatalf("got unexpected message: %s", string(ls[0].Line))
+	}
+}
+
+func TestRingDrain(t *testing.T) {
+	r := newRing(5)
+	for i := 0; i < 5; i++ {
+		if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	ls := r.Drain()
+	if len(ls) != 5 {
+		t.Fatal("got unexpected length after drain")
+	}
+
+	for i := 0; i < 5; i++ {
+		if string(ls[i].Line) != strconv.Itoa(i) {
+			t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line))
+		}
+	}
+	if r.sizeBytes != 0 {
+		t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes)
+	}
+
+	ls = r.Drain()
+	if len(ls) != 0 {
+		t.Fatalf("expected 0 messages on 2nd drain: %v", ls)
+	}
+
+}
+
+type nopLogger struct{}
+
+func (nopLogger) Name() string       { return "nopLogger" }
+func (nopLogger) Close() error       { return nil }
+func (nopLogger) Log(*Message) error { return nil }
+
+func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) {
+	l := NewRingLogger(nopLogger{}, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) {
+	started := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		close(started)
+		ticker := time.NewTicker(delay)
+		for range ticker.C {
+			select {
+			case <-ctx.Done():
+				ticker.Stop()
+				return
+			case <-c:
+			}
+		}
+	}()
+	<-started
+	return cancel
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(1*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(10*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(50*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(100*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(300*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) {
+	mockLog := &mockLogger{make(chan *Message)}
+	defer mockLog.Close()
+	l := NewRingLogger(mockLog, Info{}, -1)
+	msg := &Message{Line: []byte("hello humans and everyone else!")}
+	b.SetBytes(int64(len(msg.Line)))
+
+	cancel := consumeWithDelay(500*time.Millisecond, mockLog.c)
+	defer cancel()
+
+	for i := 0; i < b.N; i++ {
+		if err := l.Log(msg); err != nil {
+			b.Fatal(err)
+		}
+	}
+}

+ 20 - 20
daemon/logger/splunk/splunk_test.go

@@ -133,11 +133,11 @@ func TestDefault(t *testing.T) {
 	}
 
 	message1Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
 		t.Fatal(err)
 	}
 	message2Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -262,7 +262,7 @@ func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
 	}
 
 	messageTime := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -361,11 +361,11 @@ func TestJsonFormat(t *testing.T) {
 	}
 
 	message1Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
 		t.Fatal(err)
 	}
 	message2Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -478,11 +478,11 @@ func TestRawFormat(t *testing.T) {
 	}
 
 	message1Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
 		t.Fatal(err)
 	}
 	message2Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -592,11 +592,11 @@ func TestRawFormatWithLabels(t *testing.T) {
 	}
 
 	message1Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
 		t.Fatal(err)
 	}
 	message2Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -705,11 +705,11 @@ func TestRawFormatWithoutTag(t *testing.T) {
 	}
 
 	message1Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
 		t.Fatal(err)
 	}
 	message2Time := time.Now()
-	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -790,7 +790,7 @@ func TestBatching(t *testing.T) {
 	}
 
 	for i := 0; i < defaultStreamChannelSize*4; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -856,7 +856,7 @@ func TestFrequency(t *testing.T) {
 	}
 
 	for i := 0; i < 10; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 		time.Sleep(15 * time.Millisecond)
@@ -937,7 +937,7 @@ func TestOneMessagePerRequest(t *testing.T) {
 	}
 
 	for i := 0; i < 10; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1045,7 +1045,7 @@ func TestSkipVerify(t *testing.T) {
 	}
 
 	for i := 0; i < defaultStreamChannelSize*2; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1057,7 +1057,7 @@ func TestSkipVerify(t *testing.T) {
 	hec.simulateServerError = false
 
 	for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1127,7 +1127,7 @@ func TestBufferMaximum(t *testing.T) {
 	}
 
 	for i := 0; i < 11; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1216,7 +1216,7 @@ func TestServerAlwaysDown(t *testing.T) {
 	}
 
 	for i := 0; i < 5; i++ {
-		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+		if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -1269,7 +1269,7 @@ func TestCannotSendAfterClose(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1278,7 +1278,7 @@ func TestCannotSendAfterClose(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil {
+	if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
 		t.Fatal("Driver should not allow to send messages after close")
 	}
 

+ 2 - 0
docs/api/version-history.md

@@ -82,6 +82,8 @@ keywords: "API, Docker, rcli, REST, documentation"
 * `GET /secrets/{id}` returns information on the secret `id`.
 * `POST /secrets/{id}/update` updates the secret `id`.
 * `POST /services/(id or name)/update` now accepts service name or prefix of service id as a parameter.
+* `POST /containers/create` added 2 built-in log-opts that work on all logging drivers,
+`mode` (`blocking`|`non-blocking`), and `max-buffer-size` (e.g. `2m`) which enables a non-blocking log buffer.
 
 ## v1.24 API changes