diff --git a/contrib/completion/bash/docker b/contrib/completion/bash/docker index baef6b90ed..0a93395a75 100644 --- a/contrib/completion/bash/docker +++ b/contrib/completion/bash/docker @@ -520,7 +520,7 @@ __docker_complete_log_options() { local journald_options="env labels tag" local json_file_options="env labels max-file max-size" local syslog_options="env labels syslog-address syslog-facility syslog-format syslog-tls-ca-cert syslog-tls-cert syslog-tls-key syslog-tls-skip-verify tag" - local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag" + local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-gzip splunk-gzip-level splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag" local all_options="$fluentd_options $gcplogs_options $gelf_options $journald_options $json_file_options $syslog_options $splunk_options" @@ -629,7 +629,7 @@ __docker_complete_log_driver_options() { __ltrim_colon_completions "${cur}" return ;; - splunk-insecureskipverify|splunk-verify-connection) + splunk-gzip|splunk-insecureskipverify|splunk-verify-connection) COMPREPLY=( $( compgen -W "false true" -- "${cur##*=}" ) ) return ;; diff --git a/contrib/completion/zsh/_docker b/contrib/completion/zsh/_docker index 8224c729f8..13962eadc0 100644 --- a/contrib/completion/zsh/_docker +++ b/contrib/completion/zsh/_docker @@ -228,7 +228,7 @@ __docker_get_log_options() { journald_options=("env" "labels" "tag") json_file_options=("env" "labels" "max-file" "max-size") syslog_options=("env" "labels" "syslog-address" "syslog-facility" "syslog-format" "syslog-tls-ca-cert" "syslog-tls-cert" "syslog-tls-key" "syslog-tls-skip-verify" "tag") - splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag") + splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-gzip" "splunk-gzip-level" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag") [[ $log_driver = (awslogs|all) ]] && _describe -t awslogs-options "awslogs options" awslogs_options "$@" && ret=0 [[ $log_driver = (fluentd|all) ]] && _describe -t fluentd-options "fluentd options" fluentd_options "$@" && ret=0 diff --git a/daemon/logger/splunk/splunk.go b/daemon/logger/splunk/splunk.go index 15bff45110..f85832681a 100644 --- a/daemon/logger/splunk/splunk.go +++ b/daemon/logger/splunk/splunk.go @@ -4,6 +4,7 @@ package splunk import ( "bytes" + "compress/gzip" "crypto/tls" "crypto/x509" "encoding/json" @@ -12,7 +13,9 @@ import ( "io/ioutil" "net/http" "net/url" + "os" "strconv" + "sync" "time" "github.com/Sirupsen/logrus" @@ -22,22 +25,47 @@ import ( ) const ( - driverName = "splunk" - splunkURLKey = "splunk-url" - splunkTokenKey = "splunk-token" - splunkSourceKey = "splunk-source" - splunkSourceTypeKey = "splunk-sourcetype" - splunkIndexKey = "splunk-index" - splunkCAPathKey = "splunk-capath" - splunkCANameKey = "splunk-caname" - splunkInsecureSkipVerifyKey = "splunk-insecureskipverify" - splunkFormatKey = "splunk-format" - splunkVerifyConnectionKey = "splunk-verify-connection" - envKey = "env" - labelsKey = "labels" - tagKey = "tag" + driverName = "splunk" + splunkURLKey = "splunk-url" + splunkTokenKey = "splunk-token" + splunkSourceKey = "splunk-source" + splunkSourceTypeKey = "splunk-sourcetype" + splunkIndexKey = "splunk-index" + splunkCAPathKey = "splunk-capath" + splunkCANameKey = "splunk-caname" + splunkInsecureSkipVerifyKey = "splunk-insecureskipverify" + splunkFormatKey = "splunk-format" + splunkVerifyConnectionKey = "splunk-verify-connection" + splunkGzipCompressionKey = "splunk-gzip" + splunkGzipCompressionLevelKey = "splunk-gzip-level" + envKey = "env" + labelsKey = "labels" + tagKey = "tag" ) +const ( + // How often do we send messages (if we are not reaching batch size) + defaultPostMessagesFrequency = 5 * time.Second + // How big can be batch of messages + defaultPostMessagesBatchSize = 1000 + // Maximum number of messages we can store in buffer + defaultBufferMaximum = 10 * defaultPostMessagesBatchSize + // Number of messages allowed to be queued in the channel + defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize +) + +const ( + envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY" + envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE" + envVarBufferMaximum = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX" + envVarStreamChannelSize = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE" +) + +type splunkLoggerInterface interface { + logger.Logger + worker() +} + type splunkLogger struct { client *http.Client transport *http.Transport @@ -45,6 +73,23 @@ type splunkLogger struct { url string auth string nullMessage *splunkMessage + + // http compression + gzipCompression bool + gzipCompressionLevel int + + // Advanced options + postMessagesFrequency time.Duration + postMessagesBatchSize int + bufferMaximum int + + // For synchronization between background worker and logger. + // We use channel to send messages to worker go routine. + // All other variables for blocking Close call before we flush all messages to HEC + stream chan *splunkMessage + lock sync.RWMutex + closed bool + closedCond *sync.Cond } type splunkLoggerInline struct { @@ -140,6 +185,29 @@ func New(ctx logger.Context) (logger.Logger, error) { tlsConfig.ServerName = caName } + gzipCompression := false + if gzipCompressionStr, ok := ctx.Config[splunkGzipCompressionKey]; ok { + gzipCompression, err = strconv.ParseBool(gzipCompressionStr) + if err != nil { + return nil, err + } + } + + gzipCompressionLevel := gzip.DefaultCompression + if gzipCompressionLevelStr, ok := ctx.Config[splunkGzipCompressionLevelKey]; ok { + var err error + gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32) + if err != nil { + return nil, err + } + gzipCompressionLevel = int(gzipCompressionLevel64) + if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression { + err := fmt.Errorf("Not supported level '%s' for %s (supported values between %d and %d).", + gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression) + return nil, err + } + } + transport := &http.Transport{ TLSClientConfig: tlsConfig, } @@ -158,19 +226,36 @@ func New(ctx logger.Context) (logger.Logger, error) { Index: index, } - tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate) - if err != nil { - return nil, err + // Allow user to remove tag from the messages by setting tag to empty string + tag := "" + if tagTemplate, ok := ctx.Config[tagKey]; !ok || tagTemplate != "" { + tag, err = loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate) + if err != nil { + return nil, err + } } attrs := ctx.ExtraAttributes(nil) + var ( + postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency) + postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize) + bufferMaximum = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum) + streamChannelSize = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize) + ) + logger := &splunkLogger{ - client: client, - transport: transport, - url: splunkURL.String(), - auth: "Splunk " + splunkToken, - nullMessage: nullMessage, + client: client, + transport: transport, + url: splunkURL.String(), + auth: "Splunk " + splunkToken, + nullMessage: nullMessage, + gzipCompression: gzipCompression, + gzipCompressionLevel: gzipCompressionLevel, + stream: make(chan *splunkMessage, streamChannelSize), + postMessagesFrequency: postMessagesFrequency, + postMessagesBatchSize: postMessagesBatchSize, + bufferMaximum: bufferMaximum, } // By default we verify connection, but we allow use to skip that @@ -203,6 +288,8 @@ func New(ctx logger.Context) (logger.Logger, error) { splunkFormat = splunkFormatInline } + var loggerWrapper splunkLoggerInterface + switch splunkFormat { case splunkFormatInline: nullEvent := &splunkMessageEvent{ @@ -210,18 +297,20 @@ func New(ctx logger.Context) (logger.Logger, error) { Attrs: attrs, } - return &splunkLoggerInline{logger, nullEvent}, nil + loggerWrapper = &splunkLoggerInline{logger, nullEvent} case splunkFormatJSON: nullEvent := &splunkMessageEvent{ Tag: tag, Attrs: attrs, } - return &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}, nil + loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}} case splunkFormatRaw: var prefix bytes.Buffer - prefix.WriteString(tag) - prefix.WriteString(" ") + if tag != "" { + prefix.WriteString(tag) + prefix.WriteString(" ") + } for key, value := range attrs { prefix.WriteString(key) prefix.WriteString("=") @@ -229,10 +318,14 @@ func New(ctx logger.Context) (logger.Logger, error) { prefix.WriteString(" ") } - return &splunkLoggerRaw{logger, prefix.Bytes()}, nil + loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()} default: return nil, fmt.Errorf("Unexpected format %s", splunkFormat) } + + go loggerWrapper.worker() + + return loggerWrapper, nil } func (l *splunkLoggerInline) Log(msg *logger.Message) error { @@ -244,7 +337,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error { message.Event = &event - return l.postMessage(message) + return l.queueMessageAsync(message) } func (l *splunkLoggerJSON) Log(msg *logger.Message) error { @@ -262,7 +355,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error { message.Event = &event - return l.postMessage(message) + return l.queueMessageAsync(message) } func (l *splunkLoggerRaw) Log(msg *logger.Message) error { @@ -270,19 +363,124 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error { message.Event = string(append(l.prefix, msg.Line...)) - return l.postMessage(message) + return l.queueMessageAsync(message) } -func (l *splunkLogger) postMessage(message *splunkMessage) error { - jsonEvent, err := json.Marshal(message) - if err != nil { - return err +func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error { + l.lock.RLock() + defer l.lock.RUnlock() + if l.closedCond != nil { + return fmt.Errorf("%s: driver is closed", driverName) } - req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(jsonEvent)) + l.stream <- message + return nil +} + +func (l *splunkLogger) worker() { + timer := time.NewTicker(l.postMessagesFrequency) + var messages []*splunkMessage + for { + select { + case message, open := <-l.stream: + if !open { + l.postMessages(messages, true) + l.lock.Lock() + defer l.lock.Unlock() + l.transport.CloseIdleConnections() + l.closed = true + l.closedCond.Signal() + return + } + messages = append(messages, message) + // Only sending when we get exactly to the batch size, + // This also helps not to fire postMessages on every new message, + // when previous try failed. + if len(messages)%l.postMessagesBatchSize == 0 { + messages = l.postMessages(messages, false) + } + case <-timer.C: + messages = l.postMessages(messages, false) + } + } +} + +func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage { + messagesLen := len(messages) + for i := 0; i < messagesLen; i += l.postMessagesBatchSize { + upperBound := i + l.postMessagesBatchSize + if upperBound > messagesLen { + upperBound = messagesLen + } + if err := l.tryPostMessages(messages[i:upperBound]); err != nil { + logrus.Error(err) + if messagesLen-i >= l.bufferMaximum || lastChance { + // If this is last chance - print them all to the daemon log + if lastChance { + upperBound = messagesLen + } + // Not all sent, but buffer has got to its maximum, let's log all messages + // we could not send and return buffer minus one batch size + for j := i; j < upperBound; j++ { + if jsonEvent, err := json.Marshal(messages[j]); err != nil { + logrus.Error(err) + } else { + logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent))) + } + } + return messages[upperBound:messagesLen] + } + // Not all sent, returning buffer from where we have not sent messages + return messages[i:messagesLen] + } + } + // All sent, return empty buffer + return messages[:0] +} + +func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error { + if len(messages) == 0 { + return nil + } + var buffer bytes.Buffer + var writer io.Writer + var gzipWriter *gzip.Writer + var err error + // If gzip compression is enabled - create gzip writer with specified compression + // level. If gzip compression is disabled, use standard buffer as a writer + if l.gzipCompression { + gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel) + if err != nil { + return err + } + writer = gzipWriter + } else { + writer = &buffer + } + for _, message := range messages { + jsonEvent, err := json.Marshal(message) + if err != nil { + return err + } + if _, err := writer.Write(jsonEvent); err != nil { + return err + } + } + // If gzip compression is enabled, tell it, that we are done + if l.gzipCompression { + err = gzipWriter.Close() + if err != nil { + return err + } + } + req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes())) if err != nil { return err } req.Header.Set("Authorization", l.auth) + // Tell if we are sending gzip compressed body + if l.gzipCompression { + req.Header.Set("Content-Encoding", "gzip") + } res, err := l.client.Do(req) if err != nil { return err @@ -301,7 +499,15 @@ func (l *splunkLogger) postMessage(message *splunkMessage) error { } func (l *splunkLogger) Close() error { - l.transport.CloseIdleConnections() + l.lock.Lock() + defer l.lock.Unlock() + if l.closedCond == nil { + l.closedCond = sync.NewCond(&l.lock) + close(l.stream) + for !l.closed { + l.closedCond.Wait() + } + } return nil } @@ -329,6 +535,8 @@ func ValidateLogOpt(cfg map[string]string) error { case splunkInsecureSkipVerifyKey: case splunkFormatKey: case splunkVerifyConnectionKey: + case splunkGzipCompressionKey: + case splunkGzipCompressionLevelKey: case envKey: case labelsKey: case tagKey: @@ -364,7 +572,7 @@ func parseURL(ctx logger.Context) (*url.URL, error) { } func verifySplunkConnection(l *splunkLogger) error { - req, err := http.NewRequest("OPTIONS", l.url, nil) + req, err := http.NewRequest(http.MethodOptions, l.url, nil) if err != nil { return err } @@ -385,3 +593,29 @@ func verifySplunkConnection(l *splunkLogger) error { } return nil } + +func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration { + valueStr := os.Getenv(envName) + if valueStr == "" { + return defaultValue + } + parsedValue, err := time.ParseDuration(valueStr) + if err != nil { + logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err)) + return defaultValue + } + return parsedValue +} + +func getAdvancedOptionInt(envName string, defaultValue int) int { + valueStr := os.Getenv(envName) + if valueStr == "" { + return defaultValue + } + parsedValue, err := strconv.ParseInt(valueStr, 10, 32) + if err != nil { + logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err)) + return defaultValue + } + return int(parsedValue) +} diff --git a/daemon/logger/splunk/splunk_test.go b/daemon/logger/splunk/splunk_test.go new file mode 100644 index 0000000000..df74cbad5f --- /dev/null +++ b/daemon/logger/splunk/splunk_test.go @@ -0,0 +1,1302 @@ +package splunk + +import ( + "compress/gzip" + "fmt" + "os" + "testing" + "time" + + "github.com/docker/docker/daemon/logger" +) + +// Validate options +func TestValidateLogOpt(t *testing.T) { + err := ValidateLogOpt(map[string]string{ + splunkURLKey: "http://127.0.0.1", + splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417", + splunkSourceKey: "mysource", + splunkSourceTypeKey: "mysourcetype", + splunkIndexKey: "myindex", + splunkCAPathKey: "/usr/cert.pem", + splunkCANameKey: "ca_name", + splunkInsecureSkipVerifyKey: "true", + splunkFormatKey: "json", + splunkVerifyConnectionKey: "true", + splunkGzipCompressionKey: "true", + splunkGzipCompressionLevelKey: "1", + envKey: "a", + labelsKey: "b", + tagKey: "c", + }) + if err != nil { + t.Fatal(err) + } + + err = ValidateLogOpt(map[string]string{ + "not-supported-option": "a", + }) + if err == nil { + t.Fatal("Expecting error on unsupported options") + } +} + +// Driver require user to specify required options +func TestNewMissedConfig(t *testing.T) { + ctx := logger.Context{ + Config: map[string]string{}, + } + _, err := New(ctx) + if err == nil { + t.Fatal("Logger driver should fail when no required parameters specified") + } +} + +// Driver require user to specify splunk-url +func TestNewMissedUrl(t *testing.T) { + ctx := logger.Context{ + Config: map[string]string{ + splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF", + }, + } + _, err := New(ctx) + if err.Error() != "splunk: splunk-url is expected" { + t.Fatal("Logger driver should fail when no required parameters specified") + } +} + +// Driver require user to specify splunk-token +func TestNewMissedToken(t *testing.T) { + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: "http://127.0.0.1:8088", + }, + } + _, err := New(ctx) + if err.Error() != "splunk: splunk-token is expected" { + t.Fatal("Logger driver should fail when no required parameters specified") + } +} + +// Test default settings +func TestDefault(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if loggerDriver.Name() != driverName { + t.Fatal("Unexpected logger driver name") + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "" || + splunkLoggerDriver.nullMessage.SourceType != "" || + splunkLoggerDriver.nullMessage.Index != "" || + splunkLoggerDriver.gzipCompression != false || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize { + t.Fatal("Found not default values setup in Splunk Logging Driver.") + } + + message1Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + t.Fatal(err) + } + message2Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 2 { + t.Fatal("Expected two messages") + } + + if *hec.gzipEnabled { + t.Fatal("Gzip should not be used") + } + + message1 := hec.messages[0] + if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) || + message1.Host != hostname || + message1.Source != "" || + message1.SourceType != "" || + message1.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message1) + } + + if event, err := message1.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != "{\"a\":\"b\"}" || + event["source"] != "stdout" || + event["tag"] != "containeriid" || + len(event) != 3 { + t.Fatalf("Unexpected event in message %v", event) + } + } + + message2 := hec.messages[1] + if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) || + message2.Host != hostname || + message2.Source != "" || + message2.SourceType != "" || + message2.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message2) + } + + if event, err := message2.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != "notajson" || + event["source"] != "stdout" || + event["tag"] != "containeriid" || + len(event) != 3 { + t.Fatalf("Unexpected event in message %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify inline format with a not default settings for most of options +func TestInlineFormatWithNonDefaultOptions(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkSourceKey: "mysource", + splunkSourceTypeKey: "mysourcetype", + splunkIndexKey: "myindex", + splunkFormatKey: splunkFormatInline, + splunkGzipCompressionKey: "true", + tagKey: "{{.ImageName}}/{{.Name}}", + labelsKey: "a", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + ContainerLabels: map[string]string{ + "a": "b", + }, + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "mysource" || + splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" || + splunkLoggerDriver.nullMessage.Index != "myindex" || + splunkLoggerDriver.gzipCompression != true || + splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize { + t.Fatal("Values do not match configuration.") + } + + messageTime := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 1 { + t.Fatal("Expected one message") + } + + if !*hec.gzipEnabled { + t.Fatal("Gzip should be used") + } + + message := hec.messages[0] + if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) || + message.Host != hostname || + message.Source != "mysource" || + message.SourceType != "mysourcetype" || + message.Index != "myindex" { + t.Fatalf("Unexpected values of message %v", message) + } + + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != "1" || + event["source"] != "stdout" || + event["tag"] != "container_image_name/container_name" || + event["attrs"].(map[string]interface{})["a"] != "b" || + len(event) != 4 { + t.Fatalf("Unexpected event in message %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify JSON format +func TestJsonFormat(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkFormatKey: splunkFormatJSON, + splunkGzipCompressionKey: "true", + splunkGzipCompressionLevelKey: "1", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "" || + splunkLoggerDriver.nullMessage.SourceType != "" || + splunkLoggerDriver.nullMessage.Index != "" || + splunkLoggerDriver.gzipCompression != true || + splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize { + t.Fatal("Values do not match configuration.") + } + + message1Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + t.Fatal(err) + } + message2Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 2 { + t.Fatal("Expected two messages") + } + + message1 := hec.messages[0] + if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) || + message1.Host != hostname || + message1.Source != "" || + message1.SourceType != "" || + message1.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message1) + } + + if event, err := message1.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"].(map[string]interface{})["a"] != "b" || + event["source"] != "stdout" || + event["tag"] != "containeriid" || + len(event) != 3 { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + message2 := hec.messages[1] + if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) || + message2.Host != hostname || + message2.Source != "" || + message2.SourceType != "" || + message2.Index != "" { + t.Fatalf("Unexpected values of message 2 %v", message2) + } + + // If message cannot be parsed as JSON - it should be sent as a line + if event, err := message2.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != "notjson" || + event["source"] != "stdout" || + event["tag"] != "containeriid" || + len(event) != 3 { + t.Fatalf("Unexpected event in message 2 %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify raw format +func TestRawFormat(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkFormatKey: splunkFormatRaw, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "" || + splunkLoggerDriver.nullMessage.SourceType != "" || + splunkLoggerDriver.nullMessage.Index != "" || + splunkLoggerDriver.gzipCompression != false || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize || + string(splunkLoggerDriver.prefix) != "containeriid " { + t.Fatal("Values do not match configuration.") + } + + message1Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + t.Fatal(err) + } + message2Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 2 { + t.Fatal("Expected two messages") + } + + message1 := hec.messages[0] + if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) || + message1.Host != hostname || + message1.Source != "" || + message1.SourceType != "" || + message1.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message1) + } + + if event, err := message1.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "containeriid {\"a\":\"b\"}" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + message2 := hec.messages[1] + if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) || + message2.Host != hostname || + message2.Source != "" || + message2.SourceType != "" || + message2.Index != "" { + t.Fatalf("Unexpected values of message 2 %v", message2) + } + + if event, err := message2.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "containeriid notjson" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify raw format with labels +func TestRawFormatWithLabels(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkFormatKey: splunkFormatRaw, + labelsKey: "a", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + ContainerLabels: map[string]string{ + "a": "b", + }, + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "" || + splunkLoggerDriver.nullMessage.SourceType != "" || + splunkLoggerDriver.nullMessage.Index != "" || + splunkLoggerDriver.gzipCompression != false || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize || + string(splunkLoggerDriver.prefix) != "containeriid a=b " { + t.Fatal("Values do not match configuration.") + } + + message1Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + t.Fatal(err) + } + message2Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 2 { + t.Fatal("Expected two messages") + } + + message1 := hec.messages[0] + if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) || + message1.Host != hostname || + message1.Source != "" || + message1.SourceType != "" || + message1.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message1) + } + + if event, err := message1.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "containeriid a=b {\"a\":\"b\"}" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + message2 := hec.messages[1] + if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) || + message2.Host != hostname || + message2.Source != "" || + message2.SourceType != "" || + message2.Index != "" { + t.Fatalf("Unexpected values of message 2 %v", message2) + } + + if event, err := message2.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "containeriid a=b notjson" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages +// in the same way we get them in stdout/stderr +func TestRawFormatWithoutTag(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkFormatKey: splunkFormatRaw, + tagKey: "", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + hostname, err := ctx.Hostname() + if err != nil { + t.Fatal(err) + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if !hec.connectionVerified { + t.Fatal("By default connection should be verified") + } + + splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw) + if !ok { + t.Fatal("Unexpected Splunk Logging Driver type") + } + + if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" || + splunkLoggerDriver.auth != "Splunk "+hec.token || + splunkLoggerDriver.nullMessage.Host != hostname || + splunkLoggerDriver.nullMessage.Source != "" || + splunkLoggerDriver.nullMessage.SourceType != "" || + splunkLoggerDriver.nullMessage.Index != "" || + splunkLoggerDriver.gzipCompression != false || + splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency || + splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize || + splunkLoggerDriver.bufferMaximum != defaultBufferMaximum || + cap(splunkLoggerDriver.stream) != defaultStreamChannelSize || + string(splunkLoggerDriver.prefix) != "" { + t.Log(string(splunkLoggerDriver.prefix) + "a") + t.Fatal("Values do not match configuration.") + } + + message1Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + t.Fatal(err) + } + message2Time := time.Now() + if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 2 { + t.Fatal("Expected two messages") + } + + message1 := hec.messages[0] + if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) || + message1.Host != hostname || + message1.Source != "" || + message1.SourceType != "" || + message1.Index != "" { + t.Fatalf("Unexpected values of message 1 %v", message1) + } + + if event, err := message1.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "{\"a\":\"b\"}" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + message2 := hec.messages[1] + if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) || + message2.Host != hostname || + message2.Source != "" || + message2.SourceType != "" || + message2.Index != "" { + t.Fatalf("Unexpected values of message 2 %v", message2) + } + + if event, err := message2.EventAsString(); err != nil { + t.Fatal(err) + } else { + if event != "notjson" { + t.Fatalf("Unexpected event in message 1 %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify that we will send messages in batches with default batching parameters, +// but change frequency to be sure that numOfRequests will match expected 17 requests +func TestBatching(t *testing.T) { + if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil { + t.Fatal(err) + } + + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < defaultStreamChannelSize*4; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != defaultStreamChannelSize*4 { + t.Fatal("Not all messages delivered") + } + + for i, message := range hec.messages { + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != fmt.Sprintf("%d", i) { + t.Fatalf("Unexpected event in message %v", event) + } + } + } + + // 1 to verify connection and 16 batches + if hec.numOfRequests != 17 { + t.Fatalf("Unexpected number of requests %d", hec.numOfRequests) + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil { + t.Fatal(err) + } +} + +// Verify that test is using time to fire events not rare than specified frequency +func TestFrequency(t *testing.T) { + if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil { + t.Fatal(err) + } + + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + time.Sleep(15 * time.Millisecond) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 10 { + t.Fatal("Not all messages delivered") + } + + for i, message := range hec.messages { + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != fmt.Sprintf("%d", i) { + t.Fatalf("Unexpected event in message %v", event) + } + } + } + + // 1 to verify connection and 10 to verify that we have sent messages with required frequency, + // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow + if hec.numOfRequests < 9 { + t.Fatalf("Unexpected number of requests %d", hec.numOfRequests) + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil { + t.Fatal(err) + } +} + +// Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message +// per request +func TestOneMessagePerRequest(t *testing.T) { + if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, "1"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil { + t.Fatal(err) + } + + hec := NewHTTPEventCollectorMock(t) + + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 10 { + t.Fatal("Not all messages delivered") + } + + for i, message := range hec.messages { + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != fmt.Sprintf("%d", i) { + t.Fatalf("Unexpected event in message %v", event) + } + } + } + + // 1 to verify connection and 10 messages + if hec.numOfRequests != 11 { + t.Fatalf("Unexpected number of requests %d", hec.numOfRequests) + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, ""); err != nil { + t.Fatal(err) + } +} + +// Driver should not be created when HEC is unresponsive +func TestVerify(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + hec.simulateServerError = true + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + _, err := New(ctx) + if err == nil { + t.Fatal("Expecting driver to fail, when server is unresponsive") + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify that user can specify to skip verification that Splunk HEC is working. +// Also in this test we verify retry logic. +func TestSkipVerify(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + hec.simulateServerError = true + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkVerifyConnectionKey: "false", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if hec.connectionVerified { + t.Fatal("Connection should not be verified") + } + + for i := 0; i < defaultStreamChannelSize*2; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + if len(hec.messages) != 0 { + t.Fatal("No messages should be accepted at this point") + } + + hec.simulateServerError = false + + for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != defaultStreamChannelSize*4 { + t.Fatal("Not all messages delivered") + } + + for i, message := range hec.messages { + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != fmt.Sprintf("%d", i) { + t.Fatalf("Unexpected event in message %v", event) + } + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} + +// Verify logic for when we filled whole buffer +func TestBufferMaximum(t *testing.T) { + if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, "10"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil { + t.Fatal(err) + } + + hec := NewHTTPEventCollectorMock(t) + hec.simulateServerError = true + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkVerifyConnectionKey: "false", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if hec.connectionVerified { + t.Fatal("Connection should not be verified") + } + + for i := 0; i < 11; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + if len(hec.messages) != 0 { + t.Fatal("No messages should be accepted at this point") + } + + hec.simulateServerError = false + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 9 { + t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages)) + } + + // First 1000 messages are written to daemon log when buffer was full + for i, message := range hec.messages { + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != fmt.Sprintf("%d", i+2) { + t.Fatalf("Unexpected event in message %v", event) + } + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, ""); err != nil { + t.Fatal(err) + } +} + +// Verify that we are not blocking close when HEC is down for the whole time +func TestServerAlwaysDown(t *testing.T) { + if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, "4"); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil { + t.Fatal(err) + } + + hec := NewHTTPEventCollectorMock(t) + hec.simulateServerError = true + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + splunkVerifyConnectionKey: "false", + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if hec.connectionVerified { + t.Fatal("Connection should not be verified") + } + + for i := 0; i < 5; i++ { + if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if len(hec.messages) != 0 { + t.Fatal("No messages should be sent") + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarBufferMaximum, ""); err != nil { + t.Fatal(err) + } + + if err := os.Setenv(envVarStreamChannelSize, ""); err != nil { + t.Fatal(err) + } +} + +// Cannot send messages after we close driver +func TestCannotSendAfterClose(t *testing.T) { + hec := NewHTTPEventCollectorMock(t) + go hec.Serve() + + ctx := logger.Context{ + Config: map[string]string{ + splunkURLKey: hec.URL(), + splunkTokenKey: hec.token, + }, + ContainerID: "containeriid", + ContainerName: "/container_name", + ContainerImageID: "contaimageid", + ContainerImageName: "container_image_name", + } + + loggerDriver, err := New(ctx) + if err != nil { + t.Fatal(err) + } + + if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil { + t.Fatal(err) + } + + err = loggerDriver.Close() + if err != nil { + t.Fatal(err) + } + + if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil { + t.Fatal("Driver should not allow to send messages after close") + } + + if len(hec.messages) != 1 { + t.Fatal("Only one message should be sent") + } + + message := hec.messages[0] + if event, err := message.EventAsMap(); err != nil { + t.Fatal(err) + } else { + if event["line"] != "message1" { + t.Fatalf("Unexpected event in message %v", event) + } + } + + err = hec.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/daemon/logger/splunk/splunkhecmock_test.go b/daemon/logger/splunk/splunkhecmock_test.go new file mode 100644 index 0000000000..e508948280 --- /dev/null +++ b/daemon/logger/splunk/splunkhecmock_test.go @@ -0,0 +1,157 @@ +package splunk + +import ( + "compress/gzip" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "testing" +) + +func (message *splunkMessage) EventAsString() (string, error) { + if val, ok := message.Event.(string); ok { + return val, nil + } + return "", fmt.Errorf("Cannot cast Event %v to string", message.Event) +} + +func (message *splunkMessage) EventAsMap() (map[string]interface{}, error) { + if val, ok := message.Event.(map[string]interface{}); ok { + return val, nil + } + return nil, fmt.Errorf("Cannot cast Event %v to map", message.Event) +} + +type HTTPEventCollectorMock struct { + tcpAddr *net.TCPAddr + tcpListener *net.TCPListener + + token string + simulateServerError bool + + test *testing.T + + connectionVerified bool + gzipEnabled *bool + messages []*splunkMessage + numOfRequests int +} + +func NewHTTPEventCollectorMock(t *testing.T) *HTTPEventCollectorMock { + tcpAddr := &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0, Zone: ""} + tcpListener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + t.Fatal(err) + } + return &HTTPEventCollectorMock{ + tcpAddr: tcpAddr, + tcpListener: tcpListener, + token: "4642492F-D8BD-47F1-A005-0C08AE4657DF", + simulateServerError: false, + test: t, + connectionVerified: false} +} + +func (hec *HTTPEventCollectorMock) URL() string { + return "http://" + hec.tcpListener.Addr().String() +} + +func (hec *HTTPEventCollectorMock) Serve() error { + return http.Serve(hec.tcpListener, hec) +} + +func (hec *HTTPEventCollectorMock) Close() error { + return hec.tcpListener.Close() +} + +func (hec *HTTPEventCollectorMock) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + var err error + + hec.numOfRequests++ + + if hec.simulateServerError { + if request.Body != nil { + defer request.Body.Close() + } + writer.WriteHeader(http.StatusInternalServerError) + return + } + + switch request.Method { + case http.MethodOptions: + // Verify that options method is getting called only once + if hec.connectionVerified { + hec.test.Errorf("Connection should not be verified more than once. Got second request with %s method.", request.Method) + } + hec.connectionVerified = true + writer.WriteHeader(http.StatusOK) + case http.MethodPost: + // Always verify that Driver is using correct path to HEC + if request.URL.String() != "/services/collector/event/1.0" { + hec.test.Errorf("Unexpected path %v", request.URL) + } + defer request.Body.Close() + + if authorization, ok := request.Header["Authorization"]; !ok || authorization[0] != ("Splunk "+hec.token) { + hec.test.Error("Authorization header is invalid.") + } + + gzipEnabled := false + if contentEncoding, ok := request.Header["Content-Encoding"]; ok && contentEncoding[0] == "gzip" { + gzipEnabled = true + } + + if hec.gzipEnabled == nil { + hec.gzipEnabled = &gzipEnabled + } else if gzipEnabled != *hec.gzipEnabled { + // Nothing wrong with that, but we just know that Splunk Logging Driver does not do that + hec.test.Error("Driver should not change Content Encoding.") + } + + var gzipReader *gzip.Reader + var reader io.Reader + if gzipEnabled { + gzipReader, err = gzip.NewReader(request.Body) + if err != nil { + hec.test.Fatal(err) + } + reader = gzipReader + } else { + reader = request.Body + } + + // Read body + var body []byte + body, err = ioutil.ReadAll(reader) + if err != nil { + hec.test.Fatal(err) + } + + // Parse message + messageStart := 0 + for i := 0; i < len(body); i++ { + if i == len(body)-1 || (body[i] == '}' && body[i+1] == '{') { + var message splunkMessage + err = json.Unmarshal(body[messageStart:i+1], &message) + if err != nil { + hec.test.Log(string(body[messageStart : i+1])) + hec.test.Fatal(err) + } + hec.messages = append(hec.messages, &message) + messageStart = i + 1 + } + } + + if gzipEnabled { + gzipReader.Close() + } + + writer.WriteHeader(http.StatusOK) + default: + hec.test.Errorf("Unexpected HTTP method %s", http.MethodOptions) + writer.WriteHeader(http.StatusBadRequest) + } +} diff --git a/docs/admin/logging/splunk.md b/docs/admin/logging/splunk.md index 2d9ed558ed..e081512d2c 100644 --- a/docs/admin/logging/splunk.md +++ b/docs/admin/logging/splunk.md @@ -32,21 +32,23 @@ You can set the logging driver for a specific container by using the You can use the `--log-opt NAME=VALUE` flag to specify these additional Splunk logging driver options: -| Option | Required | Description | -|-----------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `splunk-token` | required | Splunk HTTP Event Collector token. | -| `splunk-url` | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`. | -| `splunk-source` | optional | Event source. | -| `splunk-sourcetype` | optional | Event source type. | -| `splunk-index` | optional | Event index. | -| `splunk-capath` | optional | Path to root certificate. | -| `splunk-caname` | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used. | -| `splunk-insecureskipverify` | optional | Ignore server certificate validation. | -| `splunk-format` | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`. | -| `splunk-verify-connection` | optional | Verify on start, that docker can connect to Splunk server. Defaults to true. | -| `tag` | optional | Specify tag for message, which interpret some markup. Default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format. | -| `labels` | optional | Comma-separated list of keys of labels, which should be included in message, if these labels are specified for container. | -| `env` | optional | Comma-separated list of keys of environment variables, which should be included in message, if these variables are specified for container. | +| Option | Required | Description | +|-----------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `splunk-token` | required | Splunk HTTP Event Collector token. | +| `splunk-url` | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`. | +| `splunk-source` | optional | Event source. | +| `splunk-sourcetype` | optional | Event source type. | +| `splunk-index` | optional | Event index. | +| `splunk-capath` | optional | Path to root certificate. | +| `splunk-caname` | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used. | +| `splunk-insecureskipverify` | optional | Ignore server certificate validation. | +| `splunk-format` | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`. | +| `splunk-verify-connection` | optional | Verify on start, that docker can connect to Splunk server. Defaults to true. | +| `splunk-gzip` | optional | Enable/disable gzip compression to send events to Splunk Enterprise or Splunk Cloud instance. Defaults to false. | +| `splunk-gzip-level` | optional | Set compression level for gzip. Valid values are -1 (default), 0 (no compression), 1 (best speed) ... 9 (best compression). Defaults to [DefaultCompression](https://golang.org/pkg/compress/gzip/#DefaultCompression). | +| `tag` | optional | Specify tag for message, which interpret some markup. Default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format. | +| `labels` | optional | Comma-separated list of keys of labels, which should be included in message, if these labels are specified for container. | +| `env` | optional | Comma-separated list of keys of environment variables, which should be included in message, if these variables are specified for container. | If there is collision between `label` and `env` keys, the value of the `env` takes precedence. Both options add additional fields to the attributes of a logging message. @@ -132,3 +134,14 @@ tag will be prefixed to the message. For example MyImage/MyContainer env1=val1 label1=label1 my message MyImage/MyContainer env1=val1 label1=label1 {"foo": "bar"} ``` + +## Advanced options + +Splunk Logging Driver allows you to configure few advanced options by specifying next environment variables for the Docker daemon. + +| Environment variable name | Default value | Description | +|--------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------| +| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY` | `5s` | If there is nothing to batch how often driver will post messages. You can think about this as the maximum time to wait for more messages to batch. | +| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE` | `1000` | How many messages driver should wait before sending them in one batch. | +| `SPLUNK_LOGGING_DRIVER_BUFFER_MAX` | `10 * 1000` | If driver cannot connect to remote server, what is the maximum amount of messages it can hold in buffer for retries. | +| `SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE` | `4 * 1000` | How many pending messages can be in the channel which is used to send messages to background logger worker, which batches them. |