ソースを参照

Add awslogs driver for Amazon CloudWatch Logs

Signed-off-by: Samuel Karp <skarp@amazon.com>
Samuel Karp 10 年 前
コミット
3effe484e6

+ 4 - 0
contrib/completion/bash/docker

@@ -279,6 +279,7 @@ __docker_log_driver_options() {
 	local gelf_options="gelf-address gelf-tag"
 	local json_file_options="max-file max-size"
 	local syslog_options="syslog-address syslog-facility syslog-tag"
+	local awslogs_options="awslogs-region awslogs-group awslogs-stream"
 
 	case $(__docker_value_of_option --log-driver) in
 		'')
@@ -296,6 +297,9 @@ __docker_log_driver_options() {
 		syslog)
 			COMPREPLY=( $( compgen -W "$syslog_options" -S = -- "$cur" ) )
 			;;
+		awslogs)
+			COMPREPLY=( $( compgen -W "$awslogs_options" -S = -- "$cur" ) )
+			;;
 		*)
 			return
 			;;

+ 2 - 2
contrib/completion/zsh/_docker

@@ -238,7 +238,7 @@ __docker_subcommand() {
         "($help)--ipc=-[IPC namespace to use]:IPC namespace: "
         "($help)*--link=-[Add link to another container]:link:->link"
         "($help)*"{-l,--label=-}"[Set meta data on a container]:label: "
-        "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)"
+        "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)"
         "($help)*--log-opt=-[Log driver specific options]:log driver options: "
         "($help)*--lxc-conf=-[Add custom lxc options]:lxc options: "
         "($help)--mac-address=-[Container MAC address]:MAC address: "
@@ -617,7 +617,7 @@ _docker() {
         "($help)--ipv6[Enable IPv6 networking]" \
         "($help -l --log-level)"{-l,--log-level=-}"[Set the logging level]:level:(debug info warn error fatal)" \
         "($help)*--label=-[Set key=value labels to the daemon]:label: " \
-        "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)" \
+        "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)" \
         "($help)*--log-opt=-[Log driver specific options]:log driver options: " \
         "($help)--mtu=-[Set the containers network MTU]:mtu:(0 576 1420 1500 9000)" \
         "($help -p --pidfile)"{-p,--pidfile=-}"[Path to use for daemon PID file]:PID file:_files" \

+ 1 - 0
daemon/logdrivers_linux.go

@@ -3,6 +3,7 @@ package daemon
 import (
 	// Importing packages here only to make sure their init gets called and
 	// therefore they register themselves to the logdriver factory.
+	_ "github.com/docker/docker/daemon/logger/awslogs"
 	_ "github.com/docker/docker/daemon/logger/fluentd"
 	_ "github.com/docker/docker/daemon/logger/gelf"
 	_ "github.com/docker/docker/daemon/logger/journald"

+ 1 - 0
daemon/logdrivers_windows.go

@@ -3,5 +3,6 @@ package daemon
 import (
 	// Importing packages here only to make sure their init gets called and
 	// therefore they register themselves to the logdriver factory.
+	_ "github.com/docker/docker/daemon/logger/awslogs"
 	_ "github.com/docker/docker/daemon/logger/jsonfilelog"
 )

+ 326 - 0
daemon/logger/awslogs/cloudwatchlogs.go

@@ -0,0 +1,326 @@
+// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
+package awslogs
+
+import (
+	"fmt"
+	"os"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus"
+)
+
+const (
+	name                  = "awslogs"
+	regionKey             = "awslogs-region"
+	regionEnvKey          = "AWS_REGION"
+	logGroupKey           = "awslogs-group"
+	logStreamKey          = "awslogs-stream"
+	batchPublishFrequency = 5 * time.Second
+
+	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
+	perEventBytes          = 26
+	maximumBytesPerPut     = 1048576
+	maximumLogEventsPerPut = 10000
+
+	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
+	maximumBytesPerEvent = 262144 - perEventBytes
+
+	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
+	dataAlreadyAcceptedCode   = "DataAlreadyAcceptedException"
+	invalidSequenceTokenCode  = "InvalidSequenceTokenException"
+)
+
+type logStream struct {
+	logStreamName string
+	logGroupName  string
+	client        api
+	messages      chan *logger.Message
+	lock          sync.RWMutex
+	closed        bool
+	sequenceToken *string
+}
+
+type api interface {
+	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
+	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
+}
+
+type byTimestamp []*cloudwatchlogs.InputLogEvent
+
+// init registers the awslogs driver and sets the default region, if provided
+func init() {
+	if os.Getenv(regionEnvKey) != "" {
+		aws.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey))
+	}
+	if err := logger.RegisterLogDriver(name, New); err != nil {
+		logrus.Fatal(err)
+	}
+	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
+		logrus.Fatal(err)
+	}
+}
+
+// New creates an awslogs logger using the configuration passed in on the
+// context.  Supported context configuration variables are awslogs-region,
+// awslogs-group, and awslogs-stream.  When available, configuration is
+// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
+// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
+// the EC2 Instance Metadata Service.
+func New(ctx logger.Context) (logger.Logger, error) {
+	logGroupName := ctx.Config[logGroupKey]
+	logStreamName := ctx.ContainerID
+	if ctx.Config[logStreamKey] != "" {
+		logStreamName = ctx.Config[logStreamKey]
+	}
+	config := aws.DefaultConfig
+	if ctx.Config[regionKey] != "" {
+		config = aws.DefaultConfig.Merge(&aws.Config{
+			Region: aws.String(ctx.Config[regionKey]),
+		})
+	}
+	containerStream := &logStream{
+		logStreamName: logStreamName,
+		logGroupName:  logGroupName,
+		client:        cloudwatchlogs.New(config),
+		messages:      make(chan *logger.Message, 4096),
+	}
+	err := containerStream.create()
+	if err != nil {
+		return nil, err
+	}
+	go containerStream.collectBatch()
+
+	return containerStream, nil
+}
+
+// Name returns the name of the awslogs logging driver
+func (l *logStream) Name() string {
+	return name
+}
+
+// Log submits messages for logging by an instance of the awslogs logging driver
+func (l *logStream) Log(msg *logger.Message) error {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+	if !l.closed {
+		l.messages <- msg
+	}
+	return nil
+}
+
+// Close closes the instance of the awslogs logging driver
+func (l *logStream) Close() error {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	if !l.closed {
+		close(l.messages)
+	}
+	l.closed = true
+	return nil
+}
+
+// create creates a log stream for the instance of the awslogs logging driver
+func (l *logStream) create() error {
+	input := &cloudwatchlogs.CreateLogStreamInput{
+		LogGroupName:  aws.String(l.logGroupName),
+		LogStreamName: aws.String(l.logStreamName),
+	}
+
+	_, err := l.client.CreateLogStream(input)
+
+	if err != nil {
+		if awsErr, ok := err.(awserr.Error); ok {
+			fields := logrus.Fields{
+				"errorCode":     awsErr.Code(),
+				"message":       awsErr.Message(),
+				"origError":     awsErr.OrigErr(),
+				"logGroupName":  l.logGroupName,
+				"logStreamName": l.logStreamName,
+			}
+			if awsErr.Code() == resourceAlreadyExistsCode {
+				// Allow creation to succeed
+				logrus.WithFields(fields).Info("Log stream already exists")
+				return nil
+			}
+			logrus.WithFields(fields).Error("Failed to create log stream")
+		}
+	}
+	return err
+}
+
+// newTicker is used for time-based batching.  newTicker is a variable such
+// that the implementation can be swapped out for unit tests.
+var newTicker = func(freq time.Duration) *time.Ticker {
+	return time.NewTicker(freq)
+}
+
+// collectBatch executes as a goroutine to perform batching of log events for
+// submission to the log stream.  Batching is performed on time- and size-
+// bases.  Time-based batching occurs at a 5 second interval (defined in the
+// batchPublishFrequency const).  Size-based batching is performed on the
+// maximum number of events per batch (defined in maximumLogEventsPerPut) and
+// the maximum number of total bytes in a batch (defined in
+// maximumBytesPerPut).  Log messages are split by the maximum bytes per event
+// (defined in maximumBytesPerEvent).  There is a fixed per-event byte overhead
+// (defined in perEventBytes) which is accounted for in split- and batch-
+// calculations.
+func (l *logStream) collectBatch() {
+	timer := newTicker(batchPublishFrequency)
+	var events []*cloudwatchlogs.InputLogEvent
+	bytes := 0
+	for {
+		select {
+		case <-timer.C:
+			l.publishBatch(events)
+			events = events[:0]
+			bytes = 0
+		case msg, more := <-l.messages:
+			if !more {
+				l.publishBatch(events)
+				return
+			}
+			unprocessedLine := msg.Line
+			for len(unprocessedLine) > 0 {
+				// Split line length so it does not exceed the maximum
+				lineBytes := len(unprocessedLine)
+				if lineBytes > maximumBytesPerEvent {
+					lineBytes = maximumBytesPerEvent
+				}
+				line := unprocessedLine[:lineBytes]
+				unprocessedLine = unprocessedLine[lineBytes:]
+				if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
+					// Publish an existing batch if it's already over the maximum number of events or if adding this
+					// event would push it over the maximum number of total bytes.
+					l.publishBatch(events)
+					events = events[:0]
+					bytes = 0
+				}
+				events = append(events, &cloudwatchlogs.InputLogEvent{
+					Message:   aws.String(string(line)),
+					Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
+				})
+				bytes += (lineBytes + perEventBytes)
+			}
+		}
+	}
+}
+
+// publishBatch calls PutLogEvents for a given set of InputLogEvents,
+// accounting for sequencing requirements (each request must reference the
+// sequence token returned by the previous request).
+func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
+	if len(events) == 0 {
+		return
+	}
+
+	sort.Sort(byTimestamp(events))
+
+	nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
+
+	if err != nil {
+		if awsErr, ok := err.(awserr.Error); ok {
+			if awsErr.Code() == dataAlreadyAcceptedCode {
+				// already submitted, just grab the correct sequence token
+				parts := strings.Split(awsErr.Message(), " ")
+				nextSequenceToken = &parts[len(parts)-1]
+				logrus.WithFields(logrus.Fields{
+					"errorCode":     awsErr.Code(),
+					"message":       awsErr.Message(),
+					"logGroupName":  l.logGroupName,
+					"logStreamName": l.logStreamName,
+				}).Info("Data already accepted, ignoring error")
+				err = nil
+			} else if awsErr.Code() == invalidSequenceTokenCode {
+				// sequence code is bad, grab the correct one and retry
+				parts := strings.Split(awsErr.Message(), " ")
+				token := parts[len(parts)-1]
+				nextSequenceToken, err = l.putLogEvents(events, &token)
+			}
+		}
+	}
+	if err != nil {
+		logrus.Error(err)
+	} else {
+		l.sequenceToken = nextSequenceToken
+	}
+}
+
+// putLogEvents wraps the PutLogEvents API
+func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
+	input := &cloudwatchlogs.PutLogEventsInput{
+		LogEvents:     events,
+		SequenceToken: sequenceToken,
+		LogGroupName:  aws.String(l.logGroupName),
+		LogStreamName: aws.String(l.logStreamName),
+	}
+	resp, err := l.client.PutLogEvents(input)
+	if err != nil {
+		if awsErr, ok := err.(awserr.Error); ok {
+			logrus.WithFields(logrus.Fields{
+				"errorCode":     awsErr.Code(),
+				"message":       awsErr.Message(),
+				"origError":     awsErr.OrigErr(),
+				"logGroupName":  l.logGroupName,
+				"logStreamName": l.logStreamName,
+			}).Error("Failed to put log events")
+		}
+		return nil, err
+	}
+	return resp.NextSequenceToken, nil
+}
+
+// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
+// awslogs-group, and awslogs-stream
+func ValidateLogOpt(cfg map[string]string) error {
+	for key := range cfg {
+		switch key {
+		case logGroupKey:
+		case logStreamKey:
+		case regionKey:
+		default:
+			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
+		}
+	}
+	if cfg[logGroupKey] == "" {
+		return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
+	}
+	if cfg[regionKey] == "" && os.Getenv(regionEnvKey) == "" {
+		return fmt.Errorf(
+			"must specify a value for environment variable '%s' or log opt '%s'",
+			regionEnvKey,
+			regionKey)
+	}
+	return nil
+}
+
+// Len returns the length of a byTimestamp slice.  Len is required by the
+// sort.Interface interface.
+func (slice byTimestamp) Len() int {
+	return len(slice)
+}
+
+// Less compares two values in a byTimestamp slice by Timestamp.  Less is
+// required by the sort.Interface interface.
+func (slice byTimestamp) Less(i, j int) bool {
+	iTimestamp, jTimestamp := int64(0), int64(0)
+	if slice != nil && slice[i].Timestamp != nil {
+		iTimestamp = *slice[i].Timestamp
+	}
+	if slice != nil && slice[j].Timestamp != nil {
+		jTimestamp = *slice[j].Timestamp
+	}
+	return iTimestamp < jTimestamp
+}
+
+// Swap swaps two values in a byTimestamp slice with each other.  Swap is
+// required by the sort.Interface interface.
+func (slice byTimestamp) Swap(i, j int) {
+	slice[i], slice[j] = slice[j], slice[i]
+}

+ 572 - 0
daemon/logger/awslogs/cloudwatchlogs_test.go

@@ -0,0 +1,572 @@
+package awslogs
+
+import (
+	"errors"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+	"github.com/docker/docker/daemon/logger"
+)
+
+const (
+	groupName         = "groupName"
+	streamName        = "streamName"
+	sequenceToken     = "sequenceToken"
+	nextSequenceToken = "nextSequenceToken"
+	logline           = "this is a log line"
+)
+
+func TestCreateSuccess(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+	}
+	mockClient.createLogStreamResult <- &createLogStreamResult{}
+
+	err := stream.create()
+
+	if err != nil {
+		t.Errorf("Received unexpected err: %v\n", err)
+	}
+	argument := <-mockClient.createLogStreamArgument
+	if argument.LogGroupName == nil {
+		t.Fatal("Expected non-nil LogGroupName")
+	}
+	if *argument.LogGroupName != groupName {
+		t.Errorf("Expected LogGroupName to be %s", groupName)
+	}
+	if argument.LogStreamName == nil {
+		t.Fatal("Expected non-nil LogGroupName")
+	}
+	if *argument.LogStreamName != streamName {
+		t.Errorf("Expected LogStreamName to be %s", streamName)
+	}
+}
+
+func TestCreateError(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client: mockClient,
+	}
+	mockClient.createLogStreamResult <- &createLogStreamResult{
+		errorResult: errors.New("Error!"),
+	}
+
+	err := stream.create()
+
+	if err == nil {
+		t.Fatal("Expected non-nil err")
+	}
+}
+
+func TestCreateAlreadyExists(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client: mockClient,
+	}
+	mockClient.createLogStreamResult <- &createLogStreamResult{
+		errorResult: awserr.New(resourceAlreadyExistsCode, "", nil),
+	}
+
+	err := stream.create()
+
+	if err != nil {
+		t.Fatal("Expected nil err")
+	}
+}
+
+func TestPublishBatchSuccess(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+
+	events := []*cloudwatchlogs.InputLogEvent{
+		{
+			Message: aws.String(logline),
+		},
+	}
+
+	stream.publishBatch(events)
+	if stream.sequenceToken == nil {
+		t.Fatal("Expected non-nil sequenceToken")
+	}
+	if *stream.sequenceToken != nextSequenceToken {
+		t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
+	}
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if argument.SequenceToken == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
+	}
+	if *argument.SequenceToken != sequenceToken {
+		t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if argument.LogEvents[0] != events[0] {
+		t.Error("Expected event to equal input")
+	}
+}
+
+func TestPublishBatchError(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		errorResult: errors.New("Error!"),
+	}
+
+	events := []*cloudwatchlogs.InputLogEvent{
+		{
+			Message: aws.String(logline),
+		},
+	}
+
+	stream.publishBatch(events)
+	if stream.sequenceToken == nil {
+		t.Fatal("Expected non-nil sequenceToken")
+	}
+	if *stream.sequenceToken != sequenceToken {
+		t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken)
+	}
+}
+
+func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
+	mockClient := newMockClientBuffered(2)
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+
+	events := []*cloudwatchlogs.InputLogEvent{
+		{
+			Message: aws.String(logline),
+		},
+	}
+
+	stream.publishBatch(events)
+	if stream.sequenceToken == nil {
+		t.Fatal("Expected non-nil sequenceToken")
+	}
+	if *stream.sequenceToken != nextSequenceToken {
+		t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
+	}
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if argument.SequenceToken == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
+	}
+	if *argument.SequenceToken != sequenceToken {
+		t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if argument.LogEvents[0] != events[0] {
+		t.Error("Expected event to equal input")
+	}
+
+	argument = <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if argument.SequenceToken == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
+	}
+	if *argument.SequenceToken != "token" {
+		t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if argument.LogEvents[0] != events[0] {
+		t.Error("Expected event to equal input")
+	}
+}
+
+func TestPublishBatchAlreadyAccepted(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
+	}
+
+	events := []*cloudwatchlogs.InputLogEvent{
+		{
+			Message: aws.String(logline),
+		},
+	}
+
+	stream.publishBatch(events)
+	if stream.sequenceToken == nil {
+		t.Fatal("Expected non-nil sequenceToken")
+	}
+	if *stream.sequenceToken != "token" {
+		t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
+	}
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if argument.SequenceToken == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
+	}
+	if *argument.SequenceToken != sequenceToken {
+		t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if argument.LogEvents[0] != events[0] {
+		t.Error("Expected event to equal input")
+	}
+}
+
+func TestCollectBatchSimple(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	ticks := make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	stream.Log(&logger.Message{
+		Line:      []byte(logline),
+		Timestamp: time.Time{},
+	})
+
+	ticks <- time.Time{}
+	stream.Close()
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if *argument.LogEvents[0].Message != logline {
+		t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
+	}
+}
+
+func TestCollectBatchTicker(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	ticks := make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	stream.Log(&logger.Message{
+		Line:      []byte(logline + " 1"),
+		Timestamp: time.Time{},
+	})
+	stream.Log(&logger.Message{
+		Line:      []byte(logline + " 2"),
+		Timestamp: time.Time{},
+	})
+
+	ticks <- time.Time{}
+
+	// Verify first batch
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 2 {
+		t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
+	}
+	if *argument.LogEvents[0].Message != logline+" 1" {
+		t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)
+	}
+	if *argument.LogEvents[1].Message != logline+" 2" {
+		t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)
+	}
+
+	stream.Log(&logger.Message{
+		Line:      []byte(logline + " 3"),
+		Timestamp: time.Time{},
+	})
+
+	ticks <- time.Time{}
+	argument = <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
+	}
+	if *argument.LogEvents[0].Message != logline+" 3" {
+		t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)
+	}
+
+	stream.Close()
+
+}
+
+func TestCollectBatchClose(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	var ticks = make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	stream.Log(&logger.Message{
+		Line:      []byte(logline),
+		Timestamp: time.Time{},
+	})
+
+	// no ticks
+	stream.Close()
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
+	}
+	if *argument.LogEvents[0].Message != logline {
+		t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)
+	}
+}
+
+func TestCollectBatchLineSplit(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	var ticks = make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	longline := strings.Repeat("A", maximumBytesPerEvent)
+	stream.Log(&logger.Message{
+		Line:      []byte(longline + "B"),
+		Timestamp: time.Time{},
+	})
+
+	// no ticks
+	stream.Close()
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 2 {
+		t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
+	}
+	if *argument.LogEvents[0].Message != longline {
+		t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
+	}
+	if *argument.LogEvents[1].Message != "B" {
+		t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)
+	}
+}
+
+func TestCollectBatchMaxEvents(t *testing.T) {
+	mockClient := newMockClientBuffered(1)
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	var ticks = make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	line := "A"
+	for i := 0; i <= maximumLogEventsPerPut; i++ {
+		stream.Log(&logger.Message{
+			Line:      []byte(line),
+			Timestamp: time.Time{},
+		})
+	}
+
+	// no ticks
+	stream.Close()
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != maximumLogEventsPerPut {
+		t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
+	}
+
+	argument = <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))
+	}
+}
+
+func TestCollectBatchMaxTotalBytes(t *testing.T) {
+	mockClient := newMockClientBuffered(1)
+	stream := &logStream{
+		client:        mockClient,
+		logGroupName:  groupName,
+		logStreamName: streamName,
+		sequenceToken: aws.String(sequenceToken),
+		messages:      make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	var ticks = make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	longline := strings.Repeat("A", maximumBytesPerPut)
+	stream.Log(&logger.Message{
+		Line:      []byte(longline + "B"),
+		Timestamp: time.Time{},
+	})
+
+	// no ticks
+	stream.Close()
+
+	argument := <-mockClient.putLogEventsArgument
+	if argument == nil {
+		t.Fatal("Expected non-nil PutLogEventsInput")
+	}
+	bytes := 0
+	for _, event := range argument.LogEvents {
+		bytes += len(*event.Message)
+	}
+	if bytes > maximumBytesPerPut {
+		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
+	}
+
+	argument = <-mockClient.putLogEventsArgument
+	if len(argument.LogEvents) != 1 {
+		t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
+	}
+	message := *argument.LogEvents[0].Message
+	if message[len(message)-1:] != "B" {
+		t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
+	}
+}

+ 56 - 0
daemon/logger/awslogs/cwlogsiface_mock_test.go

@@ -0,0 +1,56 @@
+package awslogs
+
+import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+
+type mockcwlogsclient struct {
+	createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput
+	createLogStreamResult   chan *createLogStreamResult
+	putLogEventsArgument    chan *cloudwatchlogs.PutLogEventsInput
+	putLogEventsResult      chan *putLogEventsResult
+}
+
+type createLogStreamResult struct {
+	successResult *cloudwatchlogs.CreateLogStreamOutput
+	errorResult   error
+}
+
+type putLogEventsResult struct {
+	successResult *cloudwatchlogs.PutLogEventsOutput
+	errorResult   error
+}
+
+func newMockClient() *mockcwlogsclient {
+	return &mockcwlogsclient{
+		createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
+		createLogStreamResult:   make(chan *createLogStreamResult, 1),
+		putLogEventsArgument:    make(chan *cloudwatchlogs.PutLogEventsInput, 1),
+		putLogEventsResult:      make(chan *putLogEventsResult, 1),
+	}
+}
+
+func newMockClientBuffered(buflen int) *mockcwlogsclient {
+	return &mockcwlogsclient{
+		createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, buflen),
+		createLogStreamResult:   make(chan *createLogStreamResult, buflen),
+		putLogEventsArgument:    make(chan *cloudwatchlogs.PutLogEventsInput, buflen),
+		putLogEventsResult:      make(chan *putLogEventsResult, buflen),
+	}
+}
+
+func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
+	m.createLogStreamArgument <- input
+	output := <-m.createLogStreamResult
+	return output.successResult, output.errorResult
+}
+
+func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
+	m.putLogEventsArgument <- input
+	output := <-m.putLogEventsResult
+	return output.successResult, output.errorResult
+}
+
+func test() {
+	_ = &logStream{
+		client: newMockClient(),
+	}
+}

+ 1 - 1
docs/reference/api/docker_remote_api_v1.21.md

@@ -298,7 +298,7 @@ Json Parameters:
         systems, such as SELinux.
     -   **LogConfig** - Log configuration for the container, specified as a JSON object in the form
           `{ "Type": "<driver_name>", "Config": {"key1": "val1"}}`.
-          Available types: `json-file`, `syslog`, `journald`, `gelf`, `none`.
+          Available types: `json-file`, `syslog`, `journald`, `gelf`, `awslogs`, `none`.
           `json-file` logging driver.
     -   **CgroupParent** - Path to `cgroups` under which the container's `cgroup` is created. If the path is not absolute, the path is considered to be relative to the `cgroups` path of the init process. Cgroups are created if they do not already exist.
     -   **VolumeDriver** - Driver that this container users to mount volumes.

+ 90 - 0
docs/reference/logging/awslogs.md

@@ -0,0 +1,90 @@
+<!--[metadata]>
++++
+title = "Amazon CloudWatch Logs logging driver"
+description = "Describes how to use the Amazon CloudWatch Logs logging driver."
+keywords = ["AWS, Amazon, CloudWatch, logging, driver"]
+[menu.main]
+parent = "smn_logging"
++++
+<![end-metadata]-->
+
+# Amazon CloudWatch Logs logging driver
+
+The `awslogs` logging driver sends container logs to
+[Amazon CloudWatch Logs](https://aws.amazon.com/cloudwatch/details/#log-monitoring).
+Log entries can be retrieved through the [AWS Management
+Console](https://console.aws.amazon.com/cloudwatch/home#logs:) or the [AWS SDKs
+and Command Line Tools](http://docs.aws.amazon.com/cli/latest/reference/logs/index.html).
+
+## Usage
+
+You can configure the default logging driver by passing the `--log-driver`
+option to the Docker daemon:
+
+    docker --log-driver=awslogs
+
+You can set the logging driver for a specific container by using the
+`--log-driver` option to `docker run`:
+
+    docker run --log-driver=awslogs ...
+
+## Amazon CloudWatch Logs options
+
+You can use the `--log-opt NAME=VALUE` flag to specify Amazon CloudWatch Logs logging driver options.
+
+### awslogs-region
+
+You must specify a region for the `awslogs` logging driver. You can specify the
+region with either the `awslogs-region` log option or `AWS_REGION` environment
+variable:
+
+    docker run --log-driver=awslogs --log-opt awslogs-region=us-east-1 ...
+
+### awslogs-group
+
+You must specify a
+[log group](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/WhatIsCloudWatchLogs.html)
+for the `awslogs` logging driver.  You can specify the log group with the
+`awslogs-group` log option:
+
+    docker run --log-driver=awslogs --log-opt awslogs-region=us-east-1 --log-opt awslogs-group=myLogGroup ...
+
+### awslogs-stream
+
+To configure which
+[log stream](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/WhatIsCloudWatchLogs.html)
+should be used, you can specify the `awslogs-stream` log option.  If not
+specified, the container ID is used as the log stream.
+
+> **Note:**
+> Log streams within a given log group should only be used by one container
+> at a time.  Using the same log stream for multiple containers concurrently
+> can cause reduced logging performance.
+
+## Credentials
+
+You must provide AWS credentials to the Docker daemon to use the `awslogs`
+logging driver. You can provide these credentials with the `AWS_ACCESS_KEY_ID`,
+`AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN` environment variables, the
+default AWS shared credentials file (`~/.aws/credentials` of the root user), or
+(if you are running the Docker daemon on an Amazon EC2 instance) the Amazon EC2
+instance profile.
+
+Credentials must have a policy applied that allows the `logs:CreateLogStream`
+and `logs:PutLogEvents` actions, as shown in the following example.
+
+    {
+      "Version": "2012-10-17",
+      "Statement": [
+        {
+          "Action": [
+            "logs:CreateLogStream",
+            "logs:PutLogEvents"
+          ],
+          "Effect": "Allow",
+          "Resource": "*"
+        }
+      ]
+    }
+
+

+ 2 - 1
docs/reference/logging/index.md

@@ -15,4 +15,5 @@ weight=8
 
 * [Configuring logging drivers](overview)
 * [Fluentd logging driver](fluentd)
-* [Journald logging driver](journald)
+* [Journald logging driver](journald)
+* [Amazon CloudWatch Logs logging driver](awslogs)

+ 13 - 0
docs/reference/logging/overview.md

@@ -23,6 +23,7 @@ container's logging driver. The following options are supported:
 | `journald`  | Journald logging driver for Docker. Writes log messages to `journald`.                                                        |
 | `gelf`      | Graylog Extended Log Format (GELF) logging driver for Docker. Writes log messages to a GELF endpoint likeGraylog or Logstash. |
 | `fluentd`   | Fluentd logging driver for Docker. Writes log messages to `fluentd` (forward input).                                          |
+| `awslogs`   | Amazon CloudWatch Logs logging driver for Docker. Writes log messages to Amazon CloudWatch Logs.                              |
 
 The `docker logs`command is available only for the `json-file` logging driver.  
 
@@ -128,3 +129,15 @@ For example, to specify both additional options:
 If container cannot connect to the Fluentd daemon on the specified address,
 the container stops immediately. For detailed information on working with this
 logging driver, see [the fluentd logging driver](/reference/logging/fluentd/)
+
+## Specify Amazon CloudWatch Logs options
+
+The Amazon CloudWatch Logs logging driver supports the following options:
+
+    --log-opt awslogs-region=<aws_region>
+    --log-opt awslogs-group=<log_group_name>
+    --log-opt awslogs-stream=<log_stream_name>
+
+
+For detailed information on working with this logging driver, see [the awslogs logging driver](/reference/logging/awslogs/)
+reference documentation.

+ 1 - 0
docs/reference/run.md

@@ -1011,6 +1011,7 @@ container's logging driver. The following options are supported:
 | `journald`  | Journald logging driver for Docker. Writes log messages to `journald`.                                                        |
 | `gelf`      | Graylog Extended Log Format (GELF) logging driver for Docker. Writes log messages to a GELF endpoint likeGraylog or Logstash. |
 | `fluentd`   | Fluentd logging driver for Docker. Writes log messages to `fluentd` (forward input).                                          |
+| `awslogs`   | Amazon CloudWatch Logs logging driver for Docker. Writes log messages to Amazon CloudWatch Logs                               |
 
 	The `docker logs`command is available only for the `json-file` logging
 driver.  For detailed information on working with logging drivers, see

+ 1 - 1
man/docker-create.1.md

@@ -168,7 +168,7 @@ millions of trillions.
    Add link to another container in the form of <name or id>:alias or just
    <name or id> in which case the alias will match the name.
 
-**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*"
+**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*"
   Logging driver for container. Default is defined by daemon `--log-driver` flag.
   **Warning**: `docker logs` command works only for `json-file` logging driver.
 

+ 1 - 1
man/docker-run.1.md

@@ -268,7 +268,7 @@ which interface and port to use.
 **--lxc-conf**=[]
    (lxc exec-driver only) Add custom lxc options --lxc-conf="lxc.cgroup.cpuset.cpus = 0,1"
 
-**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*"
+**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*"
   Logging driver for container. Default is defined by daemon `--log-driver` flag.
   **Warning**: `docker logs` command works only for `json-file` logging driver.
 

+ 1 - 1
man/docker.1.md

@@ -119,7 +119,7 @@ unix://[/path/to/socket] to use.
 **--label**="[]"
   Set key=value labels to the daemon (displayed in `docker info`)
 
-**--log-driver**="*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*"
+**--log-driver**="*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*"
   Default driver for container logs. Default is `json-file`.
   **Warning**: `docker logs` command works only for `json-file` logging driver.