// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"

import (
	"context"
	"fmt"
	"os"
	"regexp"
	"sort"
	"strconv"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/middleware"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials/endpointcreds"
	"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
	"github.com/aws/smithy-go"
	smithymiddleware "github.com/aws/smithy-go/middleware"
	smithyhttp "github.com/aws/smithy-go/transport/http"
	"github.com/containerd/log"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/dockerversion"
	"github.com/pkg/errors"
)

const (
	name                   = "awslogs"
	regionKey              = "awslogs-region"
	endpointKey            = "awslogs-endpoint"
	regionEnvKey           = "AWS_REGION"
	logGroupKey            = "awslogs-group"
	logStreamKey           = "awslogs-stream"
	logCreateGroupKey      = "awslogs-create-group"
	logCreateStreamKey     = "awslogs-create-stream"
	tagKey                 = "tag"
	datetimeFormatKey      = "awslogs-datetime-format"
	multilinePatternKey    = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint" //nolint:gosec // G101: Potential hardcoded credentials
	forceFlushIntervalKey  = "awslogs-force-flush-interval-seconds"
	maxBufferedEventsKey   = "awslogs-max-buffered-events"
	logFormatKey           = "awslogs-format"

	defaultForceFlushInterval = 5 * time.Second
	defaultMaxBufferedEvents  = 4096

	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000

	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
	// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
	// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
	// this replacement happens.
	maximumBytesPerEvent = 262144 - perEventBytes

	credentialsEndpoint = "http://169.254.170.2" //nolint:gosec // G101: Potential hardcoded credentials

	// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
	logsFormatHeader = "x-amzn-logs-format"
	jsonEmfLogFormat = "json/emf"
)

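// To make the limits above concrete: one event can carry at most
// maximumBytesPerEvent bytes of message (262144 - 26 = 262118) because the
// service counts perEventBytes of overhead per event, and the same overhead
// is charged against the 1 MiB batch limit. A sketch of that accounting,
// using an illustrative helper that is not part of this driver:
//
//	// usablePayload returns the message bytes that still fit into one
//	// PutLogEvents call carrying n events.
//	func usablePayload(n int) int {
//		return maximumBytesPerPut - n*perEventBytes
//	}
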
type logStream struct {
	logStreamName      string
	logGroupName       string
	logCreateGroup     bool
	logCreateStream    bool
	forceFlushInterval time.Duration
	multilinePattern   *regexp.Regexp
	client             api
	messages           chan *logger.Message
	lock               sync.RWMutex
	closed             bool
	sequenceToken      *string
}

type logStreamConfig struct {
	logStreamName      string
	logGroupName       string
	logCreateGroup     bool
	logCreateStream    bool
	forceFlushInterval time.Duration
	maxBufferedEvents  int
	multilinePattern   *regexp.Regexp
}

var _ logger.SizedLogger = &logStream{}

type api interface {
	CreateLogGroup(context.Context, *cloudwatchlogs.CreateLogGroupInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error)
	CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
}

type regionFinder interface {
	GetRegion(context.Context, *imds.GetRegionInput, ...func(*imds.Options)) (*imds.GetRegionOutput, error)
}

type wrappedEvent struct {
	inputLogEvent types.InputLogEvent
	insertOrder   int
}

type byTimestamp []wrappedEvent

// init registers the awslogs driver
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		panic(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		panic(err)
	}
}

// eventBatch holds the events that are batched for submission and the
// associated metadata about the batch.
//
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single
// goroutine and never concurrently.
type eventBatch struct {
	batch []wrappedEvent
	bytes int
}

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
// awslogs-create-stream, awslogs-multiline-pattern, awslogs-datetime-format,
// awslogs-credentials-endpoint, awslogs-force-flush-interval-seconds,
// awslogs-max-buffered-events, and awslogs-format.
// When available, configuration is also taken from environment variables
// AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
// file (~/.aws/credentials), and the EC2 Instance Metadata Service.
func New(info logger.Info) (logger.Logger, error) {
	containerStreamConfig, err := newStreamConfig(info)
	if err != nil {
		return nil, err
	}
	client, err := newAWSLogsClient(info)
	if err != nil {
		return nil, err
	}

	logNonBlocking := info.Config["mode"] == "non-blocking"

	containerStream := &logStream{
		logStreamName:      containerStreamConfig.logStreamName,
		logGroupName:       containerStreamConfig.logGroupName,
		logCreateGroup:     containerStreamConfig.logCreateGroup,
		logCreateStream:    containerStreamConfig.logCreateStream,
		forceFlushInterval: containerStreamConfig.forceFlushInterval,
		multilinePattern:   containerStreamConfig.multilinePattern,
		client:             client,
		messages:           make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
	}

	creationDone := make(chan bool)
	if logNonBlocking {
		go func() {
			backoff := 1
			maxBackoff := 32
			for {
				// If the logger is closed we are done
				containerStream.lock.RLock()
				if containerStream.closed {
					containerStream.lock.RUnlock()
					break
				}
				containerStream.lock.RUnlock()
				err := containerStream.create()
				if err == nil {
					break
				}

				// Log before sleeping so the message reports the actual wait.
				log.G(context.TODO()).
					WithError(err).
					WithField("container-id", info.ContainerID).
					WithField("container-name", info.ContainerName).
					Errorf("Error while trying to initialize awslogs. Retrying in %d seconds", backoff)
				time.Sleep(time.Duration(backoff) * time.Second)
				if backoff < maxBackoff {
					backoff *= 2
				}
			}
			close(creationDone)
		}()
	} else {
		if err = containerStream.create(); err != nil {
			return nil, err
		}
		close(creationDone)
	}
	go containerStream.collectBatch(creationDone)

	return containerStream, nil
}

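// A minimal sketch of constructing the driver directly, for illustration only
// (the daemon normally constructs it through the registered driver; the
// logger.Info values below are invented):
//
//	info := logger.Info{
//		ContainerID: "1234567890abcdef",
//		Config: map[string]string{
//			logGroupKey: "my-log-group",
//			regionKey:   "us-east-1",
//		},
//	}
//	l, err := New(info)
//	if err != nil {
//		// e.g. invalid options or failed group/stream creation
//	}
//	defer l.Close()
//	_ = l.Log(&logger.Message{Line: []byte("hello"), Timestamp: time.Now()})
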
// newStreamConfig parses a majority of the awslogs- log options and prepares
// a config object for constructing the actual stream. It is factored out of
// New above to ease unit testing.
func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
	logGroupName := info.Config[logGroupKey]
	logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
	if err != nil {
		return nil, err
	}
	logCreateGroup := false
	if info.Config[logCreateGroupKey] != "" {
		logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
		if err != nil {
			return nil, err
		}
	}

	forceFlushInterval := defaultForceFlushInterval
	if info.Config[forceFlushIntervalKey] != "" {
		forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
		if err != nil {
			return nil, err
		}
		forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
	}

	maxBufferedEvents := defaultMaxBufferedEvents
	if info.Config[maxBufferedEventsKey] != "" {
		maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
		if err != nil {
			return nil, err
		}
	}

	if info.Config[logStreamKey] != "" {
		logStreamName = info.Config[logStreamKey]
	}

	logCreateStream := true
	if info.Config[logCreateStreamKey] != "" {
		logCreateStream, err = strconv.ParseBool(info.Config[logCreateStreamKey])
		if err != nil {
			return nil, err
		}
	}

	multilinePattern, err := parseMultilineOptions(info)
	if err != nil {
		return nil, err
	}

	containerStreamConfig := &logStreamConfig{
		logStreamName:      logStreamName,
		logGroupName:       logGroupName,
		logCreateGroup:     logCreateGroup,
		logCreateStream:    logCreateStream,
		forceFlushInterval: forceFlushInterval,
		maxBufferedEvents:  maxBufferedEvents,
		multilinePattern:   multilinePattern,
	}

	return containerStreamConfig, nil
}

// parseMultilineOptions parses the awslogs-multiline-pattern and
// awslogs-datetime-format options. If awslogs-datetime-format is present, the
// format is converted from strftime notation to a regular expression and
// returned. If awslogs-multiline-pattern is present, the regular expression
// is compiled and returned.
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
	dateTimeFormat := info.Config[datetimeFormatKey]
	multilinePatternValue := info.Config[multilinePatternKey]
	// strftime input is parsed into a regular expression
	if dateTimeFormat != "" {
		// %. matches each strftime format sequence and ReplaceAllStringFunc
		// looks up each format sequence in the conversion table strftimeToRegex
		// to replace with a defined regular expression
		r := regexp.MustCompile("%.")
		multilinePatternValue = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
			return strftimeToRegex[s]
		})
	}
	if multilinePatternValue != "" {
		multilinePattern, err := regexp.Compile(multilinePatternValue)
		if err != nil {
			return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternValue)
		}
		return multilinePattern, nil
	}
	return nil, nil
}

// strftimeToRegex maps strftime format strings to regular expressions.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[1-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}

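// For example, an awslogs-datetime-format of "%Y-%m-%d %H:%M:%S" is rewritten
// by parseMultilineOptions into the pattern
//
//	\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01]) (?:[01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]
//
// so a line such as "2016-01-01 12:00:00 New event" starts a new log event,
// while a continuation line that does not match is appended to the buffered
// event in collectBatch.
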
// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.
var newRegionFinder = func(ctx context.Context) (regionFinder, error) {
	cfg, err := config.LoadDefaultConfig(ctx) // default config, because we don't yet know the region
	if err != nil {
		return nil, err
	}

	client := imds.NewFromConfig(cfg)
	return client, nil
}

// newSDKEndpoint is a variable such that the implementation
// can be swapped out for unit tests.
var newSDKEndpoint = credentialsEndpoint

// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when region is otherwise unspecified.
func newAWSLogsClient(info logger.Info, configOpts ...func(*config.LoadOptions) error) (*cloudwatchlogs.Client, error) {
	ctx := context.TODO()
	var region, endpoint *string
	if os.Getenv(regionEnvKey) != "" {
		region = aws.String(os.Getenv(regionEnvKey))
	}
	if info.Config[regionKey] != "" {
		region = aws.String(info.Config[regionKey])
	}
	if info.Config[endpointKey] != "" {
		endpoint = aws.String(info.Config[endpointKey])
	}
	if region == nil || *region == "" {
		log.G(ctx).Info("Trying to get region from IMDS")
		regFinder, err := newRegionFinder(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("could not create regionFinder")
			return nil, errors.Wrap(err, "could not create regionFinder")
		}

		r, err := regFinder.GetRegion(ctx, &imds.GetRegionInput{})
		if err != nil {
			log.G(ctx).WithError(err).Error("Could not get region from IMDS, environment, or log option")
			return nil, errors.Wrap(err, "cannot determine region for awslogs driver")
		}
		region = &r.Region
	}

	configOpts = append(configOpts, config.WithRegion(*region))

	if uri, ok := info.Config[credentialsEndpointKey]; ok {
		log.G(ctx).Debug("Trying to get credentials from awslogs-credentials-endpoint")
		credsEndpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
		configOpts = append(configOpts, config.WithCredentialsProvider(endpointcreds.New(credsEndpoint)))
	}

	cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
	if err != nil {
		log.G(ctx).WithError(err).Error("Could not initialize AWS SDK config")
		return nil, errors.Wrap(err, "could not initialize AWS SDK config")
	}

	log.G(ctx).WithFields(log.Fields{
		"region": *region,
	}).Debug("Created awslogs client")

	var clientOpts []func(*cloudwatchlogs.Options)

	if info.Config[logFormatKey] != "" {
		logFormatMiddleware := smithymiddleware.BuildMiddlewareFunc("logFormat", func(
			ctx context.Context, in smithymiddleware.BuildInput, next smithymiddleware.BuildHandler,
		) (
			out smithymiddleware.BuildOutput, metadata smithymiddleware.Metadata, err error,
		) {
			// Add the EMF format header to every request built by this client.
			switch v := in.Request.(type) {
			case *smithyhttp.Request:
				v.Header.Add(logsFormatHeader, jsonEmfLogFormat)
			}
			return next.HandleBuild(ctx, in)
		})
		clientOpts = append(
			clientOpts,
			cloudwatchlogs.WithAPIOptions(func(stack *smithymiddleware.Stack) error {
				return stack.Build.Add(logFormatMiddleware, smithymiddleware.Before)
			}),
		)
	}

	clientOpts = append(
		clientOpts,
		cloudwatchlogs.WithAPIOptions(middleware.AddUserAgentKeyValue("Docker", dockerversion.Version)),
	)

	if endpoint != nil {
		clientOpts = append(clientOpts, cloudwatchlogs.WithEndpointResolver(cloudwatchlogs.EndpointResolverFromURL(*endpoint)))
	}

	client := cloudwatchlogs.NewFromConfig(cfg, clientOpts...)

	return client, nil
}

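// Region resolution precedence in newAWSLogsClient, highest first: the
// awslogs-region log option, then the AWS_REGION environment variable, then
// the EC2 Instance Metadata Service. For illustration (values invented):
//
//	info := logger.Info{Config: map[string]string{regionKey: "eu-west-1"}}
//	client, err := newAWSLogsClient(info) // eu-west-1 wins even if AWS_REGION is set
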
// Name returns the name of the awslogs logging driver
func (l *logStream) Name() string {
	return name
}

// BufSize returns the maximum bytes per event that CloudWatch Logs accepts
// from this driver.
func (l *logStream) BufSize() int {
	return maximumBytesPerEvent
}

// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.closed {
		return errors.New("awslogs is closed")
	}
	l.messages <- msg
	return nil
}

// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
	l.lock.Lock()
	defer l.lock.Unlock()
	if !l.closed {
		close(l.messages)
	}
	l.closed = true
	return nil
}

// create creates the log group and log stream for the instance of the awslogs
// logging driver
func (l *logStream) create() error {
	err := l.createLogStream()
	if err == nil {
		return nil
	}

	var apiErr *types.ResourceNotFoundException
	if errors.As(err, &apiErr) && l.logCreateGroup {
		if err := l.createLogGroup(); err != nil {
			return errors.Wrap(err, "failed to create CloudWatch log group")
		}
		err = l.createLogStream()
		if err == nil {
			return nil
		}
	}
	return errors.Wrap(err, "failed to create CloudWatch log stream")
}

// createLogGroup creates a log group for the instance of the awslogs logging driver
func (l *logStream) createLogGroup() error {
	if _, err := l.client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{
		LogGroupName: aws.String(l.logGroupName),
	}); err != nil {
		var apiErr smithy.APIError
		if errors.As(err, &apiErr) {
			fields := log.Fields{
				"errorCode":      apiErr.ErrorCode(),
				"message":        apiErr.ErrorMessage(),
				"logGroupName":   l.logGroupName,
				"logCreateGroup": l.logCreateGroup,
			}
			if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
				// Allow creation to succeed
				log.G(context.TODO()).WithFields(fields).Info("Log group already exists")
				return nil
			}
			log.G(context.TODO()).WithFields(fields).Error("Failed to create log group")
		}
		return err
	}
	return nil
}

// createLogStream creates a log stream for the instance of the awslogs logging driver
func (l *logStream) createLogStream() error {
	// Return early if the driver is not configured to create the log stream.
	if !l.logCreateStream {
		log.G(context.TODO()).WithFields(log.Fields{
			"logGroupName":    l.logGroupName,
			"logStreamName":   l.logStreamName,
			"logCreateStream": l.logCreateStream,
		}).Info("Skipping creating log stream")
		return nil
	}

	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}

	_, err := l.client.CreateLogStream(context.TODO(), input)
	if err != nil {
		var apiErr smithy.APIError
		if errors.As(err, &apiErr) {
			fields := log.Fields{
				"errorCode":     apiErr.ErrorCode(),
				"message":       apiErr.ErrorMessage(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}
			if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
				// Allow creation to succeed
				log.G(context.TODO()).WithFields(fields).Info("Log stream already exists")
				return nil
			}
			log.G(context.TODO()).WithFields(fields).Error("Failed to create log stream")
		}
	}
	return err
}

// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
	return time.NewTicker(freq)
}

// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. If the awslogs-multiline-pattern or
// awslogs-datetime-format options have been configured, multiline processing
// is enabled, where log messages are stored in an event buffer until a multiline
// pattern match is found, at which point the messages in the event buffer are
// pushed to CloudWatch Logs as a single log event. Multiline messages are
// buffered subject to the maximumBytesPerEvent constraint, and the
// implementation only allows for messages to be buffered for a maximum of
// 2*l.forceFlushInterval. If no forceFlushInterval is specified for the log
// stream, the default of 5 seconds is used, resulting in a maximum buffer time
// of 10 seconds for multiline messages. When events are ready to be processed
// for submission to CloudWatch Logs, the processEvent method is called. If a
// multiline pattern is not configured, log events are submitted to the
// processEvent method immediately.
func (l *logStream) collectBatch(created chan bool) {
	// Wait for the logstream/group to be created
	<-created
	flushInterval := l.forceFlushInterval
	if flushInterval <= 0 {
		flushInterval = defaultForceFlushInterval
	}
	ticker := newTicker(flushInterval)
	defer ticker.Stop()
	var eventBuffer []byte
	var eventBufferTimestamp int64
	batch := newEventBatch()
	for {
		select {
		case t := <-ticker.C:
			// If the event buffer is older than the flush interval, flush it.
			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
				eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
				eventBufferNegative := eventBufferAge < 0
				if eventBufferExpired || eventBufferNegative {
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBuffer = eventBuffer[:0]
				}
			}
			l.publishBatch(batch)
			batch.reset()
		case msg, more := <-l.messages:
			if !more {
				// Flush event buffer and release resources
				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
				l.publishBatch(batch)
				batch.reset()
				return
			}
			if eventBufferTimestamp == 0 {
				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
			}
			line := msg.Line
			if l.multilinePattern != nil {
				lineEffectiveLen := effectiveLen(string(line))
				if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
					// This is a new log event or we will exceed max bytes per event
					// so flush the current eventBuffer to events and reset timestamp
					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
					eventBuffer = eventBuffer[:0]
				}
				// Append newline if event is less than max event size
				if lineEffectiveLen < maximumBytesPerEvent {
					line = append(line, "\n"...)
				}
				eventBuffer = append(eventBuffer, line...)
				logger.PutMessage(msg)
			} else {
				l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
				logger.PutMessage(msg)
			}
		}
	}
}

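// As an illustration of the multiline path above, suppose
// awslogs-multiline-pattern is set to ^\[ and three messages arrive:
//
//	[2024-01-02 15:04:05] request failed
//	    caused by: connection reset
//	[2024-01-02 15:04:06] retrying
//
// The first two lines are joined into a single CloudWatch event (the second
// does not match the pattern), and the third line starts a new buffered event,
// which is flushed by a later ticker tick, a full buffer, or stream close.
// Each event's timestamp comes from its first buffered message.
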
// processEvent processes log events that are ready for submission to CloudWatch
// Logs. Batching is performed on both a time and a size basis. Time-based
// batching occurs at the interval defined by awslogs-force-flush-interval-seconds
// (defaults to 5 seconds). Size-based batching is performed on the maximum
// number of events per batch (defined in maximumLogEventsPerPut) and the
// maximum number of total bytes in a batch (defined in maximumBytesPerPut).
// Log messages are split by the maximum bytes per event (defined in
// maximumBytesPerEvent). There is a fixed per-event byte overhead (defined in
// perEventBytes) which is accounted for in split- and batch-calculations.
// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8
// byte sequences are replaced with the Unicode replacement character (U+FFFD),
// which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
// splitting valid UTF-8 characters into invalid byte sequences, we calculate
// the length of each event assuming that this replacement happens.
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
	for len(bytes) > 0 {
		// Split line length so it does not exceed the maximum
		splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
		line := bytes[:splitOffset]
		event := wrappedEvent{
			inputLogEvent: types.InputLogEvent{
				Message:   aws.String(string(line)),
				Timestamp: aws.Int64(timestamp),
			},
			insertOrder: batch.count(),
		}

		added := batch.add(event, lineBytes)
		if added {
			bytes = bytes[splitOffset:]
		} else {
			l.publishBatch(batch)
			batch.reset()
		}
	}
}

// effectiveLen counts the effective number of bytes in the string, after
// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do
// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode
// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as
// utf8.RuneError)
func effectiveLen(line string) int {
	effectiveBytes := 0
	for _, r := range line {
		effectiveBytes += utf8.RuneLen(r)
	}
	return effectiveBytes
}

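// For example, effectiveLen("a\xffb") is 5: the invalid byte 0xFF is counted
// as the 3-byte UTF-8 encoding of U+FFFD, plus one byte each for 'a' and 'b'.
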
// findValidSplit finds the byte offset to split a string without breaking valid
// Unicode codepoints given a maximum number of total bytes. findValidSplit
// returns the byte offset for splitting a string or []byte, as well as the
// effective number of bytes if the string were normalized to replace invalid
// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8
// sequence, represented in Go as utf8.RuneError)
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for offset, r := range line {
		splitOffset = offset
		if effectiveBytes+utf8.RuneLen(r) > maxBytes {
			return splitOffset, effectiveBytes
		}
		effectiveBytes += utf8.RuneLen(r)
	}
	splitOffset = len(line)
	return splitOffset, effectiveBytes
}

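// For example, findValidSplit("héllo", 3) returns (3, 3): 'h' is one byte and
// 'é' is two, so the split lands after 'é' rather than in the middle of its
// two-byte encoding.
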
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(batch *eventBatch) {
	if batch.isEmpty() {
		return
	}
	cwEvents := unwrapEvents(batch.events())

	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
	if err != nil {
		if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
			// already submitted, just grab the correct sequence token
			nextSequenceToken = apiErr.ExpectedSequenceToken
			log.G(context.TODO()).WithFields(log.Fields{
				"errorCode":     apiErr.ErrorCode(),
				"message":       apiErr.ErrorMessage(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Info("Data already accepted, ignoring error")
			err = nil
		} else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
			nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
		}
	}
	if err != nil {
		log.G(context.TODO()).Error(err)
	} else {
		l.sequenceToken = nextSequenceToken
	}
}

// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []types.InputLogEvent, sequenceToken *string) (*string, error) {
	input := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		SequenceToken: sequenceToken,
		LogGroupName:  aws.String(l.logGroupName),
		LogStreamName: aws.String(l.logStreamName),
	}
	resp, err := l.client.PutLogEvents(context.TODO(), input)
	if err != nil {
		var apiErr smithy.APIError
		if errors.As(err, &apiErr) {
			log.G(context.TODO()).WithFields(log.Fields{
				"errorCode":     apiErr.ErrorCode(),
				"message":       apiErr.ErrorMessage(),
				"logGroupName":  l.logGroupName,
				"logStreamName": l.logStreamName,
			}).Error("Failed to put log events")
		}
		return nil, err
	}
	return resp.NextSequenceToken, nil
}

// ValidateLogOpt looks for the awslogs-specific log options awslogs-region,
// awslogs-endpoint, awslogs-group, awslogs-stream, awslogs-create-group,
// awslogs-create-stream, awslogs-datetime-format, awslogs-multiline-pattern,
// awslogs-credentials-endpoint, awslogs-force-flush-interval-seconds,
// awslogs-max-buffered-events, and awslogs-format.
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case logGroupKey:
		case logStreamKey:
		case logCreateGroupKey:
		case logCreateStreamKey:
		case regionKey:
		case endpointKey:
		case tagKey:
		case datetimeFormatKey:
		case multilinePatternKey:
		case credentialsEndpointKey:
		case forceFlushIntervalKey:
		case maxBufferedEventsKey:
		case logFormatKey:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[logGroupKey] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
	}
	if cfg[logCreateGroupKey] != "" {
		if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
			return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
		}
	}
	if cfg[forceFlushIntervalKey] != "" {
		if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
		}
	}
	if cfg[maxBufferedEventsKey] != "" {
		if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
			return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
		}
	}
	_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
	_, multilinePatternKeyExists := cfg[multilinePatternKey]
	if datetimeFormatKeyExists && multilinePatternKeyExists {
		return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
	}

	if cfg[logFormatKey] != "" {
		// For now, only the "json/emf" log format is supported
		if cfg[logFormatKey] != jsonEmfLogFormat {
			return fmt.Errorf("unsupported log format '%s'", cfg[logFormatKey])
		}
		if datetimeFormatKeyExists || multilinePatternKeyExists {
			return fmt.Errorf("you cannot configure log opt '%s' or '%s' when log opt '%s' is set to '%s'", datetimeFormatKey, multilinePatternKey, logFormatKey, jsonEmfLogFormat)
		}
	}

	return nil
}

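// A minimal sketch of validating options before use (the map values are
// illustrative):
//
//	opts := map[string]string{
//		logGroupKey:       "my-log-group",
//		logCreateGroupKey: "true",
//		datetimeFormatKey: "%Y-%m-%d",
//	}
//	if err := ValidateLogOpt(opts); err != nil {
//		// e.g. unknown option, missing group, or conflicting
//		// multiline/datetime/format settings
//	}
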
// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
	return len(slice)
}

// Less compares two values in a byTimestamp slice by Timestamp. Less is
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
	iTimestamp, jTimestamp := int64(0), int64(0)
	if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
		iTimestamp = *slice[i].inputLogEvent.Timestamp
	}
	if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
		jTimestamp = *slice[j].inputLogEvent.Timestamp
	}
	if iTimestamp == jTimestamp {
		return slice[i].insertOrder < slice[j].insertOrder
	}
	return iTimestamp < jTimestamp
}

// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}

func unwrapEvents(events []wrappedEvent) []types.InputLogEvent {
	cwEvents := make([]types.InputLogEvent, len(events))
	for i, input := range events {
		cwEvents[i] = input.inputLogEvent
	}
	return cwEvents
}

func newEventBatch() *eventBatch {
	return &eventBatch{
		batch: make([]wrappedEvent, 0),
		bytes: 0,
	}
}

// events returns a slice of wrappedEvents sorted in order of their
// timestamps and then by their insertion order (see `byTimestamp`).
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) events() []wrappedEvent {
	sort.Sort(byTimestamp(b.batch))
	return b.batch
}

// add adds an event to the batch of events, accounting for the
// necessary overhead for an event to be logged. It reports whether the
// event was added; false is returned if adding the event would exceed
// the service limits for a batch.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) add(event wrappedEvent, size int) bool {
	addBytes := size + perEventBytes

	// verify we are still within service limits
	switch {
	case len(b.batch)+1 > maximumLogEventsPerPut:
		return false
	case b.bytes+addBytes > maximumBytesPerPut:
		return false
	}

	b.bytes += addBytes
	b.batch = append(b.batch, event)

	return true
}

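// For example, a maximum-size event charges maximumBytesPerEvent+perEventBytes
// = 262144 bytes against the batch, so exactly four such events fill the
// 1048576-byte PutLogEvents limit; a fifth add returns false, and the caller
// (processEvent) must publish and reset the batch before retrying.
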
// count is the number of batched events.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) count() int {
	return len(b.batch)
}

// size is the total number of bytes that the batch represents.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) size() int {
	return b.bytes
}

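// isEmpty reports whether the batch holds no events and no bytes.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.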
func (b *eventBatch) isEmpty() bool {
	zeroEvents := b.count() == 0
	zeroSize := b.size() == 0
	return zeroEvents && zeroSize
}

// reset prepares the batch for reuse.
func (b *eventBatch) reset() {
	b.bytes = 0
	b.batch = b.batch[:0]
}