
Merge pull request #26207 from splunk/splunk-logging-driver-performance-improvements

Splunk Logging Driver performance improvements
Michael Crosby 8 years ago
parent
commit
efe4e914ef

+ 2 - 2
contrib/completion/bash/docker

@@ -520,7 +520,7 @@ __docker_complete_log_options() {
 	local journald_options="env labels tag"
 	local json_file_options="env labels max-file max-size"
 	local syslog_options="env labels syslog-address syslog-facility syslog-format syslog-tls-ca-cert syslog-tls-cert syslog-tls-key syslog-tls-skip-verify tag"
-	local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag"
+	local splunk_options="env labels splunk-caname splunk-capath splunk-format splunk-gzip splunk-gzip-level splunk-index splunk-insecureskipverify splunk-source splunk-sourcetype splunk-token splunk-url splunk-verify-connection tag"
 
 	local all_options="$fluentd_options $gcplogs_options $gelf_options $journald_options $json_file_options $syslog_options $splunk_options"
 
@@ -629,7 +629,7 @@ __docker_complete_log_driver_options() {
 			__ltrim_colon_completions "${cur}"
 			return
 			;;
-		splunk-insecureskipverify|splunk-verify-connection)
+		splunk-gzip|splunk-insecureskipverify|splunk-verify-connection)
 			COMPREPLY=( $( compgen -W "false true" -- "${cur##*=}" ) )
 			return
 			;;
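These completion additions mirror the two new log options introduced by this change, `splunk-gzip` and `splunk-gzip-level`. A minimal usage sketch (the URL, token, and image name below are placeholders, not taken from this change):

    docker run --log-driver=splunk \
        --log-opt splunk-url=https://your_splunk_instance:8088 \
        --log-opt splunk-token=<HEC-token> \
        --log-opt splunk-gzip=true \
        --log-opt splunk-gzip-level=1 \
        your_image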

+ 1 - 1
contrib/completion/zsh/_docker

@@ -228,7 +228,7 @@ __docker_get_log_options() {
     journald_options=("env" "labels" "tag")
     json_file_options=("env" "labels" "max-file" "max-size")
     syslog_options=("env" "labels" "syslog-address" "syslog-facility" "syslog-format" "syslog-tls-ca-cert" "syslog-tls-cert" "syslog-tls-key" "syslog-tls-skip-verify" "tag")
-    splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag")
+    splunk_options=("env" "labels" "splunk-caname" "splunk-capath" "splunk-format" "splunk-gzip" "splunk-gzip-level" "splunk-index" "splunk-insecureskipverify" "splunk-source" "splunk-sourcetype" "splunk-token" "splunk-url" "splunk-verify-connection" "tag")
 
     [[ $log_driver = (awslogs|all) ]] && _describe -t awslogs-options "awslogs options" awslogs_options "$@" && ret=0
     [[ $log_driver = (fluentd|all) ]] && _describe -t fluentd-options "fluentd options" fluentd_options "$@" && ret=0

+ 271 - 37
daemon/logger/splunk/splunk.go

@@ -4,6 +4,7 @@ package splunk
 
 import (
 	"bytes"
+	"compress/gzip"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/json"
@@ -12,7 +13,9 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/url"
+	"os"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -22,22 +25,47 @@ import (
 )
 
 const (
-	driverName                  = "splunk"
-	splunkURLKey                = "splunk-url"
-	splunkTokenKey              = "splunk-token"
-	splunkSourceKey             = "splunk-source"
-	splunkSourceTypeKey         = "splunk-sourcetype"
-	splunkIndexKey              = "splunk-index"
-	splunkCAPathKey             = "splunk-capath"
-	splunkCANameKey             = "splunk-caname"
-	splunkInsecureSkipVerifyKey = "splunk-insecureskipverify"
-	splunkFormatKey             = "splunk-format"
-	splunkVerifyConnectionKey   = "splunk-verify-connection"
-	envKey                      = "env"
-	labelsKey                   = "labels"
-	tagKey                      = "tag"
+	driverName                    = "splunk"
+	splunkURLKey                  = "splunk-url"
+	splunkTokenKey                = "splunk-token"
+	splunkSourceKey               = "splunk-source"
+	splunkSourceTypeKey           = "splunk-sourcetype"
+	splunkIndexKey                = "splunk-index"
+	splunkCAPathKey               = "splunk-capath"
+	splunkCANameKey               = "splunk-caname"
+	splunkInsecureSkipVerifyKey   = "splunk-insecureskipverify"
+	splunkFormatKey               = "splunk-format"
+	splunkVerifyConnectionKey     = "splunk-verify-connection"
+	splunkGzipCompressionKey      = "splunk-gzip"
+	splunkGzipCompressionLevelKey = "splunk-gzip-level"
+	envKey                        = "env"
+	labelsKey                     = "labels"
+	tagKey                        = "tag"
 )
 
+const (
+	// How often we send messages (if we do not reach the batch size first)
+	defaultPostMessagesFrequency = 5 * time.Second
+	// Maximum number of messages in a single batch
+	defaultPostMessagesBatchSize = 1000
+	// Maximum number of messages we can store in buffer
+	defaultBufferMaximum = 10 * defaultPostMessagesBatchSize
+	// Number of messages allowed to be queued in the channel
+	defaultStreamChannelSize = 4 * defaultPostMessagesBatchSize
+)
+
+const (
+	envVarPostMessagesFrequency = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY"
+	envVarPostMessagesBatchSize = "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE"
+	envVarBufferMaximum         = "SPLUNK_LOGGING_DRIVER_BUFFER_MAX"
+	envVarStreamChannelSize     = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
+)
+
+type splunkLoggerInterface interface {
+	logger.Logger
+	worker()
+}
+
 type splunkLogger struct {
 	client    *http.Client
 	transport *http.Transport
@@ -45,6 +73,23 @@ type splunkLogger struct {
 	url         string
 	auth        string
 	nullMessage *splunkMessage
+
+	// http compression
+	gzipCompression      bool
+	gzipCompressionLevel int
+
+	// Advanced options
+	postMessagesFrequency time.Duration
+	postMessagesBatchSize int
+	bufferMaximum         int
+
+	// For synchronization between the background worker and the logger.
+	// We use a channel to send messages to the worker goroutine.
+	// All other fields block the Close call until we have flushed all messages to HEC
+	stream     chan *splunkMessage
+	lock       sync.RWMutex
+	closed     bool
+	closedCond *sync.Cond
 }
 
 type splunkLoggerInline struct {
@@ -140,6 +185,29 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		tlsConfig.ServerName = caName
 	}
 
+	gzipCompression := false
+	if gzipCompressionStr, ok := ctx.Config[splunkGzipCompressionKey]; ok {
+		gzipCompression, err = strconv.ParseBool(gzipCompressionStr)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	gzipCompressionLevel := gzip.DefaultCompression
+	if gzipCompressionLevelStr, ok := ctx.Config[splunkGzipCompressionLevelKey]; ok {
+		var err error
+		gzipCompressionLevel64, err := strconv.ParseInt(gzipCompressionLevelStr, 10, 32)
+		if err != nil {
+			return nil, err
+		}
+		gzipCompressionLevel = int(gzipCompressionLevel64)
+		if gzipCompressionLevel < gzip.DefaultCompression || gzipCompressionLevel > gzip.BestCompression {
+			err := fmt.Errorf("Not supported level '%s' for %s (supported values between %d and %d).",
+				gzipCompressionLevelStr, splunkGzipCompressionLevelKey, gzip.DefaultCompression, gzip.BestCompression)
+			return nil, err
+		}
+	}
+
 	transport := &http.Transport{
 		TLSClientConfig: tlsConfig,
 	}
@@ -158,19 +226,36 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		Index:      index,
 	}
 
-	tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
-	if err != nil {
-		return nil, err
+	// Allow the user to remove the tag from messages by setting the tag to an empty string
+	tag := ""
+	if tagTemplate, ok := ctx.Config[tagKey]; !ok || tagTemplate != "" {
+		tag, err = loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	attrs := ctx.ExtraAttributes(nil)
 
+	var (
+		postMessagesFrequency = getAdvancedOptionDuration(envVarPostMessagesFrequency, defaultPostMessagesFrequency)
+		postMessagesBatchSize = getAdvancedOptionInt(envVarPostMessagesBatchSize, defaultPostMessagesBatchSize)
+		bufferMaximum         = getAdvancedOptionInt(envVarBufferMaximum, defaultBufferMaximum)
+		streamChannelSize     = getAdvancedOptionInt(envVarStreamChannelSize, defaultStreamChannelSize)
+	)
+
 	logger := &splunkLogger{
-		client:      client,
-		transport:   transport,
-		url:         splunkURL.String(),
-		auth:        "Splunk " + splunkToken,
-		nullMessage: nullMessage,
+		client:                client,
+		transport:             transport,
+		url:                   splunkURL.String(),
+		auth:                  "Splunk " + splunkToken,
+		nullMessage:           nullMessage,
+		gzipCompression:       gzipCompression,
+		gzipCompressionLevel:  gzipCompressionLevel,
+		stream:                make(chan *splunkMessage, streamChannelSize),
+		postMessagesFrequency: postMessagesFrequency,
+		postMessagesBatchSize: postMessagesBatchSize,
+		bufferMaximum:         bufferMaximum,
 	}
 
 	// By default we verify the connection, but we allow the user to skip that
@@ -203,6 +288,8 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		splunkFormat = splunkFormatInline
 	}
 
+	var loggerWrapper splunkLoggerInterface
+
 	switch splunkFormat {
 	case splunkFormatInline:
 		nullEvent := &splunkMessageEvent{
@@ -210,18 +297,20 @@ func New(ctx logger.Context) (logger.Logger, error) {
 			Attrs: attrs,
 		}
 
-		return &splunkLoggerInline{logger, nullEvent}, nil
+		loggerWrapper = &splunkLoggerInline{logger, nullEvent}
 	case splunkFormatJSON:
 		nullEvent := &splunkMessageEvent{
 			Tag:   tag,
 			Attrs: attrs,
 		}
 
-		return &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}, nil
+		loggerWrapper = &splunkLoggerJSON{&splunkLoggerInline{logger, nullEvent}}
 	case splunkFormatRaw:
 		var prefix bytes.Buffer
-		prefix.WriteString(tag)
-		prefix.WriteString(" ")
+		if tag != "" {
+			prefix.WriteString(tag)
+			prefix.WriteString(" ")
+		}
 		for key, value := range attrs {
 			prefix.WriteString(key)
 			prefix.WriteString("=")
@@ -229,10 +318,14 @@ func New(ctx logger.Context) (logger.Logger, error) {
 			prefix.WriteString(" ")
 		}
 
-		return &splunkLoggerRaw{logger, prefix.Bytes()}, nil
+		loggerWrapper = &splunkLoggerRaw{logger, prefix.Bytes()}
 	default:
 		return nil, fmt.Errorf("Unexpected format %s", splunkFormat)
 	}
+
+	go loggerWrapper.worker()
+
+	return loggerWrapper, nil
 }
 
 func (l *splunkLoggerInline) Log(msg *logger.Message) error {
@@ -244,7 +337,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error {
 
 	message.Event = &event
 
-	return l.postMessage(message)
+	return l.queueMessageAsync(message)
 }
 
 func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
@@ -262,7 +355,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error {
 
 	message.Event = &event
 
-	return l.postMessage(message)
+	return l.queueMessageAsync(message)
 }
 
 func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
@@ -270,19 +363,124 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error {
 
 	message.Event = string(append(l.prefix, msg.Line...))
 
-	return l.postMessage(message)
+	return l.queueMessageAsync(message)
 }
 
-func (l *splunkLogger) postMessage(message *splunkMessage) error {
-	jsonEvent, err := json.Marshal(message)
-	if err != nil {
-		return err
+func (l *splunkLogger) queueMessageAsync(message *splunkMessage) error {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+	if l.closedCond != nil {
+		return fmt.Errorf("%s: driver is closed", driverName)
+	}
+	l.stream <- message
+	return nil
+}
+
+func (l *splunkLogger) worker() {
+	timer := time.NewTicker(l.postMessagesFrequency)
+	var messages []*splunkMessage
+	for {
+		select {
+		case message, open := <-l.stream:
+			if !open {
+				l.postMessages(messages, true)
+				l.lock.Lock()
+				defer l.lock.Unlock()
+				l.transport.CloseIdleConnections()
+				l.closed = true
+				l.closedCond.Signal()
+				return
+			}
+			messages = append(messages, message)
+			// Only send when we reach exactly the batch size.
+			// This also avoids firing postMessages on every new message
+			// when the previous attempt failed.
+			if len(messages)%l.postMessagesBatchSize == 0 {
+				messages = l.postMessages(messages, false)
+			}
+		case <-timer.C:
+			messages = l.postMessages(messages, false)
+		}
+	}
+}
+
+func (l *splunkLogger) postMessages(messages []*splunkMessage, lastChance bool) []*splunkMessage {
+	messagesLen := len(messages)
+	for i := 0; i < messagesLen; i += l.postMessagesBatchSize {
+		upperBound := i + l.postMessagesBatchSize
+		if upperBound > messagesLen {
+			upperBound = messagesLen
+		}
+		if err := l.tryPostMessages(messages[i:upperBound]); err != nil {
+			logrus.Error(err)
+			if messagesLen-i >= l.bufferMaximum || lastChance {
+				// If this is the last chance, print them all to the daemon log
+				if lastChance {
+					upperBound = messagesLen
+				}
+				// Not everything was sent, but the buffer has reached its maximum; log all messages
+				// we could not send and return the buffer minus one batch size
+				for j := i; j < upperBound; j++ {
+					if jsonEvent, err := json.Marshal(messages[j]); err != nil {
+						logrus.Error(err)
+					} else {
+						logrus.Error(fmt.Errorf("Failed to send a message '%s'", string(jsonEvent)))
+					}
+				}
+				return messages[upperBound:messagesLen]
+			}
+			// Not everything was sent; return the buffer starting from the first unsent message
+			return messages[i:messagesLen]
+		}
+	}
+	// All sent, return empty buffer
+	return messages[:0]
+}
+
+func (l *splunkLogger) tryPostMessages(messages []*splunkMessage) error {
+	if len(messages) == 0 {
+		return nil
+	}
+	var buffer bytes.Buffer
+	var writer io.Writer
+	var gzipWriter *gzip.Writer
+	var err error
+	// If gzip compression is enabled, create a gzip writer with the specified compression
+	// level. If gzip compression is disabled, use the plain buffer as the writer
+	if l.gzipCompression {
+		gzipWriter, err = gzip.NewWriterLevel(&buffer, l.gzipCompressionLevel)
+		if err != nil {
+			return err
+		}
+		writer = gzipWriter
+	} else {
+		writer = &buffer
+	}
+	for _, message := range messages {
+		jsonEvent, err := json.Marshal(message)
+		if err != nil {
+			return err
+		}
+		if _, err := writer.Write(jsonEvent); err != nil {
+			return err
+		}
+	}
+	// If gzip compression is enabled, close the gzip writer to signal that we are done
+	if l.gzipCompression {
+		err = gzipWriter.Close()
+		if err != nil {
+			return err
+		}
 	}
-	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(jsonEvent))
+	req, err := http.NewRequest("POST", l.url, bytes.NewBuffer(buffer.Bytes()))
 	if err != nil {
 		return err
 	}
 	req.Header.Set("Authorization", l.auth)
+	// Indicate when we are sending a gzip-compressed body
+	if l.gzipCompression {
+		req.Header.Set("Content-Encoding", "gzip")
+	}
 	res, err := l.client.Do(req)
 	if err != nil {
 		return err
@@ -301,7 +499,15 @@ func (l *splunkLogger) postMessage(message *splunkMessage) error {
 }
 
 func (l *splunkLogger) Close() error {
-	l.transport.CloseIdleConnections()
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	if l.closedCond == nil {
+		l.closedCond = sync.NewCond(&l.lock)
+		close(l.stream)
+		for !l.closed {
+			l.closedCond.Wait()
+		}
+	}
 	return nil
 }
 
@@ -329,6 +535,8 @@ func ValidateLogOpt(cfg map[string]string) error {
 		case splunkInsecureSkipVerifyKey:
 		case splunkFormatKey:
 		case splunkVerifyConnectionKey:
+		case splunkGzipCompressionKey:
+		case splunkGzipCompressionLevelKey:
 		case envKey:
 		case labelsKey:
 		case tagKey:
@@ -364,7 +572,7 @@ func parseURL(ctx logger.Context) (*url.URL, error) {
 }
 
 func verifySplunkConnection(l *splunkLogger) error {
-	req, err := http.NewRequest("OPTIONS", l.url, nil)
+	req, err := http.NewRequest(http.MethodOptions, l.url, nil)
 	if err != nil {
 		return err
 	}
@@ -385,3 +593,29 @@ func verifySplunkConnection(l *splunkLogger) error {
 	}
 	return nil
 }
+
+func getAdvancedOptionDuration(envName string, defaultValue time.Duration) time.Duration {
+	valueStr := os.Getenv(envName)
+	if valueStr == "" {
+		return defaultValue
+	}
+	parsedValue, err := time.ParseDuration(valueStr)
+	if err != nil {
+		logrus.Error(fmt.Sprintf("Failed to parse value of %s as duration. Using default %v. %v", envName, defaultValue, err))
+		return defaultValue
+	}
+	return parsedValue
+}
+
+func getAdvancedOptionInt(envName string, defaultValue int) int {
+	valueStr := os.Getenv(envName)
+	if valueStr == "" {
+		return defaultValue
+	}
+	parsedValue, err := strconv.ParseInt(valueStr, 10, 32)
+	if err != nil {
+		logrus.Error(fmt.Sprintf("Failed to parse value of %s as integer. Using default %d. %v", envName, defaultValue, err))
+		return defaultValue
+	}
+	return int(parsedValue)
+}
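The driver also reads four advanced tuning knobs from the daemon's environment (the `SPLUNK_LOGGING_DRIVER_*` constants defined above). A rough sketch of overriding them, assuming `dockerd` is started from a shell where the variables are exported; the values shown are simply the defaults from the code, and how you actually set the daemon's environment depends on your init system:

    export SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY=5s
    export SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE=1000
    export SPLUNK_LOGGING_DRIVER_BUFFER_MAX=10000
    export SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE=4000
    dockerd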

+ 1302 - 0
daemon/logger/splunk/splunk_test.go

@@ -0,0 +1,1302 @@
+package splunk
+
+import (
+	"compress/gzip"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/daemon/logger"
+)
+
+// Validate options
+func TestValidateLogOpt(t *testing.T) {
+	err := ValidateLogOpt(map[string]string{
+		splunkURLKey:                  "http://127.0.0.1",
+		splunkTokenKey:                "2160C7EF-2CE9-4307-A180-F852B99CF417",
+		splunkSourceKey:               "mysource",
+		splunkSourceTypeKey:           "mysourcetype",
+		splunkIndexKey:                "myindex",
+		splunkCAPathKey:               "/usr/cert.pem",
+		splunkCANameKey:               "ca_name",
+		splunkInsecureSkipVerifyKey:   "true",
+		splunkFormatKey:               "json",
+		splunkVerifyConnectionKey:     "true",
+		splunkGzipCompressionKey:      "true",
+		splunkGzipCompressionLevelKey: "1",
+		envKey:    "a",
+		labelsKey: "b",
+		tagKey:    "c",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = ValidateLogOpt(map[string]string{
+		"not-supported-option": "a",
+	})
+	if err == nil {
+		t.Fatal("Expecting error on unsupported options")
+	}
+}
+
+// The driver requires the user to specify the required options
+func TestNewMissedConfig(t *testing.T) {
+	ctx := logger.Context{
+		Config: map[string]string{},
+	}
+	_, err := New(ctx)
+	if err == nil {
+		t.Fatal("Logger driver should fail when no required parameters specified")
+	}
+}
+
+// The driver requires the user to specify splunk-url
+func TestNewMissedUrl(t *testing.T) {
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
+		},
+	}
+	_, err := New(ctx)
+	if err.Error() != "splunk: splunk-url is expected" {
+		t.Fatal("Logger driver should fail when no required parameters specified")
+	}
+}
+
+// The driver requires the user to specify splunk-token
+func TestNewMissedToken(t *testing.T) {
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey: "http://127.0.0.1:8088",
+		},
+	}
+	_, err := New(ctx)
+	if err.Error() != "splunk: splunk-token is expected" {
+		t.Fatal("Logger driver should fail when no required parameters specified")
+	}
+}
+
+// Test default settings
+func TestDefault(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if loggerDriver.Name() != driverName {
+		t.Fatal("Unexpected logger driver name")
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "" ||
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
+		splunkLoggerDriver.nullMessage.Index != "" ||
+		splunkLoggerDriver.gzipCompression != false ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
+		t.Fatal("Found non-default values in Splunk Logging Driver setup.")
+	}
+
+	message1Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+	message2Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 2 {
+		t.Fatal("Expected two messages")
+	}
+
+	if *hec.gzipEnabled {
+		t.Fatal("Gzip should not be used")
+	}
+
+	message1 := hec.messages[0]
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
+		message1.Host != hostname ||
+		message1.Source != "" ||
+		message1.SourceType != "" ||
+		message1.Index != "" {
+		t.Fatalf("Unexpected values of message 1 %v", message1)
+	}
+
+	if event, err := message1.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"] != "{\"a\":\"b\"}" ||
+			event["source"] != "stdout" ||
+			event["tag"] != "containeriid" ||
+			len(event) != 3 {
+			t.Fatalf("Unexpected event in message %v", event)
+		}
+	}
+
+	message2 := hec.messages[1]
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
+		message2.Host != hostname ||
+		message2.Source != "" ||
+		message2.SourceType != "" ||
+		message2.Index != "" {
+		t.Fatalf("Unexpected values of message 2 %v", message2)
+	}
+
+	if event, err := message2.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"] != "notajson" ||
+			event["source"] != "stdout" ||
+			event["tag"] != "containeriid" ||
+			len(event) != 3 {
+			t.Fatalf("Unexpected event in message %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify inline format with non-default settings for most options
+func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:             hec.URL(),
+			splunkTokenKey:           hec.token,
+			splunkSourceKey:          "mysource",
+			splunkSourceTypeKey:      "mysourcetype",
+			splunkIndexKey:           "myindex",
+			splunkFormatKey:          splunkFormatInline,
+			splunkGzipCompressionKey: "true",
+			tagKey:    "{{.ImageName}}/{{.Name}}",
+			labelsKey: "a",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+		ContainerLabels: map[string]string{
+			"a": "b",
+		},
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "mysource" ||
+		splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
+		splunkLoggerDriver.nullMessage.Index != "myindex" ||
+		splunkLoggerDriver.gzipCompression != true ||
+		splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
+		t.Fatal("Values do not match configuration.")
+	}
+
+	messageTime := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 1 {
+		t.Fatal("Expected one message")
+	}
+
+	if !*hec.gzipEnabled {
+		t.Fatal("Gzip should be used")
+	}
+
+	message := hec.messages[0]
+	if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
+		message.Host != hostname ||
+		message.Source != "mysource" ||
+		message.SourceType != "mysourcetype" ||
+		message.Index != "myindex" {
+		t.Fatalf("Unexpected values of message %v", message)
+	}
+
+	if event, err := message.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"] != "1" ||
+			event["source"] != "stdout" ||
+			event["tag"] != "container_image_name/container_name" ||
+			event["attrs"].(map[string]interface{})["a"] != "b" ||
+			len(event) != 4 {
+			t.Fatalf("Unexpected event in message %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify JSON format
+func TestJsonFormat(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:                  hec.URL(),
+			splunkTokenKey:                hec.token,
+			splunkFormatKey:               splunkFormatJSON,
+			splunkGzipCompressionKey:      "true",
+			splunkGzipCompressionLevelKey: "1",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "" ||
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
+		splunkLoggerDriver.nullMessage.Index != "" ||
+		splunkLoggerDriver.gzipCompression != true ||
+		splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
+		t.Fatal("Values do not match configuration.")
+	}
+
+	message1Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+	message2Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 2 {
+		t.Fatal("Expected two messages")
+	}
+
+	message1 := hec.messages[0]
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
+		message1.Host != hostname ||
+		message1.Source != "" ||
+		message1.SourceType != "" ||
+		message1.Index != "" {
+		t.Fatalf("Unexpected values of message 1 %v", message1)
+	}
+
+	if event, err := message1.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"].(map[string]interface{})["a"] != "b" ||
+			event["source"] != "stdout" ||
+			event["tag"] != "containeriid" ||
+			len(event) != 3 {
+			t.Fatalf("Unexpected event in message 1 %v", event)
+		}
+	}
+
+	message2 := hec.messages[1]
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
+		message2.Host != hostname ||
+		message2.Source != "" ||
+		message2.SourceType != "" ||
+		message2.Index != "" {
+		t.Fatalf("Unexpected values of message 2 %v", message2)
+	}
+
+	// If message cannot be parsed as JSON - it should be sent as a line
+	if event, err := message2.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"] != "notjson" ||
+			event["source"] != "stdout" ||
+			event["tag"] != "containeriid" ||
+			len(event) != 3 {
+			t.Fatalf("Unexpected event in message 2 %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify raw format
+func TestRawFormat(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:    hec.URL(),
+			splunkTokenKey:  hec.token,
+			splunkFormatKey: splunkFormatRaw,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "" ||
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
+		splunkLoggerDriver.nullMessage.Index != "" ||
+		splunkLoggerDriver.gzipCompression != false ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
+		string(splunkLoggerDriver.prefix) != "containeriid " {
+		t.Fatal("Values do not match configuration.")
+	}
+
+	message1Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+	message2Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 2 {
+		t.Fatal("Expected two messages")
+	}
+
+	message1 := hec.messages[0]
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
+		message1.Host != hostname ||
+		message1.Source != "" ||
+		message1.SourceType != "" ||
+		message1.Index != "" {
+		t.Fatalf("Unexpected values of message 1 %v", message1)
+	}
+
+	if event, err := message1.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "containeriid {\"a\":\"b\"}" {
+			t.Fatalf("Unexpected event in message 1 %v", event)
+		}
+	}
+
+	message2 := hec.messages[1]
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
+		message2.Host != hostname ||
+		message2.Source != "" ||
+		message2.SourceType != "" ||
+		message2.Index != "" {
+		t.Fatalf("Unexpected values of message 2 %v", message2)
+	}
+
+	if event, err := message2.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "containeriid notjson" {
+			t.Fatalf("Unexpected event in message 2 %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify raw format with labels
+func TestRawFormatWithLabels(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:    hec.URL(),
+			splunkTokenKey:  hec.token,
+			splunkFormatKey: splunkFormatRaw,
+			labelsKey:       "a",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+		ContainerLabels: map[string]string{
+			"a": "b",
+		},
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "" ||
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
+		splunkLoggerDriver.nullMessage.Index != "" ||
+		splunkLoggerDriver.gzipCompression != false ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
+		string(splunkLoggerDriver.prefix) != "containeriid a=b " {
+		t.Fatal("Values do not match configuration.")
+	}
+
+	message1Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+	message2Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 2 {
+		t.Fatal("Expected two messages")
+	}
+
+	message1 := hec.messages[0]
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
+		message1.Host != hostname ||
+		message1.Source != "" ||
+		message1.SourceType != "" ||
+		message1.Index != "" {
+		t.Fatalf("Unexpected values of message 1 %v", message1)
+	}
+
+	if event, err := message1.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "containeriid a=b {\"a\":\"b\"}" {
+			t.Fatalf("Unexpected event in message 1 %v", event)
+		}
+	}
+
+	message2 := hec.messages[1]
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
+		message2.Host != hostname ||
+		message2.Source != "" ||
+		message2.SourceType != "" ||
+		message2.Index != "" {
+		t.Fatalf("Unexpected values of message 2 %v", message2)
+	}
+
+	if event, err := message2.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "containeriid a=b notjson" {
+			t.Fatalf("Unexpected event in message 2 %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify that the Splunk Logging Driver can accept tag="" which allows sending raw messages
+// exactly as we get them on stdout/stderr
+func TestRawFormatWithoutTag(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:    hec.URL(),
+			splunkTokenKey:  hec.token,
+			splunkFormatKey: splunkFormatRaw,
+			tagKey:          "",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	hostname, err := ctx.Hostname()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !hec.connectionVerified {
+		t.Fatal("By default connection should be verified")
+	}
+
+	splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
+	if !ok {
+		t.Fatal("Unexpected Splunk Logging Driver type")
+	}
+
+	if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
+		splunkLoggerDriver.auth != "Splunk "+hec.token ||
+		splunkLoggerDriver.nullMessage.Host != hostname ||
+		splunkLoggerDriver.nullMessage.Source != "" ||
+		splunkLoggerDriver.nullMessage.SourceType != "" ||
+		splunkLoggerDriver.nullMessage.Index != "" ||
+		splunkLoggerDriver.gzipCompression != false ||
+		splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
+		splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
+		splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
+		cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
+		string(splunkLoggerDriver.prefix) != "" {
+		t.Log(string(splunkLoggerDriver.prefix) + "a")
+		t.Fatal("Values do not match configuration.")
+	}
+
+	message1Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+	message2Time := time.Now()
+	if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 2 {
+		t.Fatal("Expected two messages")
+	}
+
+	message1 := hec.messages[0]
+	if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
+		message1.Host != hostname ||
+		message1.Source != "" ||
+		message1.SourceType != "" ||
+		message1.Index != "" {
+		t.Fatalf("Unexpected values of message 1 %v", message1)
+	}
+
+	if event, err := message1.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "{\"a\":\"b\"}" {
+			t.Fatalf("Unexpected event in message 1 %v", event)
+		}
+	}
+
+	message2 := hec.messages[1]
+	if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
+		message2.Host != hostname ||
+		message2.Source != "" ||
+		message2.SourceType != "" ||
+		message2.Index != "" {
+		t.Fatalf("Unexpected values of message 2 %v", message2)
+	}
+
+	if event, err := message2.EventAsString(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event != "notjson" {
+			t.Fatalf("Unexpected event in message 2 %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify that we send messages in batches with the default batching parameters,
+// but change the frequency to make sure that numOfRequests matches the expected 17 requests
+func TestBatching(t *testing.T) {
+	if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
+		t.Fatal(err)
+	}
+
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < defaultStreamChannelSize*4; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != defaultStreamChannelSize*4 {
+		t.Fatal("Not all messages delivered")
+	}
+
+	for i, message := range hec.messages {
+		if event, err := message.EventAsMap(); err != nil {
+			t.Fatal(err)
+		} else {
+			if event["line"] != fmt.Sprintf("%d", i) {
+				t.Fatalf("Unexpected event in message %v", event)
+			}
+		}
+	}
+
+	// 1 to verify connection and 16 batches
+	if hec.numOfRequests != 17 {
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify that the driver uses the timer to fire events no less often than the specified frequency
+func TestFrequency(t *testing.T) {
+	if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
+		t.Fatal(err)
+	}
+
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+		time.Sleep(15 * time.Millisecond)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 10 {
+		t.Fatal("Not all messages delivered")
+	}
+
+	for i, message := range hec.messages {
+		if event, err := message.EventAsMap(); err != nil {
+			t.Fatal(err)
+		} else {
+			if event["line"] != fmt.Sprintf("%d", i) {
+				t.Fatalf("Unexpected event in message %v", event)
+			}
+		}
+	}
+
+	// 1 to verify the connection and 10 to verify that we sent messages at the required frequency,
+	// but because the frequency is very small (to keep the test quick), accept 9 instead of 11 in case context switches are slow
+	if hec.numOfRequests < 9 {
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Simulate behavior similar to the first version of the Splunk Logging Driver, which sent one message
+// per request
+func TestOneMessagePerRequest(t *testing.T) {
+	if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
+		t.Fatal(err)
+	}
+
+	hec := NewHTTPEventCollectorMock(t)
+
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 10 {
+		t.Fatal("Not all messages delivered")
+	}
+
+	for i, message := range hec.messages {
+		if event, err := message.EventAsMap(); err != nil {
+			t.Fatal(err)
+		} else {
+			if event["line"] != fmt.Sprintf("%d", i) {
+				t.Fatalf("Unexpected event in message %v", event)
+			}
+		}
+	}
+
+	// 1 to verify connection and 10 messages
+	if hec.numOfRequests != 11 {
+		t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Driver should not be created when HEC is unresponsive
+func TestVerify(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+	hec.simulateServerError = true
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	_, err := New(ctx)
+	if err == nil {
+		t.Fatal("Expecting driver to fail, when server is unresponsive")
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify that the user can choose to skip verification that the Splunk HEC is working.
+// This test also verifies the retry logic.
+func TestSkipVerify(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+	hec.simulateServerError = true
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:              hec.URL(),
+			splunkTokenKey:            hec.token,
+			splunkVerifyConnectionKey: "false",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if hec.connectionVerified {
+		t.Fatal("Connection should not be verified")
+	}
+
+	for i := 0; i < defaultStreamChannelSize*2; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	if len(hec.messages) != 0 {
+		t.Fatal("No messages should be accepted at this point")
+	}
+
+	hec.simulateServerError = false
+
+	for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != defaultStreamChannelSize*4 {
+		t.Fatal("Not all messages delivered")
+	}
+
+	for i, message := range hec.messages {
+		if event, err := message.EventAsMap(); err != nil {
+			t.Fatal(err)
+		} else {
+			if event["line"] != fmt.Sprintf("%d", i) {
+				t.Fatalf("Unexpected event in message %v", event)
+			}
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify the logic for when the whole buffer has been filled
+func TestBufferMaximum(t *testing.T) {
+	if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
+		t.Fatal(err)
+	}
+
+	hec := NewHTTPEventCollectorMock(t)
+	hec.simulateServerError = true
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:              hec.URL(),
+			splunkTokenKey:            hec.token,
+			splunkVerifyConnectionKey: "false",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if hec.connectionVerified {
+		t.Fatal("Connection should not be verified")
+	}
+
+	for i := 0; i < 11; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	if len(hec.messages) != 0 {
+		t.Fatal("No messages should be accepted at this point")
+	}
+
+	hec.simulateServerError = false
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 9 {
+		t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
+	}
+
+	// The first two messages (one batch) were written to the daemon log while the buffer was full
+	for i, message := range hec.messages {
+		if event, err := message.EventAsMap(); err != nil {
+			t.Fatal(err)
+		} else {
+			if event["line"] != fmt.Sprintf("%d", i+2) {
+				t.Fatalf("Unexpected event in message %v", event)
+			}
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Verify that Close does not block when the HEC is down the whole time
+func TestServerAlwaysDown(t *testing.T) {
+	if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
+		t.Fatal(err)
+	}
+
+	hec := NewHTTPEventCollectorMock(t)
+	hec.simulateServerError = true
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:              hec.URL(),
+			splunkTokenKey:            hec.token,
+			splunkVerifyConnectionKey: "false",
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if hec.connectionVerified {
+		t.Fatal("Connection should not be verified")
+	}
+
+	for i := 0; i < 5; i++ {
+		if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(hec.messages) != 0 {
+		t.Fatal("No messages should be sent")
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Cannot send messages after we close driver
+func TestCannotSendAfterClose(t *testing.T) {
+	hec := NewHTTPEventCollectorMock(t)
+	go hec.Serve()
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			splunkURLKey:   hec.URL(),
+			splunkTokenKey: hec.token,
+		},
+		ContainerID:        "containeriid",
+		ContainerName:      "/container_name",
+		ContainerImageID:   "contaimageid",
+		ContainerImageName: "container_image_name",
+	}
+
+	loggerDriver, err := New(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil {
+		t.Fatal(err)
+	}
+
+	err = loggerDriver.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil {
+		t.Fatal("Driver should not allow to send messages after close")
+	}
+
+	if len(hec.messages) != 1 {
+		t.Fatal("Only one message should be sent")
+	}
+
+	message := hec.messages[0]
+	if event, err := message.EventAsMap(); err != nil {
+		t.Fatal(err)
+	} else {
+		if event["line"] != "message1" {
+			t.Fatalf("Unexpected event in message %v", event)
+		}
+	}
+
+	err = hec.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
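The tests above run entirely against the in-process HEC mock defined in the next file, so they need no real Splunk instance. A likely way to run just this package's tests from the repository root (standard Go tooling, not specific to this change):

    go test -v ./daemon/logger/splunk/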

+ 157 - 0
daemon/logger/splunk/splunkhecmock_test.go

@@ -0,0 +1,157 @@
+package splunk
+
+import (
+	"compress/gzip"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"testing"
+)
+
+func (message *splunkMessage) EventAsString() (string, error) {
+	if val, ok := message.Event.(string); ok {
+		return val, nil
+	}
+	return "", fmt.Errorf("Cannot cast Event %v to string", message.Event)
+}
+
+func (message *splunkMessage) EventAsMap() (map[string]interface{}, error) {
+	if val, ok := message.Event.(map[string]interface{}); ok {
+		return val, nil
+	}
+	return nil, fmt.Errorf("Cannot cast Event %v to map", message.Event)
+}
+
+type HTTPEventCollectorMock struct {
+	tcpAddr     *net.TCPAddr
+	tcpListener *net.TCPListener
+
+	token               string
+	simulateServerError bool
+
+	test *testing.T
+
+	connectionVerified bool
+	gzipEnabled        *bool
+	messages           []*splunkMessage
+	numOfRequests      int
+}
+
+func NewHTTPEventCollectorMock(t *testing.T) *HTTPEventCollectorMock {
+	tcpAddr := &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0, Zone: ""}
+	tcpListener, err := net.ListenTCP("tcp", tcpAddr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return &HTTPEventCollectorMock{
+		tcpAddr:             tcpAddr,
+		tcpListener:         tcpListener,
+		token:               "4642492F-D8BD-47F1-A005-0C08AE4657DF",
+		simulateServerError: false,
+		test:                t,
+		connectionVerified:  false}
+}
+
+func (hec *HTTPEventCollectorMock) URL() string {
+	return "http://" + hec.tcpListener.Addr().String()
+}
+
+func (hec *HTTPEventCollectorMock) Serve() error {
+	return http.Serve(hec.tcpListener, hec)
+}
+
+func (hec *HTTPEventCollectorMock) Close() error {
+	return hec.tcpListener.Close()
+}
+
+func (hec *HTTPEventCollectorMock) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
+	var err error
+
+	hec.numOfRequests++
+
+	if hec.simulateServerError {
+		if request.Body != nil {
+			defer request.Body.Close()
+		}
+		writer.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	switch request.Method {
+	case http.MethodOptions:
+		// Verify that the OPTIONS method is called only once
+		if hec.connectionVerified {
+			hec.test.Errorf("Connection should not be verified more than once. Got second request with %s method.", request.Method)
+		}
+		hec.connectionVerified = true
+		writer.WriteHeader(http.StatusOK)
+	case http.MethodPost:
+		// Always verify that the driver uses the correct HEC path
+		if request.URL.String() != "/services/collector/event/1.0" {
+			hec.test.Errorf("Unexpected path %v", request.URL)
+		}
+		defer request.Body.Close()
+
+		if authorization, ok := request.Header["Authorization"]; !ok || authorization[0] != ("Splunk "+hec.token) {
+			hec.test.Error("Authorization header is invalid.")
+		}
+
+		gzipEnabled := false
+		if contentEncoding, ok := request.Header["Content-Encoding"]; ok && contentEncoding[0] == "gzip" {
+			gzipEnabled = true
+		}
+
+		if hec.gzipEnabled == nil {
+			hec.gzipEnabled = &gzipEnabled
+		} else if gzipEnabled != *hec.gzipEnabled {
+			// Nothing wrong with that, but we just know that Splunk Logging Driver does not do that
+			hec.test.Error("Driver should not change Content Encoding.")
+		}
+
+		var gzipReader *gzip.Reader
+		var reader io.Reader
+		if gzipEnabled {
+			gzipReader, err = gzip.NewReader(request.Body)
+			if err != nil {
+				hec.test.Fatal(err)
+			}
+			reader = gzipReader
+		} else {
+			reader = request.Body
+		}
+
+		// Read body
+		var body []byte
+		body, err = ioutil.ReadAll(reader)
+		if err != nil {
+			hec.test.Fatal(err)
+		}
+
+		// Parse message
+		messageStart := 0
+		for i := 0; i < len(body); i++ {
+			if i == len(body)-1 || (body[i] == '}' && body[i+1] == '{') {
+				var message splunkMessage
+				err = json.Unmarshal(body[messageStart:i+1], &message)
+				if err != nil {
+					hec.test.Log(string(body[messageStart : i+1]))
+					hec.test.Fatal(err)
+				}
+				hec.messages = append(hec.messages, &message)
+				messageStart = i + 1
+			}
+		}
+
+		if gzipEnabled {
+			gzipReader.Close()
+		}
+
+		writer.WriteHeader(http.StatusOK)
+	default:
+		hec.test.Errorf("Unexpected HTTP method %s", request.Method)
+		writer.WriteHeader(http.StatusBadRequest)
+	}
+}

+ 28 - 15
docs/admin/logging/splunk.md

@@ -32,21 +32,23 @@ You can set the logging driver for a specific container by using the
 You can use the `--log-opt NAME=VALUE` flag to specify these additional Splunk
 logging driver options:
 
-| Option                      | Required | Description                                                                                                                                                                                                        |
-|-----------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `splunk-token`              | required | Splunk HTTP Event Collector token.                                                                                                                                                                                 |
-| `splunk-url`                | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`.                                                              |
-| `splunk-source`             | optional | Event source.                                                                                                                                                                                                      |
-| `splunk-sourcetype`         | optional | Event source type.                                                                                                                                                                                                 |
-| `splunk-index`              | optional | Event index.                                                                                                                                                                                                       |
-| `splunk-capath`             | optional | Path to root certificate.                                                                                                                                                                                          |
-| `splunk-caname`             | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used.                                                                                                           |
-| `splunk-insecureskipverify` | optional | Ignore server certificate validation.                                                                                                                                                                              |
-| `splunk-format`             | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`.                                                                                                                                            |
-| `splunk-verify-connection`   | optional | Verify on start, that docker can connect to Splunk server. Defaults to true.                                                                                                                                       |
-| `tag`                       | optional | Specify tag for message, which interpret some markup. Default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format. |
-| `labels`                    | optional | Comma-separated list of keys of labels, which should be included in message, if these labels are specified for container.                                                                                          |
-| `env`                       | optional | Comma-separated list of keys of environment variables, which should be included in message, if these variables are specified for container.                                                                        |
+| Option                      | Required | Description                                                                                                                                                                                                             |
+|-----------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `splunk-token`              | required | Splunk HTTP Event Collector token.                                                                                                                                                                                      |
+| `splunk-url`                | required | Path to your Splunk Enterprise or Splunk Cloud instance (including port and scheme used by HTTP Event Collector) `https://your_splunk_instance:8088`.                                                                   |
+| `splunk-source`             | optional | Event source.                                                                                                                                                                                                           |
+| `splunk-sourcetype`         | optional | Event source type.                                                                                                                                                                                                      |
+| `splunk-index`              | optional | Event index.                                                                                                                                                                                                            |
+| `splunk-capath`             | optional | Path to root certificate.                                                                                                                                                                                               |
+| `splunk-caname`             | optional | Name to use for validating server certificate; by default the hostname of the `splunk-url` will be used.                                                                                                                |
+| `splunk-insecureskipverify` | optional | Ignore server certificate validation.                                                                                                                                                                                   |
+| `splunk-format`             | optional | Message format. Can be `inline`, `json` or `raw`. Defaults to `inline`.                                                                                                                                                 |
+| `splunk-verify-connection`  | optional | Verify on start that Docker can connect to the Splunk server. Defaults to true.                                                                                                                                          |
+| `splunk-gzip`               | optional | Enable or disable gzip compression when sending events to the Splunk Enterprise or Splunk Cloud instance. Defaults to false.                                                                                             |
+| `splunk-gzip-level`         | optional | Set the gzip compression level. Valid values are -1 (default), 0 (no compression), 1 (best speed) through 9 (best compression). Defaults to [DefaultCompression](https://golang.org/pkg/compress/gzip/#DefaultCompression). |
+| `tag`                       | optional | Specify a tag for the message, which can interpret some markup. The default value is `{{.ID}}` (12 characters of the container ID). Refer to the [log tag option documentation](log_tags.md) for customizing the log tag format. |
+| `labels`                    | optional | Comma-separated list of label keys to include in the message, if these labels are specified for the container.                                                                                                           |
+| `env`                       | optional | Comma-separated list of environment variable keys to include in the message, if these variables are specified for the container.                                                                                         |
 
 If there is a collision between `label` and `env` keys, the value of `env` takes precedence.
 Both options add additional fields to the attributes of a logging message.
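
For example, gzip compression could be enabled for a container as in the minimal sketch below; the token, URL, and image name are placeholders that you would replace with your own HTTP Event Collector settings:

```
docker run --log-driver=splunk \
    --log-opt splunk-token=00000000-0000-0000-0000-000000000000 \
    --log-opt splunk-url=https://your_splunk_instance:8088 \
    --log-opt splunk-gzip=true \
    --log-opt splunk-gzip-level=6 \
    your/image
```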
@@ -132,3 +134,14 @@ tag will be prefixed to the message. For example
 MyImage/MyContainer env1=val1 label1=label1 my message
 MyImage/MyContainer env1=val1 label1=label1 {"foo": "bar"}
 ```
+
+## Advanced options
+
+The Splunk logging driver allows you to configure a few advanced options by setting the following environment variables for the Docker daemon (see the example after the table).
+
+| Environment variable name                        | Default value | Description                                                                                                                    |
+|--------------------------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------|
+| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY`  | `5s`          | How often the driver posts messages when there is nothing to batch, i.e. the maximum time to wait for more messages to batch. |
+| `SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE` | `1000`        | How many messages the driver should wait for before sending them in one batch.                                                |
+| `SPLUNK_LOGGING_DRIVER_BUFFER_MAX`               | `10 * 1000`   | The maximum number of messages the driver can hold in its buffer for retries when it cannot connect to the remote server.     |
+| `SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE`             | `4 * 1000`    | How many pending messages can be held in the channel used to pass messages to the background logger worker, which batches them. |
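
As an illustrative sketch only (how daemon environment variables are set depends on your installation, e.g. a systemd unit or an init script, and the values shown here are arbitrary examples), the variables could be exported in the environment that starts the daemon:

```
# Arbitrary example values; the variable names come from the table above.
# The frequency is assumed to use Go's duration format, as the 5s default suggests.
export SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY=10s
export SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE=2000

# Start the daemon in this environment.
dockerd
```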