2aa13e950d
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
898 lines
31 KiB
Go
898 lines
31 KiB
Go
// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
|
|
package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/aws/middleware"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials/endpointcreds"
|
|
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
|
|
"github.com/aws/smithy-go"
|
|
smithymiddleware "github.com/aws/smithy-go/middleware"
|
|
smithyhttp "github.com/aws/smithy-go/transport/http"
|
|
"github.com/containerd/log"
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
"github.com/docker/docker/dockerversion"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// name is the identifier this log driver is registered under.
const name = "awslogs"

// regionEnvKey is the environment variable consulted for the AWS region.
const regionEnvKey = "AWS_REGION"

// Log option keys understood by the driver.
const (
	regionKey              = "awslogs-region"
	endpointKey            = "awslogs-endpoint"
	logGroupKey            = "awslogs-group"
	logStreamKey           = "awslogs-stream"
	logCreateGroupKey      = "awslogs-create-group"
	logCreateStreamKey     = "awslogs-create-stream"
	tagKey                 = "tag"
	datetimeFormatKey      = "awslogs-datetime-format"
	multilinePatternKey    = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint" //nolint:gosec // G101: Potential hardcoded credentials
	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
	maxBufferedEventsKey   = "awslogs-max-buffered-events"
	logFormatKey           = "awslogs-format"
)

// Values used when the corresponding log option is not supplied.
const (
	defaultForceFlushInterval = 5 * time.Second
	defaultMaxBufferedEvents  = 4096
)

// Batching limits.
//
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
const (
	// perEventBytes is the fixed per-event overhead added to each event's
	// size when batching.
	perEventBytes = 26
	// maximumBytesPerPut caps the total size of one batch.
	maximumBytesPerPut = 1048576
	// maximumLogEventsPerPut caps the number of events in one batch.
	maximumLogEventsPerPut = 10000

	// maximumBytesPerEvent caps the size of a single event.
	//
	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	// Events are interpreted as UTF-8 encoded Unicode, and invalid UTF-8
	// byte sequences are replaced with the Unicode replacement character
	// (U+FFFD), a 3-byte sequence in UTF-8. To compensate for that, and to
	// avoid splitting valid UTF-8 characters into invalid byte sequences,
	// event lengths are calculated assuming that replacement happens.
	maximumBytesPerEvent = 262144 - perEventBytes
)

// credentialsEndpoint is the base URL that the awslogs-credentials-endpoint
// option value is appended to.
const credentialsEndpoint = "http://169.254.170.2" //nolint:gosec // G101: Potential hardcoded credentials

// Request header and value used to select the embedded metric format.
//
// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
const (
	logsFormatHeader = "x-amzn-logs-format"
	jsonEmfLogFormat = "json/emf"
)
|
|
|
|
type logStream struct {
|
|
logStreamName string
|
|
logGroupName string
|
|
logCreateGroup bool
|
|
logCreateStream bool
|
|
forceFlushInterval time.Duration
|
|
multilinePattern *regexp.Regexp
|
|
client api
|
|
messages chan *logger.Message
|
|
lock sync.RWMutex
|
|
closed bool
|
|
sequenceToken *string
|
|
}
|
|
|
|
// logStreamConfig carries the parsed awslogs options produced by
// newStreamConfig and consumed by New when building a logStream.
type logStreamConfig struct {
	// logStreamName is the stream name (explicit option or rendered tag).
	logStreamName string
	// logGroupName is the value of the awslogs-group option.
	logGroupName string
	// logCreateGroup is the parsed awslogs-create-group option (default false).
	logCreateGroup bool
	// logCreateStream is the parsed awslogs-create-stream option (default true).
	logCreateStream bool
	// forceFlushInterval is the parsed awslogs-force-flush-interval-seconds option.
	forceFlushInterval time.Duration
	// maxBufferedEvents sizes the logStream message channel.
	maxBufferedEvents int
	// multilinePattern is the compiled multiline start pattern, or nil.
	multilinePattern *regexp.Regexp
}
|
|
|
|
// Compile-time check that *logStream satisfies logger.SizedLogger.
var _ logger.SizedLogger = (*logStream)(nil)
|
|
|
|
type api interface {
|
|
CreateLogGroup(context.Context, *cloudwatchlogs.CreateLogGroupInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error)
|
|
CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
|
PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
|
|
}
|
|
|
|
type regionFinder interface {
|
|
GetRegion(context.Context, *imds.GetRegionInput, ...func(*imds.Options)) (*imds.GetRegionOutput, error)
|
|
}
|
|
|
|
type wrappedEvent struct {
|
|
inputLogEvent types.InputLogEvent
|
|
insertOrder int
|
|
}
|
|
// byTimestamp implements sort.Interface over wrapped events, ordering them by
// timestamp and then by insertion order.
type byTimestamp []wrappedEvent
|
|
|
|
// init registers the awslogs driver
|
|
func init() {
|
|
if err := logger.RegisterLogDriver(name, New); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// eventBatch holds the events that are batched for submission and the
|
|
// associated data about it.
|
|
//
|
|
// Warning: this type is not threadsafe and must not be used
|
|
// concurrently. This type is expected to be consumed in a single go
|
|
// routine and never concurrently.
|
|
type eventBatch struct {
|
|
batch []wrappedEvent
|
|
bytes int
|
|
}
|
|
|
|
// New creates an awslogs logger using the configuration passed in on the
|
|
// context. Supported context configuration variables are awslogs-region,
|
|
// awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
|
|
// awslogs-multiline-pattern and awslogs-datetime-format.
|
|
// When available, configuration is also taken from environment variables
|
|
// AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
|
|
// file (~/.aws/credentials), and the EC2 Instance Metadata Service.
|
|
func New(info logger.Info) (logger.Logger, error) {
|
|
containerStreamConfig, err := newStreamConfig(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client, err := newAWSLogsClient(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logNonBlocking := info.Config["mode"] == "non-blocking"
|
|
|
|
containerStream := &logStream{
|
|
logStreamName: containerStreamConfig.logStreamName,
|
|
logGroupName: containerStreamConfig.logGroupName,
|
|
logCreateGroup: containerStreamConfig.logCreateGroup,
|
|
logCreateStream: containerStreamConfig.logCreateStream,
|
|
forceFlushInterval: containerStreamConfig.forceFlushInterval,
|
|
multilinePattern: containerStreamConfig.multilinePattern,
|
|
client: client,
|
|
messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
|
|
}
|
|
|
|
creationDone := make(chan bool)
|
|
if logNonBlocking {
|
|
go func() {
|
|
backoff := 1
|
|
maxBackoff := 32
|
|
for {
|
|
// If logger is closed we are done
|
|
containerStream.lock.RLock()
|
|
if containerStream.closed {
|
|
containerStream.lock.RUnlock()
|
|
break
|
|
}
|
|
containerStream.lock.RUnlock()
|
|
err := containerStream.create()
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
time.Sleep(time.Duration(backoff) * time.Second)
|
|
if backoff < maxBackoff {
|
|
backoff *= 2
|
|
}
|
|
log.G(context.TODO()).
|
|
WithError(err).
|
|
WithField("container-id", info.ContainerID).
|
|
WithField("container-name", info.ContainerName).
|
|
Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
|
|
}
|
|
close(creationDone)
|
|
}()
|
|
} else {
|
|
if err = containerStream.create(); err != nil {
|
|
return nil, err
|
|
}
|
|
close(creationDone)
|
|
}
|
|
go containerStream.collectBatch(creationDone)
|
|
|
|
return containerStream, nil
|
|
}
|
|
|
|
// Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream
|
|
// It has been formed out to ease Utest of the New above
|
|
func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
|
|
logGroupName := info.Config[logGroupKey]
|
|
logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logCreateGroup := false
|
|
if info.Config[logCreateGroupKey] != "" {
|
|
logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
forceFlushInterval := defaultForceFlushInterval
|
|
if info.Config[forceFlushIntervalKey] != "" {
|
|
forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
|
|
}
|
|
|
|
maxBufferedEvents := int(defaultMaxBufferedEvents)
|
|
if info.Config[maxBufferedEventsKey] != "" {
|
|
maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if info.Config[logStreamKey] != "" {
|
|
logStreamName = info.Config[logStreamKey]
|
|
}
|
|
logCreateStream := true
|
|
if info.Config[logCreateStreamKey] != "" {
|
|
logCreateStream, err = strconv.ParseBool(info.Config[logCreateStreamKey])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
multilinePattern, err := parseMultilineOptions(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
containerStreamConfig := &logStreamConfig{
|
|
logStreamName: logStreamName,
|
|
logGroupName: logGroupName,
|
|
logCreateGroup: logCreateGroup,
|
|
logCreateStream: logCreateStream,
|
|
forceFlushInterval: forceFlushInterval,
|
|
maxBufferedEvents: maxBufferedEvents,
|
|
multilinePattern: multilinePattern,
|
|
}
|
|
|
|
return containerStreamConfig, nil
|
|
}
|
|
|
|
// Parses awslogs-multiline-pattern and awslogs-datetime-format options
|
|
// If awslogs-datetime-format is present, convert the format from strftime
|
|
// to regexp and return.
|
|
// If awslogs-multiline-pattern is present, compile regexp and return
|
|
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
|
|
dateTimeFormat := info.Config[datetimeFormatKey]
|
|
multilinePatternKey := info.Config[multilinePatternKey]
|
|
// strftime input is parsed into a regular expression
|
|
if dateTimeFormat != "" {
|
|
// %. matches each strftime format sequence and ReplaceAllStringFunc
|
|
// looks up each format sequence in the conversion table strftimeToRegex
|
|
// to replace with a defined regular expression
|
|
r := regexp.MustCompile("%.")
|
|
multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
|
|
return strftimeToRegex[s]
|
|
})
|
|
}
|
|
if multilinePatternKey != "" {
|
|
multilinePattern, err := regexp.Compile(multilinePatternKey)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
|
|
}
|
|
return multilinePattern, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// strftimeToRegex maps strftime format sequences to the regular expression
// fragment that matches the corresponding formatted value. It is used to turn
// an awslogs-datetime-format value into a multiline start pattern.
//
// Character classes list their members without separators: a comma inside a
// class (e.g. "[1,2]") would make a literal "," an accepted character.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	// 001-366; the 0xx range is split so that 010, 020, ... 090 are accepted.
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}
|
|
|
|
// newRegionFinder is a variable such that the implementation
|
|
// can be swapped out for unit tests.
|
|
var newRegionFinder = func(ctx context.Context) (regionFinder, error) {
|
|
cfg, err := config.LoadDefaultConfig(ctx) // default config, because we don't yet know the region
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := imds.NewFromConfig(cfg)
|
|
return client, nil
|
|
}
|
|
|
|
// newSDKEndpoint is a variable such that the implementation
|
|
// can be swapped out for unit tests.
|
|
var newSDKEndpoint = credentialsEndpoint
|
|
|
|
// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
|
|
// Customizations to the default client from the SDK include a Docker-specific
|
|
// User-Agent string and automatic region detection using the EC2 Instance
|
|
// Metadata Service when region is otherwise unspecified.
|
|
func newAWSLogsClient(info logger.Info, configOpts ...func(*config.LoadOptions) error) (*cloudwatchlogs.Client, error) {
|
|
ctx := context.TODO()
|
|
var region, endpoint *string
|
|
if os.Getenv(regionEnvKey) != "" {
|
|
region = aws.String(os.Getenv(regionEnvKey))
|
|
}
|
|
if info.Config[regionKey] != "" {
|
|
region = aws.String(info.Config[regionKey])
|
|
}
|
|
if info.Config[endpointKey] != "" {
|
|
endpoint = aws.String(info.Config[endpointKey])
|
|
}
|
|
if region == nil || *region == "" {
|
|
log.G(ctx).Info("Trying to get region from IMDS")
|
|
regFinder, err := newRegionFinder(context.TODO())
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Error("could not create regionFinder")
|
|
return nil, errors.Wrap(err, "could not create regionFinder")
|
|
}
|
|
|
|
r, err := regFinder.GetRegion(context.TODO(), &imds.GetRegionInput{})
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Error("Could not get region from IMDS, environment, or log option")
|
|
return nil, errors.Wrap(err, "cannot determine region for awslogs driver")
|
|
}
|
|
region = &r.Region
|
|
}
|
|
|
|
configOpts = append(configOpts, config.WithRegion(*region))
|
|
|
|
if uri, ok := info.Config[credentialsEndpointKey]; ok {
|
|
log.G(ctx).Debugf("Trying to get credentials from awslogs-credentials-endpoint")
|
|
|
|
endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
|
|
configOpts = append(configOpts, config.WithCredentialsProvider(endpointcreds.New(endpoint)))
|
|
}
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.TODO(), configOpts...)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Error("Could not initialize AWS SDK config")
|
|
return nil, errors.Wrap(err, "could not initialize AWS SDK config")
|
|
}
|
|
|
|
log.G(ctx).WithFields(log.Fields{
|
|
"region": *region,
|
|
}).Debug("Created awslogs client")
|
|
|
|
var clientOpts []func(*cloudwatchlogs.Options)
|
|
|
|
if info.Config[logFormatKey] != "" {
|
|
logFormatMiddleware := smithymiddleware.BuildMiddlewareFunc("logFormat", func(
|
|
ctx context.Context, in smithymiddleware.BuildInput, next smithymiddleware.BuildHandler,
|
|
) (
|
|
out smithymiddleware.BuildOutput, metadata smithymiddleware.Metadata, err error,
|
|
) {
|
|
switch v := in.Request.(type) {
|
|
case *smithyhttp.Request:
|
|
v.Header.Add(logsFormatHeader, jsonEmfLogFormat)
|
|
}
|
|
return next.HandleBuild(ctx, in)
|
|
})
|
|
clientOpts = append(
|
|
clientOpts,
|
|
cloudwatchlogs.WithAPIOptions(func(stack *smithymiddleware.Stack) error {
|
|
return stack.Build.Add(logFormatMiddleware, smithymiddleware.Before)
|
|
}),
|
|
)
|
|
}
|
|
|
|
clientOpts = append(
|
|
clientOpts,
|
|
cloudwatchlogs.WithAPIOptions(middleware.AddUserAgentKeyValue("Docker", dockerversion.Version)),
|
|
func(o *cloudwatchlogs.Options) {
|
|
o.BaseEndpoint = endpoint
|
|
},
|
|
)
|
|
|
|
client := cloudwatchlogs.NewFromConfig(cfg, clientOpts...)
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// Name returns the name of the awslogs logging driver
|
|
func (l *logStream) Name() string {
|
|
return name
|
|
}
|
|
|
|
// BufSize returns the maximum bytes CloudWatch can handle.
|
|
func (l *logStream) BufSize() int {
|
|
return maximumBytesPerEvent
|
|
}
|
|
|
|
// Log submits messages for logging by an instance of the awslogs logging driver
|
|
func (l *logStream) Log(msg *logger.Message) error {
|
|
l.lock.RLock()
|
|
defer l.lock.RUnlock()
|
|
if l.closed {
|
|
return errors.New("awslogs is closed")
|
|
}
|
|
l.messages <- msg
|
|
return nil
|
|
}
|
|
|
|
// Close closes the instance of the awslogs logging driver
|
|
func (l *logStream) Close() error {
|
|
l.lock.Lock()
|
|
defer l.lock.Unlock()
|
|
if !l.closed {
|
|
close(l.messages)
|
|
}
|
|
l.closed = true
|
|
return nil
|
|
}
|
|
|
|
// create creates log group and log stream for the instance of the awslogs logging driver
|
|
func (l *logStream) create() error {
|
|
err := l.createLogStream()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
var apiErr *types.ResourceNotFoundException
|
|
if errors.As(err, &apiErr) && l.logCreateGroup {
|
|
if err := l.createLogGroup(); err != nil {
|
|
return errors.Wrap(err, "failed to create Cloudwatch log group")
|
|
}
|
|
err = l.createLogStream()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Wrap(err, "failed to create Cloudwatch log stream")
|
|
}
|
|
|
|
// createLogGroup creates a log group for the instance of the awslogs logging driver
|
|
func (l *logStream) createLogGroup() error {
|
|
if _, err := l.client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
}); err != nil {
|
|
var apiErr smithy.APIError
|
|
if errors.As(err, &apiErr) {
|
|
fields := log.Fields{
|
|
"errorCode": apiErr.ErrorCode(),
|
|
"message": apiErr.ErrorMessage(),
|
|
"logGroupName": l.logGroupName,
|
|
"logCreateGroup": l.logCreateGroup,
|
|
}
|
|
if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
|
|
// Allow creation to succeed
|
|
log.G(context.TODO()).WithFields(fields).Info("Log group already exists")
|
|
return nil
|
|
}
|
|
log.G(context.TODO()).WithFields(fields).Error("Failed to create log group")
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// createLogStream creates a log stream for the instance of the awslogs logging driver
|
|
func (l *logStream) createLogStream() error {
|
|
// Directly return if we do not want to create log stream.
|
|
if !l.logCreateStream {
|
|
log.G(context.TODO()).WithFields(log.Fields{
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
"logCreateStream": l.logCreateStream,
|
|
}).Info("Skipping creating log stream")
|
|
return nil
|
|
}
|
|
|
|
input := &cloudwatchlogs.CreateLogStreamInput{
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
LogStreamName: aws.String(l.logStreamName),
|
|
}
|
|
|
|
_, err := l.client.CreateLogStream(context.TODO(), input)
|
|
if err != nil {
|
|
var apiErr smithy.APIError
|
|
if errors.As(err, &apiErr) {
|
|
fields := log.Fields{
|
|
"errorCode": apiErr.ErrorCode(),
|
|
"message": apiErr.ErrorMessage(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}
|
|
if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
|
|
// Allow creation to succeed
|
|
log.G(context.TODO()).WithFields(fields).Info("Log stream already exists")
|
|
return nil
|
|
}
|
|
log.G(context.TODO()).WithFields(fields).Error("Failed to create log stream")
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// newTicker creates the ticker that drives time-based batching. It is a
// variable so that unit tests can substitute their own implementation.
var newTicker func(freq time.Duration) *time.Ticker = time.NewTicker
|
|
|
|
// collectBatch executes as a goroutine to perform batching of log events for
|
|
// submission to the log stream. If the awslogs-multiline-pattern or
|
|
// awslogs-datetime-format options have been configured, multiline processing
|
|
// is enabled, where log messages are stored in an event buffer until a multiline
|
|
// pattern match is found, at which point the messages in the event buffer are
|
|
// pushed to CloudWatch logs as a single log event. Multiline messages are processed
|
|
// according to the maximumBytesPerPut constraint, and the implementation only
|
|
// allows for messages to be buffered for a maximum of 2*l.forceFlushInterval
|
|
// seconds. If no forceFlushInterval is specified for the log stream, then the default
|
|
// of 5 seconds will be used resulting in a maximum of 10 seconds buffer time for multiline
|
|
// messages. When events are ready to be processed for submission to CloudWatch
|
|
// Logs, the processEvents method is called. If a multiline pattern is not
|
|
// configured, log events are submitted to the processEvents method immediately.
|
|
func (l *logStream) collectBatch(created chan bool) {
|
|
// Wait for the logstream/group to be created
|
|
<-created
|
|
flushInterval := l.forceFlushInterval
|
|
if flushInterval <= 0 {
|
|
flushInterval = defaultForceFlushInterval
|
|
}
|
|
ticker := newTicker(flushInterval)
|
|
var eventBuffer []byte
|
|
var eventBufferTimestamp int64
|
|
batch := newEventBatch()
|
|
for {
|
|
select {
|
|
case t := <-ticker.C:
|
|
// If event buffer is older than batch publish frequency flush the event buffer
|
|
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
|
|
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
|
|
eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
|
|
eventBufferNegative := eventBufferAge < 0
|
|
if eventBufferExpired || eventBufferNegative {
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
eventBuffer = eventBuffer[:0]
|
|
}
|
|
}
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
case msg, more := <-l.messages:
|
|
if !more {
|
|
// Flush event buffer and release resources
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
return
|
|
}
|
|
if eventBufferTimestamp == 0 {
|
|
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
|
}
|
|
line := msg.Line
|
|
if l.multilinePattern != nil {
|
|
lineEffectiveLen := effectiveLen(string(line))
|
|
if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
|
|
// This is a new log event or we will exceed max bytes per event
|
|
// so flush the current eventBuffer to events and reset timestamp
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
|
eventBuffer = eventBuffer[:0]
|
|
}
|
|
// Append newline if event is less than max event size
|
|
if lineEffectiveLen < maximumBytesPerEvent {
|
|
line = append(line, "\n"...)
|
|
}
|
|
eventBuffer = append(eventBuffer, line...)
|
|
logger.PutMessage(msg)
|
|
} else {
|
|
l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
|
|
logger.PutMessage(msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processEvent processes log events that are ready for submission to CloudWatch
|
|
// logs. Batching is performed on time- and size-bases. Time-based batching occurs
|
|
// at the interval defined by awslogs-force-flush-interval-seconds (defaults to 5 seconds).
|
|
// Size-based batching is performed on the maximum number of events per batch
|
|
// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
|
|
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
|
|
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
|
|
// byte overhead (defined in perEventBytes) which is accounted for in split- and
|
|
// batch-calculations. Because the events are interpreted as UTF-8 encoded
|
|
// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
|
|
// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
|
|
// compensate for that and to avoid splitting valid UTF-8 characters into
|
|
// invalid byte sequences, we calculate the length of each event assuming that
|
|
// this replacement happens.
|
|
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
|
|
for len(bytes) > 0 {
|
|
// Split line length so it does not exceed the maximum
|
|
splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
|
|
line := bytes[:splitOffset]
|
|
event := wrappedEvent{
|
|
inputLogEvent: types.InputLogEvent{
|
|
Message: aws.String(string(line)),
|
|
Timestamp: aws.Int64(timestamp),
|
|
},
|
|
insertOrder: batch.count(),
|
|
}
|
|
|
|
added := batch.add(event, lineBytes)
|
|
if added {
|
|
bytes = bytes[splitOffset:]
|
|
} else {
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
}
|
|
}
|
|
}
|
|
|
|
// effectiveLen returns the number of bytes line occupies after UTF-8
// normalization, i.e. after every byte that is not part of a valid UTF-8
// encoded codepoint has been replaced by the Unicode replacement codepoint
// U+FFFD (a 3-byte UTF-8 sequence, represented in Go as utf8.RuneError).
func effectiveLen(line string) int {
	total := 0
	for rest := line; len(rest) > 0; {
		// An invalid byte decodes to utf8.RuneError with a width of 1, but
		// counts as the 3 bytes of its replacement.
		r, width := utf8.DecodeRuneInString(rest)
		total += utf8.RuneLen(r)
		rest = rest[width:]
	}
	return total
}
|
|
|
|
// findValidSplit locates the byte offset at which line can be split without
// breaking a valid Unicode codepoint, such that the part before the offset
// occupies at most maxBytes after normalization. It returns that offset
// (usable on the string or the equivalent []byte) and the effective number of
// bytes of the part before it, counted as if every invalid UTF-8 byte were
// replaced by the Unicode replacement character (a 3-byte UTF-8 sequence,
// represented in Go as utf8.RuneError). If the whole line fits, the offset is
// len(line).
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for idx, r := range line {
		width := utf8.RuneLen(r)
		if effectiveBytes+width > maxBytes {
			// This codepoint would overflow the budget: split right before it.
			return idx, effectiveBytes
		}
		effectiveBytes += width
	}
	return len(line), effectiveBytes
}
|
|
|
|
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
|
|
// accounting for sequencing requirements (each request must reference the
|
|
// sequence token returned by the previous request).
|
|
func (l *logStream) publishBatch(batch *eventBatch) {
|
|
if batch.isEmpty() {
|
|
return
|
|
}
|
|
cwEvents := unwrapEvents(batch.events())
|
|
|
|
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
|
|
if err != nil {
|
|
if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
|
|
// already submitted, just grab the correct sequence token
|
|
nextSequenceToken = apiErr.ExpectedSequenceToken
|
|
log.G(context.TODO()).WithFields(log.Fields{
|
|
"errorCode": apiErr.ErrorCode(),
|
|
"message": apiErr.ErrorMessage(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}).Info("Data already accepted, ignoring error")
|
|
err = nil
|
|
} else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
|
|
nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
|
|
}
|
|
}
|
|
if err != nil {
|
|
log.G(context.TODO()).Error(err)
|
|
} else {
|
|
l.sequenceToken = nextSequenceToken
|
|
}
|
|
}
|
|
|
|
// putLogEvents wraps the PutLogEvents API
|
|
func (l *logStream) putLogEvents(events []types.InputLogEvent, sequenceToken *string) (*string, error) {
|
|
input := &cloudwatchlogs.PutLogEventsInput{
|
|
LogEvents: events,
|
|
SequenceToken: sequenceToken,
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
LogStreamName: aws.String(l.logStreamName),
|
|
}
|
|
resp, err := l.client.PutLogEvents(context.TODO(), input)
|
|
if err != nil {
|
|
var apiErr smithy.APIError
|
|
if errors.As(err, &apiErr) {
|
|
log.G(context.TODO()).WithFields(log.Fields{
|
|
"errorCode": apiErr.ErrorCode(),
|
|
"message": apiErr.ErrorMessage(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}).Error("Failed to put log events")
|
|
}
|
|
return nil, err
|
|
}
|
|
return resp.NextSequenceToken, nil
|
|
}
|
|
|
|
// ValidateLogOpt looks for awslogs-specific log options awslogs-region, awslogs-endpoint
|
|
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
|
|
// awslogs-multiline-pattern
|
|
func ValidateLogOpt(cfg map[string]string) error {
|
|
for key := range cfg {
|
|
switch key {
|
|
case logGroupKey:
|
|
case logStreamKey:
|
|
case logCreateGroupKey:
|
|
case regionKey:
|
|
case endpointKey:
|
|
case tagKey:
|
|
case datetimeFormatKey:
|
|
case multilinePatternKey:
|
|
case credentialsEndpointKey:
|
|
case forceFlushIntervalKey:
|
|
case maxBufferedEventsKey:
|
|
case logFormatKey:
|
|
default:
|
|
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
|
|
}
|
|
}
|
|
if cfg[logGroupKey] == "" {
|
|
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
|
}
|
|
if cfg[logCreateGroupKey] != "" {
|
|
if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
|
|
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
|
|
}
|
|
}
|
|
if cfg[forceFlushIntervalKey] != "" {
|
|
if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
|
|
return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
|
|
}
|
|
}
|
|
if cfg[maxBufferedEventsKey] != "" {
|
|
if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
|
|
return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
|
|
}
|
|
}
|
|
_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
|
|
_, multilinePatternKeyExists := cfg[multilinePatternKey]
|
|
if datetimeFormatKeyExists && multilinePatternKeyExists {
|
|
return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
|
|
}
|
|
|
|
if cfg[logFormatKey] != "" {
|
|
// For now, only the "json/emf" log format is supported
|
|
if cfg[logFormatKey] != jsonEmfLogFormat {
|
|
return fmt.Errorf("unsupported log format '%s'", cfg[logFormatKey])
|
|
}
|
|
if datetimeFormatKeyExists || multilinePatternKeyExists {
|
|
return fmt.Errorf("you cannot configure log opt '%s' or '%s' when log opt '%s' is set to '%s'", datetimeFormatKey, multilinePatternKey, logFormatKey, jsonEmfLogFormat)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Len returns the length of a byTimestamp slice. Len is required by the
|
|
// sort.Interface interface.
|
|
func (slice byTimestamp) Len() int {
|
|
return len(slice)
|
|
}
|
|
|
|
// Less compares two values in a byTimestamp slice by Timestamp. Less is
|
|
// required by the sort.Interface interface.
|
|
func (slice byTimestamp) Less(i, j int) bool {
|
|
iTimestamp, jTimestamp := int64(0), int64(0)
|
|
if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
|
|
iTimestamp = *slice[i].inputLogEvent.Timestamp
|
|
}
|
|
if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
|
|
jTimestamp = *slice[j].inputLogEvent.Timestamp
|
|
}
|
|
if iTimestamp == jTimestamp {
|
|
return slice[i].insertOrder < slice[j].insertOrder
|
|
}
|
|
return iTimestamp < jTimestamp
|
|
}
|
|
|
|
// Swap swaps two values in a byTimestamp slice with each other. Swap is
|
|
// required by the sort.Interface interface.
|
|
func (slice byTimestamp) Swap(i, j int) {
|
|
slice[i], slice[j] = slice[j], slice[i]
|
|
}
|
|
|
|
func unwrapEvents(events []wrappedEvent) []types.InputLogEvent {
|
|
cwEvents := make([]types.InputLogEvent, len(events))
|
|
for i, input := range events {
|
|
cwEvents[i] = input.inputLogEvent
|
|
}
|
|
return cwEvents
|
|
}
|
|
|
|
func newEventBatch() *eventBatch {
|
|
return &eventBatch{
|
|
batch: make([]wrappedEvent, 0),
|
|
bytes: 0,
|
|
}
|
|
}
|
|
|
|
// events returns a slice of wrappedEvents sorted in order of their
|
|
// timestamps and then by their insertion order (see `byTimestamp`).
|
|
//
|
|
// Warning: this method is not threadsafe and must not be used
|
|
// concurrently.
|
|
func (b *eventBatch) events() []wrappedEvent {
|
|
sort.Sort(byTimestamp(b.batch))
|
|
return b.batch
|
|
}
|
|
|
|
// add adds an event to the batch of events accounting for the
|
|
// necessary overhead for an event to be logged. An error will be
|
|
// returned if the event cannot be added to the batch due to service
|
|
// limits.
|
|
//
|
|
// Warning: this method is not threadsafe and must not be used
|
|
// concurrently.
|
|
func (b *eventBatch) add(event wrappedEvent, size int) bool {
|
|
addBytes := size + perEventBytes
|
|
|
|
// verify we are still within service limits
|
|
switch {
|
|
case len(b.batch)+1 > maximumLogEventsPerPut:
|
|
return false
|
|
case b.bytes+addBytes > maximumBytesPerPut:
|
|
return false
|
|
}
|
|
|
|
b.bytes += addBytes
|
|
b.batch = append(b.batch, event)
|
|
|
|
return true
|
|
}
|
|
|
|
// count is the number of batched events. Warning: this method
|
|
// is not threadsafe and must not be used concurrently.
|
|
func (b *eventBatch) count() int {
|
|
return len(b.batch)
|
|
}
|
|
|
|
// size is the total number of bytes that the batch represents.
|
|
//
|
|
// Warning: this method is not threadsafe and must not be used
|
|
// concurrently.
|
|
func (b *eventBatch) size() int {
|
|
return b.bytes
|
|
}
|
|
|
|
func (b *eventBatch) isEmpty() bool {
|
|
zeroEvents := b.count() == 0
|
|
zeroSize := b.size() == 0
|
|
return zeroEvents && zeroSize
|
|
}
|
|
|
|
// reset prepares the batch for reuse.
|
|
func (b *eventBatch) reset() {
|
|
b.bytes = 0
|
|
b.batch = b.batch[:0]
|
|
}
|