Merge pull request #29504 from yongtang/29334-awslogs-CreateLogGroup
Support of CreateLogGroup for awslogs
This commit is contained in:
commit
8820d0aec0
5 changed files with 138 additions and 16 deletions
|
@ -713,7 +713,7 @@ __docker_complete_log_drivers() {
|
|||
|
||||
__docker_complete_log_options() {
|
||||
# see docs/reference/logging/index.md
|
||||
local awslogs_options="awslogs-region awslogs-group awslogs-stream"
|
||||
local awslogs_options="awslogs-region awslogs-group awslogs-stream awslogs-create-group"
|
||||
local fluentd_options="env fluentd-address fluentd-async-connect fluentd-buffer-limit fluentd-retry-wait fluentd-max-retries labels tag"
|
||||
local gcplogs_options="env gcp-log-cmd gcp-project labels"
|
||||
local gelf_options="env gelf-address gelf-compression-level gelf-compression-type labels tag"
|
||||
|
|
|
@ -223,7 +223,7 @@ __docker_get_log_options() {
|
|||
local log_driver=${opt_args[--log-driver]:-"all"}
|
||||
local -a awslogs_options fluentd_options gelf_options journald_options json_file_options logentries_options syslog_options splunk_options
|
||||
|
||||
awslogs_options=("awslogs-region" "awslogs-group" "awslogs-stream")
|
||||
awslogs_options=("awslogs-region" "awslogs-group" "awslogs-stream" "awslogs-create-group")
|
||||
fluentd_options=("env" "fluentd-address" "fluentd-async-connect" "fluentd-buffer-limit" "fluentd-retry-wait" "fluentd-max-retries" "labels" "tag")
|
||||
gcplogs_options=("env" "gcp-log-cmd" "gcp-project" "labels")
|
||||
gelf_options=("env" "gelf-address" "gelf-compression-level" "gelf-compression-type" "labels" "tag")
|
||||
|
|
|
@ -2,11 +2,13 @@
|
|||
package awslogs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -21,6 +23,7 @@ import (
|
|||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/loggerutils"
|
||||
"github.com/docker/docker/dockerversion"
|
||||
"github.com/docker/docker/pkg/templates"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -29,6 +32,7 @@ const (
|
|||
regionEnvKey = "AWS_REGION"
|
||||
logGroupKey = "awslogs-group"
|
||||
logStreamKey = "awslogs-stream"
|
||||
logCreateGroupKey = "awslogs-create-group"
|
||||
tagKey = "tag"
|
||||
batchPublishFrequency = 5 * time.Second
|
||||
|
||||
|
@ -43,21 +47,24 @@ const (
|
|||
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
|
||||
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
|
||||
invalidSequenceTokenCode = "InvalidSequenceTokenException"
|
||||
resourceNotFoundCode = "ResourceNotFoundException"
|
||||
|
||||
userAgentHeader = "User-Agent"
|
||||
)
|
||||
|
||||
type logStream struct {
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
client api
|
||||
messages chan *logger.Message
|
||||
lock sync.RWMutex
|
||||
closed bool
|
||||
sequenceToken *string
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
logCreateGroup bool
|
||||
client api
|
||||
messages chan *logger.Message
|
||||
lock sync.RWMutex
|
||||
closed bool
|
||||
sequenceToken *string
|
||||
}
|
||||
|
||||
type api interface {
|
||||
CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
|
||||
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
||||
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
|
||||
}
|
||||
|
@ -84,7 +91,7 @@ func init() {
|
|||
|
||||
// New creates an awslogs logger using the configuration passed in on the
|
||||
// context. Supported context configuration variables are awslogs-region,
|
||||
// awslogs-group, and awslogs-stream. When available, configuration is
|
||||
// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is
|
||||
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
||||
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
||||
// the EC2 Instance Metadata Service.
|
||||
|
@ -94,6 +101,13 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logCreateGroup := false
|
||||
if info.Config[logCreateGroupKey] != "" {
|
||||
logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if info.Config[logStreamKey] != "" {
|
||||
logStreamName = info.Config[logStreamKey]
|
||||
|
@ -103,10 +117,11 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
return nil, err
|
||||
}
|
||||
containerStream := &logStream{
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, 4096),
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
logCreateGroup: logCreateGroup,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, 4096),
|
||||
}
|
||||
err = containerStream.create()
|
||||
if err != nil {
|
||||
|
@ -117,6 +132,19 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
return containerStream, nil
|
||||
}
|
||||
|
||||
func parseLogGroup(info logger.Info, groupTemplate string) (string, error) {
|
||||
tmpl, err := templates.NewParse("log-group", groupTemplate)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
if err := tmpl.Execute(buf, &info); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// newRegionFinder is a variable such that the implementation
|
||||
// can be swapped out for unit tests.
|
||||
var newRegionFinder = func() regionFinder {
|
||||
|
@ -192,8 +220,50 @@ func (l *logStream) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// create creates a log stream for the instance of the awslogs logging driver
|
||||
// create creates log group and log stream for the instance of the awslogs logging driver
|
||||
func (l *logStream) create() error {
|
||||
if err := l.createLogStream(); err != nil {
|
||||
if l.logCreateGroup {
|
||||
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
|
||||
if err := l.createLogGroup(); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.createLogStream()
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createLogGroup creates a log group for the instance of the awslogs logging driver
|
||||
func (l *logStream) createLogGroup() error {
|
||||
if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||
LogGroupName: aws.String(l.logGroupName),
|
||||
}); err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
fields := logrus.Fields{
|
||||
"errorCode": awsErr.Code(),
|
||||
"message": awsErr.Message(),
|
||||
"origError": awsErr.OrigErr(),
|
||||
"logGroupName": l.logGroupName,
|
||||
"logCreateGroup": l.logCreateGroup,
|
||||
}
|
||||
if awsErr.Code() == resourceAlreadyExistsCode {
|
||||
// Allow creation to succeed
|
||||
logrus.WithFields(fields).Info("Log group already exists")
|
||||
return nil
|
||||
}
|
||||
logrus.WithFields(fields).Error("Failed to create log group")
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createLogStream creates a log stream for the instance of the awslogs logging driver
|
||||
func (l *logStream) createLogStream() error {
|
||||
input := &cloudwatchlogs.CreateLogStreamInput{
|
||||
LogGroupName: aws.String(l.logGroupName),
|
||||
LogStreamName: aws.String(l.logStreamName),
|
||||
|
@ -349,12 +419,13 @@ func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenc
|
|||
}
|
||||
|
||||
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
|
||||
// awslogs-group, and awslogs-stream
|
||||
// awslogs-group, awslogs-stream, awslogs-create-group
|
||||
func ValidateLogOpt(cfg map[string]string) error {
|
||||
for key := range cfg {
|
||||
switch key {
|
||||
case logGroupKey:
|
||||
case logStreamKey:
|
||||
case logCreateGroupKey:
|
||||
case regionKey:
|
||||
case tagKey:
|
||||
default:
|
||||
|
@ -364,6 +435,11 @@ func ValidateLogOpt(cfg map[string]string) error {
|
|||
if cfg[logGroupKey] == "" {
|
||||
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
||||
}
|
||||
if cfg[logCreateGroupKey] != "" {
|
||||
if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
|
||||
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -106,6 +106,37 @@ func TestCreateSuccess(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCreateLogGroupSuccess(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
client: mockClient,
|
||||
logGroupName: groupName,
|
||||
logStreamName: streamName,
|
||||
logCreateGroup: true,
|
||||
}
|
||||
mockClient.createLogGroupResult <- &createLogGroupResult{}
|
||||
mockClient.createLogStreamResult <- &createLogStreamResult{}
|
||||
|
||||
err := stream.create()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Received unexpected err: %v\n", err)
|
||||
}
|
||||
argument := <-mockClient.createLogStreamArgument
|
||||
if argument.LogGroupName == nil {
|
||||
t.Fatal("Expected non-nil LogGroupName")
|
||||
}
|
||||
if *argument.LogGroupName != groupName {
|
||||
t.Errorf("Expected LogGroupName to be %s", groupName)
|
||||
}
|
||||
if argument.LogStreamName == nil {
|
||||
t.Fatal("Expected non-nil LogStreamName")
|
||||
}
|
||||
if *argument.LogStreamName != streamName {
|
||||
t.Errorf("Expected LogStreamName to be %s", streamName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateError(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
stream := &logStream{
|
||||
|
|
|
@ -3,12 +3,19 @@ package awslogs
|
|||
import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||
|
||||
type mockcwlogsclient struct {
|
||||
createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput
|
||||
createLogGroupResult chan *createLogGroupResult
|
||||
createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput
|
||||
createLogStreamResult chan *createLogStreamResult
|
||||
putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput
|
||||
putLogEventsResult chan *putLogEventsResult
|
||||
}
|
||||
|
||||
type createLogGroupResult struct {
|
||||
successResult *cloudwatchlogs.CreateLogGroupOutput
|
||||
errorResult error
|
||||
}
|
||||
|
||||
type createLogStreamResult struct {
|
||||
successResult *cloudwatchlogs.CreateLogStreamOutput
|
||||
errorResult error
|
||||
|
@ -21,6 +28,8 @@ type putLogEventsResult struct {
|
|||
|
||||
func newMockClient() *mockcwlogsclient {
|
||||
return &mockcwlogsclient{
|
||||
createLogGroupArgument: make(chan *cloudwatchlogs.CreateLogGroupInput, 1),
|
||||
createLogGroupResult: make(chan *createLogGroupResult, 1),
|
||||
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
|
||||
createLogStreamResult: make(chan *createLogStreamResult, 1),
|
||||
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1),
|
||||
|
@ -37,6 +46,12 @@ func newMockClientBuffered(buflen int) *mockcwlogsclient {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *mockcwlogsclient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
|
||||
m.createLogGroupArgument <- input
|
||||
output := <-m.createLogGroupResult
|
||||
return output.successResult, output.errorResult
|
||||
}
|
||||
|
||||
func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
|
||||
m.createLogStreamArgument <- input
|
||||
output := <-m.createLogStreamResult
|
||||
|
|
Loading…
Reference in a new issue