splunk_test.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306
  1. package splunk
  2. import (
  3. "compress/gzip"
  4. "fmt"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/docker/docker/daemon/logger"
  9. )
  10. // Validate options
  11. func TestValidateLogOpt(t *testing.T) {
  12. err := ValidateLogOpt(map[string]string{
  13. splunkURLKey: "http://127.0.0.1",
  14. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  15. splunkSourceKey: "mysource",
  16. splunkSourceTypeKey: "mysourcetype",
  17. splunkIndexKey: "myindex",
  18. splunkCAPathKey: "/usr/cert.pem",
  19. splunkCANameKey: "ca_name",
  20. splunkInsecureSkipVerifyKey: "true",
  21. splunkFormatKey: "json",
  22. splunkVerifyConnectionKey: "true",
  23. splunkGzipCompressionKey: "true",
  24. splunkGzipCompressionLevelKey: "1",
  25. envKey: "a",
  26. envRegexKey: "^foo",
  27. labelsKey: "b",
  28. tagKey: "c",
  29. })
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. err = ValidateLogOpt(map[string]string{
  34. "not-supported-option": "a",
  35. })
  36. if err == nil {
  37. t.Fatal("Expecting error on unsupported options")
  38. }
  39. }
  40. // Driver require user to specify required options
  41. func TestNewMissedConfig(t *testing.T) {
  42. info := logger.Info{
  43. Config: map[string]string{},
  44. }
  45. _, err := New(info)
  46. if err == nil {
  47. t.Fatal("Logger driver should fail when no required parameters specified")
  48. }
  49. }
  50. // Driver require user to specify splunk-url
  51. func TestNewMissedUrl(t *testing.T) {
  52. info := logger.Info{
  53. Config: map[string]string{
  54. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  55. },
  56. }
  57. _, err := New(info)
  58. if err.Error() != "splunk: splunk-url is expected" {
  59. t.Fatal("Logger driver should fail when no required parameters specified")
  60. }
  61. }
  62. // Driver require user to specify splunk-token
  63. func TestNewMissedToken(t *testing.T) {
  64. info := logger.Info{
  65. Config: map[string]string{
  66. splunkURLKey: "http://127.0.0.1:8088",
  67. },
  68. }
  69. _, err := New(info)
  70. if err.Error() != "splunk: splunk-token is expected" {
  71. t.Fatal("Logger driver should fail when no required parameters specified")
  72. }
  73. }
  74. // Test default settings
  75. func TestDefault(t *testing.T) {
  76. hec := NewHTTPEventCollectorMock(t)
  77. go hec.Serve()
  78. info := logger.Info{
  79. Config: map[string]string{
  80. splunkURLKey: hec.URL(),
  81. splunkTokenKey: hec.token,
  82. },
  83. ContainerID: "containeriid",
  84. ContainerName: "container_name",
  85. ContainerImageID: "contaimageid",
  86. ContainerImageName: "container_image_name",
  87. }
  88. hostname, err := info.Hostname()
  89. if err != nil {
  90. t.Fatal(err)
  91. }
  92. loggerDriver, err := New(info)
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. if loggerDriver.Name() != driverName {
  97. t.Fatal("Unexpected logger driver name")
  98. }
  99. if !hec.connectionVerified {
  100. t.Fatal("By default connection should be verified")
  101. }
  102. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  103. if !ok {
  104. t.Fatal("Unexpected Splunk Logging Driver type")
  105. }
  106. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  107. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  108. splunkLoggerDriver.nullMessage.Host != hostname ||
  109. splunkLoggerDriver.nullMessage.Source != "" ||
  110. splunkLoggerDriver.nullMessage.SourceType != "" ||
  111. splunkLoggerDriver.nullMessage.Index != "" ||
  112. splunkLoggerDriver.gzipCompression != false ||
  113. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  114. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  115. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  116. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  117. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  118. }
  119. message1Time := time.Now()
  120. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  121. t.Fatal(err)
  122. }
  123. message2Time := time.Now()
  124. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  125. t.Fatal(err)
  126. }
  127. err = loggerDriver.Close()
  128. if err != nil {
  129. t.Fatal(err)
  130. }
  131. if len(hec.messages) != 2 {
  132. t.Fatal("Expected two messages")
  133. }
  134. if *hec.gzipEnabled {
  135. t.Fatal("Gzip should not be used")
  136. }
  137. message1 := hec.messages[0]
  138. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  139. message1.Host != hostname ||
  140. message1.Source != "" ||
  141. message1.SourceType != "" ||
  142. message1.Index != "" {
  143. t.Fatalf("Unexpected values of message 1 %v", message1)
  144. }
  145. if event, err := message1.EventAsMap(); err != nil {
  146. t.Fatal(err)
  147. } else {
  148. if event["line"] != "{\"a\":\"b\"}" ||
  149. event["source"] != "stdout" ||
  150. event["tag"] != "containeriid" ||
  151. len(event) != 3 {
  152. t.Fatalf("Unexpected event in message %v", event)
  153. }
  154. }
  155. message2 := hec.messages[1]
  156. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  157. message2.Host != hostname ||
  158. message2.Source != "" ||
  159. message2.SourceType != "" ||
  160. message2.Index != "" {
  161. t.Fatalf("Unexpected values of message 1 %v", message2)
  162. }
  163. if event, err := message2.EventAsMap(); err != nil {
  164. t.Fatal(err)
  165. } else {
  166. if event["line"] != "notajson" ||
  167. event["source"] != "stdout" ||
  168. event["tag"] != "containeriid" ||
  169. len(event) != 3 {
  170. t.Fatalf("Unexpected event in message %v", event)
  171. }
  172. }
  173. err = hec.Close()
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. }
  178. // Verify inline format with a not default settings for most of options
  179. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  180. hec := NewHTTPEventCollectorMock(t)
  181. go hec.Serve()
  182. info := logger.Info{
  183. Config: map[string]string{
  184. splunkURLKey: hec.URL(),
  185. splunkTokenKey: hec.token,
  186. splunkSourceKey: "mysource",
  187. splunkSourceTypeKey: "mysourcetype",
  188. splunkIndexKey: "myindex",
  189. splunkFormatKey: splunkFormatInline,
  190. splunkGzipCompressionKey: "true",
  191. tagKey: "{{.ImageName}}/{{.Name}}",
  192. labelsKey: "a",
  193. envRegexKey: "^foo",
  194. },
  195. ContainerID: "containeriid",
  196. ContainerName: "/container_name",
  197. ContainerImageID: "contaimageid",
  198. ContainerImageName: "container_image_name",
  199. ContainerLabels: map[string]string{
  200. "a": "b",
  201. },
  202. ContainerEnv: []string{"foo_finder=bar"},
  203. }
  204. hostname, err := info.Hostname()
  205. if err != nil {
  206. t.Fatal(err)
  207. }
  208. loggerDriver, err := New(info)
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. if !hec.connectionVerified {
  213. t.Fatal("By default connection should be verified")
  214. }
  215. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  216. if !ok {
  217. t.Fatal("Unexpected Splunk Logging Driver type")
  218. }
  219. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  220. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  221. splunkLoggerDriver.nullMessage.Host != hostname ||
  222. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  223. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  224. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  225. splunkLoggerDriver.gzipCompression != true ||
  226. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  227. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  228. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  229. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  230. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  231. t.Fatal("Values do not match configuration.")
  232. }
  233. messageTime := time.Now()
  234. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  235. t.Fatal(err)
  236. }
  237. err = loggerDriver.Close()
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. if len(hec.messages) != 1 {
  242. t.Fatal("Expected one message")
  243. }
  244. if !*hec.gzipEnabled {
  245. t.Fatal("Gzip should be used")
  246. }
  247. message := hec.messages[0]
  248. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  249. message.Host != hostname ||
  250. message.Source != "mysource" ||
  251. message.SourceType != "mysourcetype" ||
  252. message.Index != "myindex" {
  253. t.Fatalf("Unexpected values of message %v", message)
  254. }
  255. if event, err := message.EventAsMap(); err != nil {
  256. t.Fatal(err)
  257. } else {
  258. if event["line"] != "1" ||
  259. event["source"] != "stdout" ||
  260. event["tag"] != "container_image_name/container_name" ||
  261. event["attrs"].(map[string]interface{})["a"] != "b" ||
  262. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  263. len(event) != 4 {
  264. t.Fatalf("Unexpected event in message %v", event)
  265. }
  266. }
  267. err = hec.Close()
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. }
  272. // Verify JSON format
  273. func TestJsonFormat(t *testing.T) {
  274. hec := NewHTTPEventCollectorMock(t)
  275. go hec.Serve()
  276. info := logger.Info{
  277. Config: map[string]string{
  278. splunkURLKey: hec.URL(),
  279. splunkTokenKey: hec.token,
  280. splunkFormatKey: splunkFormatJSON,
  281. splunkGzipCompressionKey: "true",
  282. splunkGzipCompressionLevelKey: "1",
  283. },
  284. ContainerID: "containeriid",
  285. ContainerName: "/container_name",
  286. ContainerImageID: "contaimageid",
  287. ContainerImageName: "container_image_name",
  288. }
  289. hostname, err := info.Hostname()
  290. if err != nil {
  291. t.Fatal(err)
  292. }
  293. loggerDriver, err := New(info)
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. if !hec.connectionVerified {
  298. t.Fatal("By default connection should be verified")
  299. }
  300. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  301. if !ok {
  302. t.Fatal("Unexpected Splunk Logging Driver type")
  303. }
  304. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  305. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  306. splunkLoggerDriver.nullMessage.Host != hostname ||
  307. splunkLoggerDriver.nullMessage.Source != "" ||
  308. splunkLoggerDriver.nullMessage.SourceType != "" ||
  309. splunkLoggerDriver.nullMessage.Index != "" ||
  310. splunkLoggerDriver.gzipCompression != true ||
  311. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  312. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  313. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  314. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  315. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  316. t.Fatal("Values do not match configuration.")
  317. }
  318. message1Time := time.Now()
  319. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  320. t.Fatal(err)
  321. }
  322. message2Time := time.Now()
  323. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  324. t.Fatal(err)
  325. }
  326. err = loggerDriver.Close()
  327. if err != nil {
  328. t.Fatal(err)
  329. }
  330. if len(hec.messages) != 2 {
  331. t.Fatal("Expected two messages")
  332. }
  333. message1 := hec.messages[0]
  334. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  335. message1.Host != hostname ||
  336. message1.Source != "" ||
  337. message1.SourceType != "" ||
  338. message1.Index != "" {
  339. t.Fatalf("Unexpected values of message 1 %v", message1)
  340. }
  341. if event, err := message1.EventAsMap(); err != nil {
  342. t.Fatal(err)
  343. } else {
  344. if event["line"].(map[string]interface{})["a"] != "b" ||
  345. event["source"] != "stdout" ||
  346. event["tag"] != "containeriid" ||
  347. len(event) != 3 {
  348. t.Fatalf("Unexpected event in message 1 %v", event)
  349. }
  350. }
  351. message2 := hec.messages[1]
  352. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  353. message2.Host != hostname ||
  354. message2.Source != "" ||
  355. message2.SourceType != "" ||
  356. message2.Index != "" {
  357. t.Fatalf("Unexpected values of message 2 %v", message2)
  358. }
  359. // If message cannot be parsed as JSON - it should be sent as a line
  360. if event, err := message2.EventAsMap(); err != nil {
  361. t.Fatal(err)
  362. } else {
  363. if event["line"] != "notjson" ||
  364. event["source"] != "stdout" ||
  365. event["tag"] != "containeriid" ||
  366. len(event) != 3 {
  367. t.Fatalf("Unexpected event in message 2 %v", event)
  368. }
  369. }
  370. err = hec.Close()
  371. if err != nil {
  372. t.Fatal(err)
  373. }
  374. }
  375. // Verify raw format
  376. func TestRawFormat(t *testing.T) {
  377. hec := NewHTTPEventCollectorMock(t)
  378. go hec.Serve()
  379. info := logger.Info{
  380. Config: map[string]string{
  381. splunkURLKey: hec.URL(),
  382. splunkTokenKey: hec.token,
  383. splunkFormatKey: splunkFormatRaw,
  384. },
  385. ContainerID: "containeriid",
  386. ContainerName: "/container_name",
  387. ContainerImageID: "contaimageid",
  388. ContainerImageName: "container_image_name",
  389. }
  390. hostname, err := info.Hostname()
  391. if err != nil {
  392. t.Fatal(err)
  393. }
  394. loggerDriver, err := New(info)
  395. if err != nil {
  396. t.Fatal(err)
  397. }
  398. if !hec.connectionVerified {
  399. t.Fatal("By default connection should be verified")
  400. }
  401. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  402. if !ok {
  403. t.Fatal("Unexpected Splunk Logging Driver type")
  404. }
  405. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  406. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  407. splunkLoggerDriver.nullMessage.Host != hostname ||
  408. splunkLoggerDriver.nullMessage.Source != "" ||
  409. splunkLoggerDriver.nullMessage.SourceType != "" ||
  410. splunkLoggerDriver.nullMessage.Index != "" ||
  411. splunkLoggerDriver.gzipCompression != false ||
  412. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  413. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  414. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  415. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  416. string(splunkLoggerDriver.prefix) != "containeriid " {
  417. t.Fatal("Values do not match configuration.")
  418. }
  419. message1Time := time.Now()
  420. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  421. t.Fatal(err)
  422. }
  423. message2Time := time.Now()
  424. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  425. t.Fatal(err)
  426. }
  427. err = loggerDriver.Close()
  428. if err != nil {
  429. t.Fatal(err)
  430. }
  431. if len(hec.messages) != 2 {
  432. t.Fatal("Expected two messages")
  433. }
  434. message1 := hec.messages[0]
  435. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  436. message1.Host != hostname ||
  437. message1.Source != "" ||
  438. message1.SourceType != "" ||
  439. message1.Index != "" {
  440. t.Fatalf("Unexpected values of message 1 %v", message1)
  441. }
  442. if event, err := message1.EventAsString(); err != nil {
  443. t.Fatal(err)
  444. } else {
  445. if event != "containeriid {\"a\":\"b\"}" {
  446. t.Fatalf("Unexpected event in message 1 %v", event)
  447. }
  448. }
  449. message2 := hec.messages[1]
  450. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  451. message2.Host != hostname ||
  452. message2.Source != "" ||
  453. message2.SourceType != "" ||
  454. message2.Index != "" {
  455. t.Fatalf("Unexpected values of message 2 %v", message2)
  456. }
  457. if event, err := message2.EventAsString(); err != nil {
  458. t.Fatal(err)
  459. } else {
  460. if event != "containeriid notjson" {
  461. t.Fatalf("Unexpected event in message 1 %v", event)
  462. }
  463. }
  464. err = hec.Close()
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. }
  469. // Verify raw format with labels
  470. func TestRawFormatWithLabels(t *testing.T) {
  471. hec := NewHTTPEventCollectorMock(t)
  472. go hec.Serve()
  473. info := logger.Info{
  474. Config: map[string]string{
  475. splunkURLKey: hec.URL(),
  476. splunkTokenKey: hec.token,
  477. splunkFormatKey: splunkFormatRaw,
  478. labelsKey: "a",
  479. },
  480. ContainerID: "containeriid",
  481. ContainerName: "/container_name",
  482. ContainerImageID: "contaimageid",
  483. ContainerImageName: "container_image_name",
  484. ContainerLabels: map[string]string{
  485. "a": "b",
  486. },
  487. }
  488. hostname, err := info.Hostname()
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. loggerDriver, err := New(info)
  493. if err != nil {
  494. t.Fatal(err)
  495. }
  496. if !hec.connectionVerified {
  497. t.Fatal("By default connection should be verified")
  498. }
  499. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  500. if !ok {
  501. t.Fatal("Unexpected Splunk Logging Driver type")
  502. }
  503. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  504. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  505. splunkLoggerDriver.nullMessage.Host != hostname ||
  506. splunkLoggerDriver.nullMessage.Source != "" ||
  507. splunkLoggerDriver.nullMessage.SourceType != "" ||
  508. splunkLoggerDriver.nullMessage.Index != "" ||
  509. splunkLoggerDriver.gzipCompression != false ||
  510. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  511. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  512. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  513. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  514. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  515. t.Fatal("Values do not match configuration.")
  516. }
  517. message1Time := time.Now()
  518. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  519. t.Fatal(err)
  520. }
  521. message2Time := time.Now()
  522. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  523. t.Fatal(err)
  524. }
  525. err = loggerDriver.Close()
  526. if err != nil {
  527. t.Fatal(err)
  528. }
  529. if len(hec.messages) != 2 {
  530. t.Fatal("Expected two messages")
  531. }
  532. message1 := hec.messages[0]
  533. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  534. message1.Host != hostname ||
  535. message1.Source != "" ||
  536. message1.SourceType != "" ||
  537. message1.Index != "" {
  538. t.Fatalf("Unexpected values of message 1 %v", message1)
  539. }
  540. if event, err := message1.EventAsString(); err != nil {
  541. t.Fatal(err)
  542. } else {
  543. if event != "containeriid a=b {\"a\":\"b\"}" {
  544. t.Fatalf("Unexpected event in message 1 %v", event)
  545. }
  546. }
  547. message2 := hec.messages[1]
  548. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  549. message2.Host != hostname ||
  550. message2.Source != "" ||
  551. message2.SourceType != "" ||
  552. message2.Index != "" {
  553. t.Fatalf("Unexpected values of message 2 %v", message2)
  554. }
  555. if event, err := message2.EventAsString(); err != nil {
  556. t.Fatal(err)
  557. } else {
  558. if event != "containeriid a=b notjson" {
  559. t.Fatalf("Unexpected event in message 2 %v", event)
  560. }
  561. }
  562. err = hec.Close()
  563. if err != nil {
  564. t.Fatal(err)
  565. }
  566. }
  567. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  568. // in the same way we get them in stdout/stderr
  569. func TestRawFormatWithoutTag(t *testing.T) {
  570. hec := NewHTTPEventCollectorMock(t)
  571. go hec.Serve()
  572. info := logger.Info{
  573. Config: map[string]string{
  574. splunkURLKey: hec.URL(),
  575. splunkTokenKey: hec.token,
  576. splunkFormatKey: splunkFormatRaw,
  577. tagKey: "",
  578. },
  579. ContainerID: "containeriid",
  580. ContainerName: "/container_name",
  581. ContainerImageID: "contaimageid",
  582. ContainerImageName: "container_image_name",
  583. }
  584. hostname, err := info.Hostname()
  585. if err != nil {
  586. t.Fatal(err)
  587. }
  588. loggerDriver, err := New(info)
  589. if err != nil {
  590. t.Fatal(err)
  591. }
  592. if !hec.connectionVerified {
  593. t.Fatal("By default connection should be verified")
  594. }
  595. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  596. if !ok {
  597. t.Fatal("Unexpected Splunk Logging Driver type")
  598. }
  599. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  600. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  601. splunkLoggerDriver.nullMessage.Host != hostname ||
  602. splunkLoggerDriver.nullMessage.Source != "" ||
  603. splunkLoggerDriver.nullMessage.SourceType != "" ||
  604. splunkLoggerDriver.nullMessage.Index != "" ||
  605. splunkLoggerDriver.gzipCompression != false ||
  606. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  607. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  608. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  609. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  610. string(splunkLoggerDriver.prefix) != "" {
  611. t.Log(string(splunkLoggerDriver.prefix) + "a")
  612. t.Fatal("Values do not match configuration.")
  613. }
  614. message1Time := time.Now()
  615. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  616. t.Fatal(err)
  617. }
  618. message2Time := time.Now()
  619. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  620. t.Fatal(err)
  621. }
  622. err = loggerDriver.Close()
  623. if err != nil {
  624. t.Fatal(err)
  625. }
  626. if len(hec.messages) != 2 {
  627. t.Fatal("Expected two messages")
  628. }
  629. message1 := hec.messages[0]
  630. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  631. message1.Host != hostname ||
  632. message1.Source != "" ||
  633. message1.SourceType != "" ||
  634. message1.Index != "" {
  635. t.Fatalf("Unexpected values of message 1 %v", message1)
  636. }
  637. if event, err := message1.EventAsString(); err != nil {
  638. t.Fatal(err)
  639. } else {
  640. if event != "{\"a\":\"b\"}" {
  641. t.Fatalf("Unexpected event in message 1 %v", event)
  642. }
  643. }
  644. message2 := hec.messages[1]
  645. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  646. message2.Host != hostname ||
  647. message2.Source != "" ||
  648. message2.SourceType != "" ||
  649. message2.Index != "" {
  650. t.Fatalf("Unexpected values of message 2 %v", message2)
  651. }
  652. if event, err := message2.EventAsString(); err != nil {
  653. t.Fatal(err)
  654. } else {
  655. if event != "notjson" {
  656. t.Fatalf("Unexpected event in message 2 %v", event)
  657. }
  658. }
  659. err = hec.Close()
  660. if err != nil {
  661. t.Fatal(err)
  662. }
  663. }
  664. // Verify that we will send messages in batches with default batching parameters,
  665. // but change frequency to be sure that numOfRequests will match expected 17 requests
  666. func TestBatching(t *testing.T) {
  667. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  668. t.Fatal(err)
  669. }
  670. hec := NewHTTPEventCollectorMock(t)
  671. go hec.Serve()
  672. info := logger.Info{
  673. Config: map[string]string{
  674. splunkURLKey: hec.URL(),
  675. splunkTokenKey: hec.token,
  676. },
  677. ContainerID: "containeriid",
  678. ContainerName: "/container_name",
  679. ContainerImageID: "contaimageid",
  680. ContainerImageName: "container_image_name",
  681. }
  682. loggerDriver, err := New(info)
  683. if err != nil {
  684. t.Fatal(err)
  685. }
  686. for i := 0; i < defaultStreamChannelSize*4; i++ {
  687. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  688. t.Fatal(err)
  689. }
  690. }
  691. err = loggerDriver.Close()
  692. if err != nil {
  693. t.Fatal(err)
  694. }
  695. if len(hec.messages) != defaultStreamChannelSize*4 {
  696. t.Fatal("Not all messages delivered")
  697. }
  698. for i, message := range hec.messages {
  699. if event, err := message.EventAsMap(); err != nil {
  700. t.Fatal(err)
  701. } else {
  702. if event["line"] != fmt.Sprintf("%d", i) {
  703. t.Fatalf("Unexpected event in message %v", event)
  704. }
  705. }
  706. }
  707. // 1 to verify connection and 16 batches
  708. if hec.numOfRequests != 17 {
  709. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  710. }
  711. err = hec.Close()
  712. if err != nil {
  713. t.Fatal(err)
  714. }
  715. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  716. t.Fatal(err)
  717. }
  718. }
  719. // Verify that test is using time to fire events not rare than specified frequency
  720. func TestFrequency(t *testing.T) {
  721. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  722. t.Fatal(err)
  723. }
  724. hec := NewHTTPEventCollectorMock(t)
  725. go hec.Serve()
  726. info := logger.Info{
  727. Config: map[string]string{
  728. splunkURLKey: hec.URL(),
  729. splunkTokenKey: hec.token,
  730. },
  731. ContainerID: "containeriid",
  732. ContainerName: "/container_name",
  733. ContainerImageID: "contaimageid",
  734. ContainerImageName: "container_image_name",
  735. }
  736. loggerDriver, err := New(info)
  737. if err != nil {
  738. t.Fatal(err)
  739. }
  740. for i := 0; i < 10; i++ {
  741. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  742. t.Fatal(err)
  743. }
  744. time.Sleep(15 * time.Millisecond)
  745. }
  746. err = loggerDriver.Close()
  747. if err != nil {
  748. t.Fatal(err)
  749. }
  750. if len(hec.messages) != 10 {
  751. t.Fatal("Not all messages delivered")
  752. }
  753. for i, message := range hec.messages {
  754. if event, err := message.EventAsMap(); err != nil {
  755. t.Fatal(err)
  756. } else {
  757. if event["line"] != fmt.Sprintf("%d", i) {
  758. t.Fatalf("Unexpected event in message %v", event)
  759. }
  760. }
  761. }
  762. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  763. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  764. if hec.numOfRequests < 9 {
  765. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  766. }
  767. err = hec.Close()
  768. if err != nil {
  769. t.Fatal(err)
  770. }
  771. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  772. t.Fatal(err)
  773. }
  774. }
  775. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  776. // per request
  777. func TestOneMessagePerRequest(t *testing.T) {
  778. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  779. t.Fatal(err)
  780. }
  781. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  782. t.Fatal(err)
  783. }
  784. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  785. t.Fatal(err)
  786. }
  787. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  788. t.Fatal(err)
  789. }
  790. hec := NewHTTPEventCollectorMock(t)
  791. go hec.Serve()
  792. info := logger.Info{
  793. Config: map[string]string{
  794. splunkURLKey: hec.URL(),
  795. splunkTokenKey: hec.token,
  796. },
  797. ContainerID: "containeriid",
  798. ContainerName: "/container_name",
  799. ContainerImageID: "contaimageid",
  800. ContainerImageName: "container_image_name",
  801. }
  802. loggerDriver, err := New(info)
  803. if err != nil {
  804. t.Fatal(err)
  805. }
  806. for i := 0; i < 10; i++ {
  807. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  808. t.Fatal(err)
  809. }
  810. }
  811. err = loggerDriver.Close()
  812. if err != nil {
  813. t.Fatal(err)
  814. }
  815. if len(hec.messages) != 10 {
  816. t.Fatal("Not all messages delivered")
  817. }
  818. for i, message := range hec.messages {
  819. if event, err := message.EventAsMap(); err != nil {
  820. t.Fatal(err)
  821. } else {
  822. if event["line"] != fmt.Sprintf("%d", i) {
  823. t.Fatalf("Unexpected event in message %v", event)
  824. }
  825. }
  826. }
  827. // 1 to verify connection and 10 messages
  828. if hec.numOfRequests != 11 {
  829. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  830. }
  831. err = hec.Close()
  832. if err != nil {
  833. t.Fatal(err)
  834. }
  835. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  836. t.Fatal(err)
  837. }
  838. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  839. t.Fatal(err)
  840. }
  841. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  842. t.Fatal(err)
  843. }
  844. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  845. t.Fatal(err)
  846. }
  847. }
  848. // Driver should not be created when HEC is unresponsive
  849. func TestVerify(t *testing.T) {
  850. hec := NewHTTPEventCollectorMock(t)
  851. hec.simulateServerError = true
  852. go hec.Serve()
  853. info := logger.Info{
  854. Config: map[string]string{
  855. splunkURLKey: hec.URL(),
  856. splunkTokenKey: hec.token,
  857. },
  858. ContainerID: "containeriid",
  859. ContainerName: "/container_name",
  860. ContainerImageID: "contaimageid",
  861. ContainerImageName: "container_image_name",
  862. }
  863. _, err := New(info)
  864. if err == nil {
  865. t.Fatal("Expecting driver to fail, when server is unresponsive")
  866. }
  867. err = hec.Close()
  868. if err != nil {
  869. t.Fatal(err)
  870. }
  871. }
  872. // Verify that user can specify to skip verification that Splunk HEC is working.
  873. // Also in this test we verify retry logic.
  874. func TestSkipVerify(t *testing.T) {
  875. hec := NewHTTPEventCollectorMock(t)
  876. hec.simulateServerError = true
  877. go hec.Serve()
  878. info := logger.Info{
  879. Config: map[string]string{
  880. splunkURLKey: hec.URL(),
  881. splunkTokenKey: hec.token,
  882. splunkVerifyConnectionKey: "false",
  883. },
  884. ContainerID: "containeriid",
  885. ContainerName: "/container_name",
  886. ContainerImageID: "contaimageid",
  887. ContainerImageName: "container_image_name",
  888. }
  889. loggerDriver, err := New(info)
  890. if err != nil {
  891. t.Fatal(err)
  892. }
  893. if hec.connectionVerified {
  894. t.Fatal("Connection should not be verified")
  895. }
  896. for i := 0; i < defaultStreamChannelSize*2; i++ {
  897. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  898. t.Fatal(err)
  899. }
  900. }
  901. if len(hec.messages) != 0 {
  902. t.Fatal("No messages should be accepted at this point")
  903. }
  904. hec.simulateServerError = false
  905. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  906. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  907. t.Fatal(err)
  908. }
  909. }
  910. err = loggerDriver.Close()
  911. if err != nil {
  912. t.Fatal(err)
  913. }
  914. if len(hec.messages) != defaultStreamChannelSize*4 {
  915. t.Fatal("Not all messages delivered")
  916. }
  917. for i, message := range hec.messages {
  918. if event, err := message.EventAsMap(); err != nil {
  919. t.Fatal(err)
  920. } else {
  921. if event["line"] != fmt.Sprintf("%d", i) {
  922. t.Fatalf("Unexpected event in message %v", event)
  923. }
  924. }
  925. }
  926. err = hec.Close()
  927. if err != nil {
  928. t.Fatal(err)
  929. }
  930. }
  931. // Verify logic for when we filled whole buffer
  932. func TestBufferMaximum(t *testing.T) {
  933. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  934. t.Fatal(err)
  935. }
  936. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  937. t.Fatal(err)
  938. }
  939. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  940. t.Fatal(err)
  941. }
  942. hec := NewHTTPEventCollectorMock(t)
  943. hec.simulateServerError = true
  944. go hec.Serve()
  945. info := logger.Info{
  946. Config: map[string]string{
  947. splunkURLKey: hec.URL(),
  948. splunkTokenKey: hec.token,
  949. splunkVerifyConnectionKey: "false",
  950. },
  951. ContainerID: "containeriid",
  952. ContainerName: "/container_name",
  953. ContainerImageID: "contaimageid",
  954. ContainerImageName: "container_image_name",
  955. }
  956. loggerDriver, err := New(info)
  957. if err != nil {
  958. t.Fatal(err)
  959. }
  960. if hec.connectionVerified {
  961. t.Fatal("Connection should not be verified")
  962. }
  963. for i := 0; i < 11; i++ {
  964. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  965. t.Fatal(err)
  966. }
  967. }
  968. if len(hec.messages) != 0 {
  969. t.Fatal("No messages should be accepted at this point")
  970. }
  971. hec.simulateServerError = false
  972. err = loggerDriver.Close()
  973. if err != nil {
  974. t.Fatal(err)
  975. }
  976. if len(hec.messages) != 9 {
  977. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  978. }
  979. // First 1000 messages are written to daemon log when buffer was full
  980. for i, message := range hec.messages {
  981. if event, err := message.EventAsMap(); err != nil {
  982. t.Fatal(err)
  983. } else {
  984. if event["line"] != fmt.Sprintf("%d", i+2) {
  985. t.Fatalf("Unexpected event in message %v", event)
  986. }
  987. }
  988. }
  989. err = hec.Close()
  990. if err != nil {
  991. t.Fatal(err)
  992. }
  993. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  994. t.Fatal(err)
  995. }
  996. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  997. t.Fatal(err)
  998. }
  999. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1000. t.Fatal(err)
  1001. }
  1002. }
  1003. // Verify that we are not blocking close when HEC is down for the whole time
  1004. func TestServerAlwaysDown(t *testing.T) {
  1005. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1006. t.Fatal(err)
  1007. }
  1008. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1009. t.Fatal(err)
  1010. }
  1011. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. hec := NewHTTPEventCollectorMock(t)
  1015. hec.simulateServerError = true
  1016. go hec.Serve()
  1017. info := logger.Info{
  1018. Config: map[string]string{
  1019. splunkURLKey: hec.URL(),
  1020. splunkTokenKey: hec.token,
  1021. splunkVerifyConnectionKey: "false",
  1022. },
  1023. ContainerID: "containeriid",
  1024. ContainerName: "/container_name",
  1025. ContainerImageID: "contaimageid",
  1026. ContainerImageName: "container_image_name",
  1027. }
  1028. loggerDriver, err := New(info)
  1029. if err != nil {
  1030. t.Fatal(err)
  1031. }
  1032. if hec.connectionVerified {
  1033. t.Fatal("Connection should not be verified")
  1034. }
  1035. for i := 0; i < 5; i++ {
  1036. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1037. t.Fatal(err)
  1038. }
  1039. }
  1040. err = loggerDriver.Close()
  1041. if err != nil {
  1042. t.Fatal(err)
  1043. }
  1044. if len(hec.messages) != 0 {
  1045. t.Fatal("No messages should be sent")
  1046. }
  1047. err = hec.Close()
  1048. if err != nil {
  1049. t.Fatal(err)
  1050. }
  1051. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1052. t.Fatal(err)
  1053. }
  1054. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1055. t.Fatal(err)
  1056. }
  1057. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1058. t.Fatal(err)
  1059. }
  1060. }
  1061. // Cannot send messages after we close driver
  1062. func TestCannotSendAfterClose(t *testing.T) {
  1063. hec := NewHTTPEventCollectorMock(t)
  1064. go hec.Serve()
  1065. info := logger.Info{
  1066. Config: map[string]string{
  1067. splunkURLKey: hec.URL(),
  1068. splunkTokenKey: hec.token,
  1069. },
  1070. ContainerID: "containeriid",
  1071. ContainerName: "/container_name",
  1072. ContainerImageID: "contaimageid",
  1073. ContainerImageName: "container_image_name",
  1074. }
  1075. loggerDriver, err := New(info)
  1076. if err != nil {
  1077. t.Fatal(err)
  1078. }
  1079. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1080. t.Fatal(err)
  1081. }
  1082. err = loggerDriver.Close()
  1083. if err != nil {
  1084. t.Fatal(err)
  1085. }
  1086. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1087. t.Fatal("Driver should not allow to send messages after close")
  1088. }
  1089. if len(hec.messages) != 1 {
  1090. t.Fatal("Only one message should be sent")
  1091. }
  1092. message := hec.messages[0]
  1093. if event, err := message.EventAsMap(); err != nil {
  1094. t.Fatal(err)
  1095. } else {
  1096. if event["line"] != "message1" {
  1097. t.Fatalf("Unexpected event in message %v", event)
  1098. }
  1099. }
  1100. err = hec.Close()
  1101. if err != nil {
  1102. t.Fatal(err)
  1103. }
  1104. }