12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313 |
- package splunk // import "github.com/docker/docker/daemon/logger/splunk"
- import (
- "compress/gzip"
- "context"
- "fmt"
- "net/http"
- "runtime"
- "strconv"
- "testing"
- "time"
- "github.com/docker/docker/daemon/logger"
- "gotest.tools/v3/assert"
- )
- // Validate options
- func TestValidateLogOpt(t *testing.T) {
- err := ValidateLogOpt(map[string]string{
- splunkURLKey: "http://127.0.0.1",
- splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
- splunkSourceKey: "mysource",
- splunkSourceTypeKey: "mysourcetype",
- splunkIndexKey: "myindex",
- splunkCAPathKey: "/usr/cert.pem",
- splunkCANameKey: "ca_name",
- splunkInsecureSkipVerifyKey: "true",
- splunkFormatKey: "json",
- splunkVerifyConnectionKey: "true",
- splunkGzipCompressionKey: "true",
- splunkGzipCompressionLevelKey: "1",
- envKey: "a",
- envRegexKey: "^foo",
- labelsKey: "b",
- tagKey: "c",
- })
- if err != nil {
- t.Fatal(err)
- }
- err = ValidateLogOpt(map[string]string{
- "not-supported-option": "a",
- })
- if err == nil {
- t.Fatal("Expecting error on unsupported options")
- }
- }
- // Driver require user to specify required options
- func TestNewMissedConfig(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{},
- }
- _, err := New(info)
- if err == nil {
- t.Fatal("Logger driver should fail when no required parameters specified")
- }
- }
- // Driver require user to specify splunk-url
- func TestNewMissedUrl(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
- },
- }
- _, err := New(info)
- if err.Error() != "splunk: splunk-url is expected" {
- t.Fatal("Logger driver should fail when no required parameters specified")
- }
- }
- // Driver require user to specify splunk-token
- func TestNewMissedToken(t *testing.T) {
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: "http://127.0.0.1:8088",
- },
- }
- _, err := New(info)
- if err.Error() != "splunk: splunk-token is expected" {
- t.Fatal("Logger driver should fail when no required parameters specified")
- }
- }
- func TestNewWithProxy(t *testing.T) {
- proxy := "http://proxy.testing:8888"
- t.Setenv("HTTP_PROXY", proxy)
- // must not be localhost
- splunkURL := "http://example.com:12345"
- logger, err := New(logger.Info{
- Config: map[string]string{
- splunkURLKey: splunkURL,
- splunkTokenKey: "token",
- splunkVerifyConnectionKey: "false",
- },
- ContainerID: "containeriid",
- })
- assert.NilError(t, err)
- splunkLogger := logger.(*splunkLoggerInline)
- proxyFunc := splunkLogger.transport.Proxy
- assert.Assert(t, proxyFunc != nil)
- req, err := http.NewRequest(http.MethodGet, splunkURL, nil)
- assert.NilError(t, err)
- proxyURL, err := proxyFunc(req)
- assert.NilError(t, err)
- assert.Assert(t, proxyURL != nil)
- assert.Equal(t, proxy, proxyURL.String())
- }
- // Test default settings
- func TestDefault(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- hostname, err := info.Hostname()
- if err != nil {
- t.Fatal(err)
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if loggerDriver.Name() != driverName {
- t.Fatal("Unexpected logger driver name")
- }
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "" ||
- splunkLoggerDriver.nullMessage.SourceType != "" ||
- splunkLoggerDriver.nullMessage.Index != "" ||
- splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
- t.Fatal("Found not default values setup in Splunk Logging Driver.")
- }
- message1Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
- t.Fatal(err)
- }
- message2Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 2 {
- t.Fatal("Expected two messages")
- }
- if *hec.gzipEnabled {
- t.Fatal("Gzip should not be used")
- }
- message1 := hec.messages[0]
- if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
- message1.Host != hostname ||
- message1.Source != "" ||
- message1.SourceType != "" ||
- message1.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message1)
- }
- if event, err := message1.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != "{\"a\":\"b\"}" ||
- event["source"] != "stdout" ||
- event["tag"] != "containeriid" ||
- len(event) != 3 {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- message2 := hec.messages[1]
- if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
- message2.Host != hostname ||
- message2.Source != "" ||
- message2.SourceType != "" ||
- message2.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message2)
- }
- if event, err := message2.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != "notajson" ||
- event["source"] != "stdout" ||
- event["tag"] != "containeriid" ||
- len(event) != 3 {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify inline format with a not default settings for most of options
- func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkSourceKey: "mysource",
- splunkSourceTypeKey: "mysourcetype",
- splunkIndexKey: "myindex",
- splunkFormatKey: splunkFormatInline,
- splunkGzipCompressionKey: "true",
- tagKey: "{{.ImageName}}/{{.Name}}",
- labelsKey: "a",
- envRegexKey: "^foo",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- ContainerLabels: map[string]string{
- "a": "b",
- },
- ContainerEnv: []string{"foo_finder=bar"},
- }
- hostname, err := info.Hostname()
- if err != nil {
- t.Fatal(err)
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "mysource" ||
- splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
- splunkLoggerDriver.nullMessage.Index != "myindex" ||
- !splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
- t.Fatal("Values do not match configuration.")
- }
- messageTime := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 1 {
- t.Fatal("Expected one message")
- }
- if !*hec.gzipEnabled {
- t.Fatal("Gzip should be used")
- }
- message := hec.messages[0]
- if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
- message.Host != hostname ||
- message.Source != "mysource" ||
- message.SourceType != "mysourcetype" ||
- message.Index != "myindex" {
- t.Fatalf("Unexpected values of message %v", message)
- }
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != "1" ||
- event["source"] != "stdout" ||
- event["tag"] != "container_image_name/container_name" ||
- event["attrs"].(map[string]interface{})["a"] != "b" ||
- event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
- len(event) != 4 {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify JSON format
- func TestJsonFormat(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkFormatKey: splunkFormatJSON,
- splunkGzipCompressionKey: "true",
- splunkGzipCompressionLevelKey: "1",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- hostname, err := info.Hostname()
- if err != nil {
- t.Fatal(err)
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "" ||
- splunkLoggerDriver.nullMessage.SourceType != "" ||
- splunkLoggerDriver.nullMessage.Index != "" ||
- !splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
- t.Fatal("Values do not match configuration.")
- }
- message1Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
- t.Fatal(err)
- }
- message2Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 2 {
- t.Fatal("Expected two messages")
- }
- message1 := hec.messages[0]
- if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
- message1.Host != hostname ||
- message1.Source != "" ||
- message1.SourceType != "" ||
- message1.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message1)
- }
- if event, err := message1.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"].(map[string]interface{})["a"] != "b" ||
- event["source"] != "stdout" ||
- event["tag"] != "containeriid" ||
- len(event) != 3 {
- t.Fatalf("Unexpected event in message 1 %v", event)
- }
- }
- message2 := hec.messages[1]
- if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
- message2.Host != hostname ||
- message2.Source != "" ||
- message2.SourceType != "" ||
- message2.Index != "" {
- t.Fatalf("Unexpected values of message 2 %v", message2)
- }
- // If message cannot be parsed as JSON - it should be sent as a line
- if event, err := message2.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != "notjson" ||
- event["source"] != "stdout" ||
- event["tag"] != "containeriid" ||
- len(event) != 3 {
- t.Fatalf("Unexpected event in message 2 %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify raw format
- func TestRawFormat(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkFormatKey: splunkFormatRaw,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- hostname, err := info.Hostname()
- assert.NilError(t, err)
- loggerDriver, err := New(info)
- assert.NilError(t, err)
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "" ||
- splunkLoggerDriver.nullMessage.SourceType != "" ||
- splunkLoggerDriver.nullMessage.Index != "" ||
- splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
- string(splunkLoggerDriver.prefix) != "containeriid " {
- t.Fatal("Values do not match configuration.")
- }
- message1Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
- t.Fatal(err)
- }
- message2Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 2 {
- t.Fatal("Expected two messages")
- }
- message1 := hec.messages[0]
- if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
- message1.Host != hostname ||
- message1.Source != "" ||
- message1.SourceType != "" ||
- message1.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message1)
- }
- if event, err := message1.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "containeriid {\"a\":\"b\"}" {
- t.Fatalf("Unexpected event in message 1 %v", event)
- }
- }
- message2 := hec.messages[1]
- if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
- message2.Host != hostname ||
- message2.Source != "" ||
- message2.SourceType != "" ||
- message2.Index != "" {
- t.Fatalf("Unexpected values of message 2 %v", message2)
- }
- if event, err := message2.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "containeriid notjson" {
- t.Fatalf("Unexpected event in message 1 %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify raw format with labels
- func TestRawFormatWithLabels(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkFormatKey: splunkFormatRaw,
- labelsKey: "a",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- ContainerLabels: map[string]string{
- "a": "b",
- },
- }
- hostname, err := info.Hostname()
- if err != nil {
- t.Fatal(err)
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "" ||
- splunkLoggerDriver.nullMessage.SourceType != "" ||
- splunkLoggerDriver.nullMessage.Index != "" ||
- splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
- string(splunkLoggerDriver.prefix) != "containeriid a=b " {
- t.Fatal("Values do not match configuration.")
- }
- message1Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
- t.Fatal(err)
- }
- message2Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 2 {
- t.Fatal("Expected two messages")
- }
- message1 := hec.messages[0]
- if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
- message1.Host != hostname ||
- message1.Source != "" ||
- message1.SourceType != "" ||
- message1.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message1)
- }
- if event, err := message1.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "containeriid a=b {\"a\":\"b\"}" {
- t.Fatalf("Unexpected event in message 1 %v", event)
- }
- }
- message2 := hec.messages[1]
- if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
- message2.Host != hostname ||
- message2.Source != "" ||
- message2.SourceType != "" ||
- message2.Index != "" {
- t.Fatalf("Unexpected values of message 2 %v", message2)
- }
- if event, err := message2.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "containeriid a=b notjson" {
- t.Fatalf("Unexpected event in message 2 %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
- // in the same way we get them in stdout/stderr
- func TestRawFormatWithoutTag(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkFormatKey: splunkFormatRaw,
- tagKey: "",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- hostname, err := info.Hostname()
- if err != nil {
- t.Fatal(err)
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if !hec.connectionVerified {
- t.Fatal("By default connection should be verified")
- }
- splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
- if !ok {
- t.Fatal("Unexpected Splunk Logging Driver type")
- }
- if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
- splunkLoggerDriver.auth != "Splunk "+hec.token ||
- splunkLoggerDriver.nullMessage.Host != hostname ||
- splunkLoggerDriver.nullMessage.Source != "" ||
- splunkLoggerDriver.nullMessage.SourceType != "" ||
- splunkLoggerDriver.nullMessage.Index != "" ||
- splunkLoggerDriver.gzipCompression ||
- splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
- splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
- splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
- cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
- string(splunkLoggerDriver.prefix) != "" {
- t.Log(string(splunkLoggerDriver.prefix) + "a")
- t.Fatal("Values do not match configuration.")
- }
- message1Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
- t.Fatal(err)
- }
- message2Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
- t.Fatal(err)
- }
- message3Time := time.Now()
- if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- // message3 would have an empty or whitespace only string in the "event" field
- // both of which are not acceptable to HEC
- // thus here we must expect 2 messages, not 3
- if len(hec.messages) != 2 {
- t.Fatal("Expected two messages")
- }
- message1 := hec.messages[0]
- if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
- message1.Host != hostname ||
- message1.Source != "" ||
- message1.SourceType != "" ||
- message1.Index != "" {
- t.Fatalf("Unexpected values of message 1 %v", message1)
- }
- if event, err := message1.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "{\"a\":\"b\"}" {
- t.Fatalf("Unexpected event in message 1 %v", event)
- }
- }
- message2 := hec.messages[1]
- if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
- message2.Host != hostname ||
- message2.Source != "" ||
- message2.SourceType != "" ||
- message2.Index != "" {
- t.Fatalf("Unexpected values of message 2 %v", message2)
- }
- if event, err := message2.EventAsString(); err != nil {
- t.Fatal(err)
- } else {
- if event != "notjson" {
- t.Fatalf("Unexpected event in message 2 %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify that we will send messages in batches with default batching parameters,
- // but change frequency to be sure that numOfRequests will match expected 17 requests
- func TestBatching(t *testing.T) {
- t.Setenv(envVarPostMessagesFrequency, "10h")
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < defaultStreamChannelSize*4; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != defaultStreamChannelSize*4 {
- t.Fatal("Not all messages delivered")
- }
- for i, message := range hec.messages {
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != strconv.Itoa(i) {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- }
- // 1 to verify connection and 16 batches
- if hec.numOfRequests != 17 {
- t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify that test is using time to fire events not rare than specified frequency
- func TestFrequency(t *testing.T) {
- t.Setenv(envVarPostMessagesFrequency, "5ms")
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < 10; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- time.Sleep(15 * time.Millisecond)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 10 {
- t.Fatal("Not all messages delivered")
- }
- for i, message := range hec.messages {
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != strconv.Itoa(i) {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- }
- // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
- // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
- expectedRequests := 9
- if runtime.GOOS == "windows" {
- // sometimes in Windows, this test fails with number of requests showing 8. So be more conservative.
- expectedRequests = 7
- }
- if hec.numOfRequests < expectedRequests {
- t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
- // per request
- func TestOneMessagePerRequest(t *testing.T) {
- t.Setenv(envVarPostMessagesFrequency, "10h")
- t.Setenv(envVarPostMessagesBatchSize, "1")
- t.Setenv(envVarBufferMaximum, "1")
- t.Setenv(envVarStreamChannelSize, "0")
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < 10; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 10 {
- t.Fatal("Not all messages delivered")
- }
- for i, message := range hec.messages {
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != strconv.Itoa(i) {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- }
- // 1 to verify connection and 10 messages
- if hec.numOfRequests != 11 {
- t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Driver should not be created when HEC is unresponsive
- func TestVerify(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- hec.simulateServerError = true
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- _, err := New(info)
- if err == nil {
- t.Fatal("Expecting driver to fail, when server is unresponsive")
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify that user can specify to skip verification that Splunk HEC is working.
- // Also in this test we verify retry logic.
- func TestSkipVerify(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- hec.simulateServerError = true
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkVerifyConnectionKey: "false",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if hec.connectionVerified {
- t.Fatal("Connection should not be verified")
- }
- for i := 0; i < defaultStreamChannelSize*2; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- if len(hec.messages) != 0 {
- t.Fatal("No messages should be accepted at this point")
- }
- hec.simulateErr(false)
- for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != defaultStreamChannelSize*4 {
- t.Fatal("Not all messages delivered")
- }
- for i, message := range hec.messages {
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != strconv.Itoa(i) {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify logic for when we filled whole buffer
- func TestBufferMaximum(t *testing.T) {
- t.Setenv(envVarPostMessagesBatchSize, "2")
- t.Setenv(envVarBufferMaximum, "10")
- t.Setenv(envVarStreamChannelSize, "0")
- hec := NewHTTPEventCollectorMock(t)
- hec.simulateErr(true)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkVerifyConnectionKey: "false",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if hec.connectionVerified {
- t.Fatal("Connection should not be verified")
- }
- for i := 0; i < 11; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- if len(hec.messages) != 0 {
- t.Fatal("No messages should be accepted at this point")
- }
- hec.simulateServerError = false
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 9 {
- t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
- }
- // First 1000 messages are written to daemon log when buffer was full
- for i, message := range hec.messages {
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != fmt.Sprintf("%d", i+2) {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Verify that we are not blocking close when HEC is down for the whole time
- func TestServerAlwaysDown(t *testing.T) {
- t.Setenv(envVarPostMessagesBatchSize, "2")
- t.Setenv(envVarBufferMaximum, "4")
- t.Setenv(envVarStreamChannelSize, "0")
- hec := NewHTTPEventCollectorMock(t)
- hec.simulateServerError = true
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- splunkVerifyConnectionKey: "false",
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if hec.connectionVerified {
- t.Fatal("Connection should not be verified")
- }
- for i := 0; i < 5; i++ {
- if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if len(hec.messages) != 0 {
- t.Fatal("No messages should be sent")
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- // Cannot send messages after we close driver
- func TestCannotSendAfterClose(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- loggerDriver, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
- t.Fatal(err)
- }
- err = loggerDriver.Close()
- if err != nil {
- t.Fatal(err)
- }
- if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
- t.Fatal("Driver should not allow to send messages after close")
- }
- if len(hec.messages) != 1 {
- t.Fatal("Only one message should be sent")
- }
- message := hec.messages[0]
- if event, err := message.EventAsMap(); err != nil {
- t.Fatal(err)
- } else {
- if event["line"] != "message1" {
- t.Fatalf("Unexpected event in message %v", event)
- }
- }
- err = hec.Close()
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestDeadlockOnBlockedEndpoint(t *testing.T) {
- hec := NewHTTPEventCollectorMock(t)
- go hec.Serve()
- info := logger.Info{
- Config: map[string]string{
- splunkURLKey: hec.URL(),
- splunkTokenKey: hec.token,
- },
- ContainerID: "containeriid",
- ContainerName: "/container_name",
- ContainerImageID: "contaimageid",
- ContainerImageName: "container_image_name",
- }
- l, err := New(info)
- if err != nil {
- t.Fatal(err)
- }
- ctx, unblock := context.WithCancel(context.Background())
- hec.withBlock(ctx)
- defer unblock()
- batchSendTimeout = 1 * time.Second
- if err := l.Log(&logger.Message{}); err != nil {
- t.Fatal(err)
- }
- done := make(chan struct{})
- go func() {
- l.Close()
- close(done)
- }()
- select {
- case <-time.After(60 * time.Second):
- buf := make([]byte, 1e6)
- buf = buf[:runtime.Stack(buf, true)]
- t.Logf("STACK DUMP: \n\n%s\n\n", string(buf))
- t.Fatal("timeout waiting for close to finish")
- case <-done:
- }
- }
|