123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- // Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
- package awslogs
- import (
- "errors"
- "fmt"
- "os"
- "runtime"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/ec2metadata"
- "github.com/aws/aws-sdk-go/aws/request"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/dockerversion"
- )
- const (
- name = "awslogs"
- regionKey = "awslogs-region"
- regionEnvKey = "AWS_REGION"
- logGroupKey = "awslogs-group"
- logStreamKey = "awslogs-stream"
- batchPublishFrequency = 5 * time.Second
- // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
- perEventBytes = 26
- maximumBytesPerPut = 1048576
- maximumLogEventsPerPut = 10000
- // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
- maximumBytesPerEvent = 262144 - perEventBytes
- resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
- dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
- invalidSequenceTokenCode = "InvalidSequenceTokenException"
- userAgentHeader = "User-Agent"
- )
- type logStream struct {
- logStreamName string
- logGroupName string
- client api
- messages chan *logger.Message
- lock sync.RWMutex
- closed bool
- sequenceToken *string
- }
- type api interface {
- CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
- PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
- }
- type regionFinder interface {
- Region() (string, error)
- }
- type byTimestamp []*cloudwatchlogs.InputLogEvent
- // init registers the awslogs driver
- func init() {
- if err := logger.RegisterLogDriver(name, New); err != nil {
- logrus.Fatal(err)
- }
- if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
- logrus.Fatal(err)
- }
- }
- // New creates an awslogs logger using the configuration passed in on the
- // context. Supported context configuration variables are awslogs-region,
- // awslogs-group, and awslogs-stream. When available, configuration is
- // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
- // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
- // the EC2 Instance Metadata Service.
- func New(ctx logger.Context) (logger.Logger, error) {
- logGroupName := ctx.Config[logGroupKey]
- logStreamName := ctx.ContainerID
- if ctx.Config[logStreamKey] != "" {
- logStreamName = ctx.Config[logStreamKey]
- }
- client, err := newAWSLogsClient(ctx)
- if err != nil {
- return nil, err
- }
- containerStream := &logStream{
- logStreamName: logStreamName,
- logGroupName: logGroupName,
- client: client,
- messages: make(chan *logger.Message, 4096),
- }
- err = containerStream.create()
- if err != nil {
- return nil, err
- }
- go containerStream.collectBatch()
- return containerStream, nil
- }
- // newRegionFinder is a variable such that the implementation
- // can be swapped out for unit tests.
- var newRegionFinder = func() regionFinder {
- return ec2metadata.New(session.New())
- }
- // newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
- // Customizations to the default client from the SDK include a Docker-specific
- // User-Agent string and automatic region detection using the EC2 Instance
- // Metadata Service when region is otherwise unspecified.
- func newAWSLogsClient(ctx logger.Context) (api, error) {
- var region *string
- if os.Getenv(regionEnvKey) != "" {
- region = aws.String(os.Getenv(regionEnvKey))
- }
- if ctx.Config[regionKey] != "" {
- region = aws.String(ctx.Config[regionKey])
- }
- if region == nil || *region == "" {
- logrus.Info("Trying to get region from EC2 Metadata")
- ec2MetadataClient := newRegionFinder()
- r, err := ec2MetadataClient.Region()
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "error": err,
- }).Error("Could not get region from EC2 metadata, environment, or log option")
- return nil, errors.New("Cannot determine region for awslogs driver")
- }
- region = &r
- }
- logrus.WithFields(logrus.Fields{
- "region": *region,
- }).Debug("Created awslogs client")
- client := cloudwatchlogs.New(session.New(), aws.NewConfig().WithRegion(*region))
- client.Handlers.Build.PushBackNamed(request.NamedHandler{
- Name: "DockerUserAgentHandler",
- Fn: func(r *request.Request) {
- currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
- r.HTTPRequest.Header.Set(userAgentHeader,
- fmt.Sprintf("Docker %s (%s) %s",
- dockerversion.Version, runtime.GOOS, currentAgent))
- },
- })
- return client, nil
- }
- // Name returns the name of the awslogs logging driver
- func (l *logStream) Name() string {
- return name
- }
- // Log submits messages for logging by an instance of the awslogs logging driver
- func (l *logStream) Log(msg *logger.Message) error {
- l.lock.RLock()
- defer l.lock.RUnlock()
- if !l.closed {
- // buffer up the data, making sure to copy the Line data
- l.messages <- logger.CopyMessage(msg)
- }
- return nil
- }
- // Close closes the instance of the awslogs logging driver
- func (l *logStream) Close() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if !l.closed {
- close(l.messages)
- }
- l.closed = true
- return nil
- }
- // create creates a log stream for the instance of the awslogs logging driver
- func (l *logStream) create() error {
- input := &cloudwatchlogs.CreateLogStreamInput{
- LogGroupName: aws.String(l.logGroupName),
- LogStreamName: aws.String(l.logStreamName),
- }
- _, err := l.client.CreateLogStream(input)
- if err != nil {
- if awsErr, ok := err.(awserr.Error); ok {
- fields := logrus.Fields{
- "errorCode": awsErr.Code(),
- "message": awsErr.Message(),
- "origError": awsErr.OrigErr(),
- "logGroupName": l.logGroupName,
- "logStreamName": l.logStreamName,
- }
- if awsErr.Code() == resourceAlreadyExistsCode {
- // Allow creation to succeed
- logrus.WithFields(fields).Info("Log stream already exists")
- return nil
- }
- logrus.WithFields(fields).Error("Failed to create log stream")
- }
- }
- return err
- }
- // newTicker is used for time-based batching. newTicker is a variable such
- // that the implementation can be swapped out for unit tests.
- var newTicker = func(freq time.Duration) *time.Ticker {
- return time.NewTicker(freq)
- }
- // collectBatch executes as a goroutine to perform batching of log events for
- // submission to the log stream. Batching is performed on time- and size-
- // bases. Time-based batching occurs at a 5 second interval (defined in the
- // batchPublishFrequency const). Size-based batching is performed on the
- // maximum number of events per batch (defined in maximumLogEventsPerPut) and
- // the maximum number of total bytes in a batch (defined in
- // maximumBytesPerPut). Log messages are split by the maximum bytes per event
- // (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
- // (defined in perEventBytes) which is accounted for in split- and batch-
- // calculations.
- func (l *logStream) collectBatch() {
- timer := newTicker(batchPublishFrequency)
- var events []*cloudwatchlogs.InputLogEvent
- bytes := 0
- for {
- select {
- case <-timer.C:
- l.publishBatch(events)
- events = events[:0]
- bytes = 0
- case msg, more := <-l.messages:
- if !more {
- l.publishBatch(events)
- return
- }
- unprocessedLine := msg.Line
- for len(unprocessedLine) > 0 {
- // Split line length so it does not exceed the maximum
- lineBytes := len(unprocessedLine)
- if lineBytes > maximumBytesPerEvent {
- lineBytes = maximumBytesPerEvent
- }
- line := unprocessedLine[:lineBytes]
- unprocessedLine = unprocessedLine[lineBytes:]
- if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
- // Publish an existing batch if it's already over the maximum number of events or if adding this
- // event would push it over the maximum number of total bytes.
- l.publishBatch(events)
- events = events[:0]
- bytes = 0
- }
- events = append(events, &cloudwatchlogs.InputLogEvent{
- Message: aws.String(string(line)),
- Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
- })
- bytes += (lineBytes + perEventBytes)
- }
- }
- }
- }
- // publishBatch calls PutLogEvents for a given set of InputLogEvents,
- // accounting for sequencing requirements (each request must reference the
- // sequence token returned by the previous request).
- func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
- if len(events) == 0 {
- return
- }
- sort.Sort(byTimestamp(events))
- nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
- if err != nil {
- if awsErr, ok := err.(awserr.Error); ok {
- if awsErr.Code() == dataAlreadyAcceptedCode {
- // already submitted, just grab the correct sequence token
- parts := strings.Split(awsErr.Message(), " ")
- nextSequenceToken = &parts[len(parts)-1]
- logrus.WithFields(logrus.Fields{
- "errorCode": awsErr.Code(),
- "message": awsErr.Message(),
- "logGroupName": l.logGroupName,
- "logStreamName": l.logStreamName,
- }).Info("Data already accepted, ignoring error")
- err = nil
- } else if awsErr.Code() == invalidSequenceTokenCode {
- // sequence code is bad, grab the correct one and retry
- parts := strings.Split(awsErr.Message(), " ")
- token := parts[len(parts)-1]
- nextSequenceToken, err = l.putLogEvents(events, &token)
- }
- }
- }
- if err != nil {
- logrus.Error(err)
- } else {
- l.sequenceToken = nextSequenceToken
- }
- }
- // putLogEvents wraps the PutLogEvents API
- func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
- input := &cloudwatchlogs.PutLogEventsInput{
- LogEvents: events,
- SequenceToken: sequenceToken,
- LogGroupName: aws.String(l.logGroupName),
- LogStreamName: aws.String(l.logStreamName),
- }
- resp, err := l.client.PutLogEvents(input)
- if err != nil {
- if awsErr, ok := err.(awserr.Error); ok {
- logrus.WithFields(logrus.Fields{
- "errorCode": awsErr.Code(),
- "message": awsErr.Message(),
- "origError": awsErr.OrigErr(),
- "logGroupName": l.logGroupName,
- "logStreamName": l.logStreamName,
- }).Error("Failed to put log events")
- }
- return nil, err
- }
- return resp.NextSequenceToken, nil
- }
- // ValidateLogOpt looks for awslogs-specific log options awslogs-region,
- // awslogs-group, and awslogs-stream
- func ValidateLogOpt(cfg map[string]string) error {
- for key := range cfg {
- switch key {
- case logGroupKey:
- case logStreamKey:
- case regionKey:
- default:
- return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
- }
- }
- if cfg[logGroupKey] == "" {
- return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
- }
- return nil
- }
- // Len returns the length of a byTimestamp slice. Len is required by the
- // sort.Interface interface.
- func (slice byTimestamp) Len() int {
- return len(slice)
- }
- // Less compares two values in a byTimestamp slice by Timestamp. Less is
- // required by the sort.Interface interface.
- func (slice byTimestamp) Less(i, j int) bool {
- iTimestamp, jTimestamp := int64(0), int64(0)
- if slice != nil && slice[i].Timestamp != nil {
- iTimestamp = *slice[i].Timestamp
- }
- if slice != nil && slice[j].Timestamp != nil {
- jTimestamp = *slice[j].Timestamp
- }
- return iTimestamp < jTimestamp
- }
- // Swap swaps two values in a byTimestamp slice with each other. Swap is
- // required by the sort.Interface interface.
- func (slice byTimestamp) Swap(i, j int) {
- slice[i], slice[j] = slice[j], slice[i]
- }
|