splunk_test.go 33 KB


  1. package splunk
  2. import (
  3. "compress/gzip"
  4. "fmt"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/docker/docker/daemon/logger"
  9. "github.com/stretchr/testify/require"
  10. )
  11. // Validate options
  12. func TestValidateLogOpt(t *testing.T) {
  13. err := ValidateLogOpt(map[string]string{
  14. splunkURLKey: "http://127.0.0.1",
  15. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  16. splunkSourceKey: "mysource",
  17. splunkSourceTypeKey: "mysourcetype",
  18. splunkIndexKey: "myindex",
  19. splunkCAPathKey: "/usr/cert.pem",
  20. splunkCANameKey: "ca_name",
  21. splunkInsecureSkipVerifyKey: "true",
  22. splunkFormatKey: "json",
  23. splunkVerifyConnectionKey: "true",
  24. splunkGzipCompressionKey: "true",
  25. splunkGzipCompressionLevelKey: "1",
  26. envKey: "a",
  27. envRegexKey: "^foo",
  28. labelsKey: "b",
  29. tagKey: "c",
  30. })
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. err = ValidateLogOpt(map[string]string{
  35. "not-supported-option": "a",
  36. })
  37. if err == nil {
  38. t.Fatal("Expecting error on unsupported options")
  39. }
  40. }
  41. // Driver require user to specify required options
  42. func TestNewMissedConfig(t *testing.T) {
  43. info := logger.Info{
  44. Config: map[string]string{},
  45. }
  46. _, err := New(info)
  47. if err == nil {
  48. t.Fatal("Logger driver should fail when no required parameters specified")
  49. }
  50. }
  51. // Driver require user to specify splunk-url
  52. func TestNewMissedUrl(t *testing.T) {
  53. info := logger.Info{
  54. Config: map[string]string{
  55. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  56. },
  57. }
  58. _, err := New(info)
  59. if err.Error() != "splunk: splunk-url is expected" {
  60. t.Fatal("Logger driver should fail when no required parameters specified")
  61. }
  62. }
  63. // Driver require user to specify splunk-token
  64. func TestNewMissedToken(t *testing.T) {
  65. info := logger.Info{
  66. Config: map[string]string{
  67. splunkURLKey: "http://127.0.0.1:8088",
  68. },
  69. }
  70. _, err := New(info)
  71. if err.Error() != "splunk: splunk-token is expected" {
  72. t.Fatal("Logger driver should fail when no required parameters specified")
  73. }
  74. }
  75. // Test default settings
  76. func TestDefault(t *testing.T) {
  77. hec := NewHTTPEventCollectorMock(t)
  78. go hec.Serve()
  79. info := logger.Info{
  80. Config: map[string]string{
  81. splunkURLKey: hec.URL(),
  82. splunkTokenKey: hec.token,
  83. },
  84. ContainerID: "containeriid",
  85. ContainerName: "container_name",
  86. ContainerImageID: "contaimageid",
  87. ContainerImageName: "container_image_name",
  88. }
  89. hostname, err := info.Hostname()
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. loggerDriver, err := New(info)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. if loggerDriver.Name() != driverName {
  98. t.Fatal("Unexpected logger driver name")
  99. }
  100. if !hec.connectionVerified {
  101. t.Fatal("By default connection should be verified")
  102. }
  103. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  104. if !ok {
  105. t.Fatal("Unexpected Splunk Logging Driver type")
  106. }
  107. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  108. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  109. splunkLoggerDriver.nullMessage.Host != hostname ||
  110. splunkLoggerDriver.nullMessage.Source != "" ||
  111. splunkLoggerDriver.nullMessage.SourceType != "" ||
  112. splunkLoggerDriver.nullMessage.Index != "" ||
  113. splunkLoggerDriver.gzipCompression ||
  114. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  115. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  116. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  117. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  118. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  119. }
  120. message1Time := time.Now()
  121. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  122. t.Fatal(err)
  123. }
  124. message2Time := time.Now()
  125. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  126. t.Fatal(err)
  127. }
  128. err = loggerDriver.Close()
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. if len(hec.messages) != 2 {
  133. t.Fatal("Expected two messages")
  134. }
  135. if *hec.gzipEnabled {
  136. t.Fatal("Gzip should not be used")
  137. }
  138. message1 := hec.messages[0]
  139. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  140. message1.Host != hostname ||
  141. message1.Source != "" ||
  142. message1.SourceType != "" ||
  143. message1.Index != "" {
  144. t.Fatalf("Unexpected values of message 1 %v", message1)
  145. }
  146. if event, err := message1.EventAsMap(); err != nil {
  147. t.Fatal(err)
  148. } else {
  149. if event["line"] != "{\"a\":\"b\"}" ||
  150. event["source"] != "stdout" ||
  151. event["tag"] != "containeriid" ||
  152. len(event) != 3 {
  153. t.Fatalf("Unexpected event in message %v", event)
  154. }
  155. }
  156. message2 := hec.messages[1]
  157. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  158. message2.Host != hostname ||
  159. message2.Source != "" ||
  160. message2.SourceType != "" ||
  161. message2.Index != "" {
  162. t.Fatalf("Unexpected values of message 1 %v", message2)
  163. }
  164. if event, err := message2.EventAsMap(); err != nil {
  165. t.Fatal(err)
  166. } else {
  167. if event["line"] != "notajson" ||
  168. event["source"] != "stdout" ||
  169. event["tag"] != "containeriid" ||
  170. len(event) != 3 {
  171. t.Fatalf("Unexpected event in message %v", event)
  172. }
  173. }
  174. err = hec.Close()
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. }
  179. // Verify inline format with a not default settings for most of options
  180. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  181. hec := NewHTTPEventCollectorMock(t)
  182. go hec.Serve()
  183. info := logger.Info{
  184. Config: map[string]string{
  185. splunkURLKey: hec.URL(),
  186. splunkTokenKey: hec.token,
  187. splunkSourceKey: "mysource",
  188. splunkSourceTypeKey: "mysourcetype",
  189. splunkIndexKey: "myindex",
  190. splunkFormatKey: splunkFormatInline,
  191. splunkGzipCompressionKey: "true",
  192. tagKey: "{{.ImageName}}/{{.Name}}",
  193. labelsKey: "a",
  194. envRegexKey: "^foo",
  195. },
  196. ContainerID: "containeriid",
  197. ContainerName: "/container_name",
  198. ContainerImageID: "contaimageid",
  199. ContainerImageName: "container_image_name",
  200. ContainerLabels: map[string]string{
  201. "a": "b",
  202. },
  203. ContainerEnv: []string{"foo_finder=bar"},
  204. }
  205. hostname, err := info.Hostname()
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. loggerDriver, err := New(info)
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. if !hec.connectionVerified {
  214. t.Fatal("By default connection should be verified")
  215. }
  216. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  217. if !ok {
  218. t.Fatal("Unexpected Splunk Logging Driver type")
  219. }
  220. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  221. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  222. splunkLoggerDriver.nullMessage.Host != hostname ||
  223. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  224. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  225. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  226. !splunkLoggerDriver.gzipCompression ||
  227. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  228. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  229. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  230. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  231. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  232. t.Fatal("Values do not match configuration.")
  233. }
  234. messageTime := time.Now()
  235. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  236. t.Fatal(err)
  237. }
  238. err = loggerDriver.Close()
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. if len(hec.messages) != 1 {
  243. t.Fatal("Expected one message")
  244. }
  245. if !*hec.gzipEnabled {
  246. t.Fatal("Gzip should be used")
  247. }
  248. message := hec.messages[0]
  249. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  250. message.Host != hostname ||
  251. message.Source != "mysource" ||
  252. message.SourceType != "mysourcetype" ||
  253. message.Index != "myindex" {
  254. t.Fatalf("Unexpected values of message %v", message)
  255. }
  256. if event, err := message.EventAsMap(); err != nil {
  257. t.Fatal(err)
  258. } else {
  259. if event["line"] != "1" ||
  260. event["source"] != "stdout" ||
  261. event["tag"] != "container_image_name/container_name" ||
  262. event["attrs"].(map[string]interface{})["a"] != "b" ||
  263. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  264. len(event) != 4 {
  265. t.Fatalf("Unexpected event in message %v", event)
  266. }
  267. }
  268. err = hec.Close()
  269. if err != nil {
  270. t.Fatal(err)
  271. }
  272. }
  273. // Verify JSON format
  274. func TestJsonFormat(t *testing.T) {
  275. hec := NewHTTPEventCollectorMock(t)
  276. go hec.Serve()
  277. info := logger.Info{
  278. Config: map[string]string{
  279. splunkURLKey: hec.URL(),
  280. splunkTokenKey: hec.token,
  281. splunkFormatKey: splunkFormatJSON,
  282. splunkGzipCompressionKey: "true",
  283. splunkGzipCompressionLevelKey: "1",
  284. },
  285. ContainerID: "containeriid",
  286. ContainerName: "/container_name",
  287. ContainerImageID: "contaimageid",
  288. ContainerImageName: "container_image_name",
  289. }
  290. hostname, err := info.Hostname()
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. loggerDriver, err := New(info)
  295. if err != nil {
  296. t.Fatal(err)
  297. }
  298. if !hec.connectionVerified {
  299. t.Fatal("By default connection should be verified")
  300. }
  301. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  302. if !ok {
  303. t.Fatal("Unexpected Splunk Logging Driver type")
  304. }
  305. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  306. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  307. splunkLoggerDriver.nullMessage.Host != hostname ||
  308. splunkLoggerDriver.nullMessage.Source != "" ||
  309. splunkLoggerDriver.nullMessage.SourceType != "" ||
  310. splunkLoggerDriver.nullMessage.Index != "" ||
  311. !splunkLoggerDriver.gzipCompression ||
  312. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  313. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  314. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  315. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  316. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  317. t.Fatal("Values do not match configuration.")
  318. }
  319. message1Time := time.Now()
  320. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  321. t.Fatal(err)
  322. }
  323. message2Time := time.Now()
  324. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  325. t.Fatal(err)
  326. }
  327. err = loggerDriver.Close()
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. if len(hec.messages) != 2 {
  332. t.Fatal("Expected two messages")
  333. }
  334. message1 := hec.messages[0]
  335. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  336. message1.Host != hostname ||
  337. message1.Source != "" ||
  338. message1.SourceType != "" ||
  339. message1.Index != "" {
  340. t.Fatalf("Unexpected values of message 1 %v", message1)
  341. }
  342. if event, err := message1.EventAsMap(); err != nil {
  343. t.Fatal(err)
  344. } else {
  345. if event["line"].(map[string]interface{})["a"] != "b" ||
  346. event["source"] != "stdout" ||
  347. event["tag"] != "containeriid" ||
  348. len(event) != 3 {
  349. t.Fatalf("Unexpected event in message 1 %v", event)
  350. }
  351. }
  352. message2 := hec.messages[1]
  353. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  354. message2.Host != hostname ||
  355. message2.Source != "" ||
  356. message2.SourceType != "" ||
  357. message2.Index != "" {
  358. t.Fatalf("Unexpected values of message 2 %v", message2)
  359. }
  360. // If message cannot be parsed as JSON - it should be sent as a line
  361. if event, err := message2.EventAsMap(); err != nil {
  362. t.Fatal(err)
  363. } else {
  364. if event["line"] != "notjson" ||
  365. event["source"] != "stdout" ||
  366. event["tag"] != "containeriid" ||
  367. len(event) != 3 {
  368. t.Fatalf("Unexpected event in message 2 %v", event)
  369. }
  370. }
  371. err = hec.Close()
  372. if err != nil {
  373. t.Fatal(err)
  374. }
  375. }
  376. // Verify raw format
  377. func TestRawFormat(t *testing.T) {
  378. hec := NewHTTPEventCollectorMock(t)
  379. go hec.Serve()
  380. info := logger.Info{
  381. Config: map[string]string{
  382. splunkURLKey: hec.URL(),
  383. splunkTokenKey: hec.token,
  384. splunkFormatKey: splunkFormatRaw,
  385. },
  386. ContainerID: "containeriid",
  387. ContainerName: "/container_name",
  388. ContainerImageID: "contaimageid",
  389. ContainerImageName: "container_image_name",
  390. }
  391. hostname, err := info.Hostname()
  392. require.NoError(t, err)
  393. loggerDriver, err := New(info)
  394. require.NoError(t, err)
  395. if !hec.connectionVerified {
  396. t.Fatal("By default connection should be verified")
  397. }
  398. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  399. if !ok {
  400. t.Fatal("Unexpected Splunk Logging Driver type")
  401. }
  402. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  403. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  404. splunkLoggerDriver.nullMessage.Host != hostname ||
  405. splunkLoggerDriver.nullMessage.Source != "" ||
  406. splunkLoggerDriver.nullMessage.SourceType != "" ||
  407. splunkLoggerDriver.nullMessage.Index != "" ||
  408. splunkLoggerDriver.gzipCompression ||
  409. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  410. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  411. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  412. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  413. string(splunkLoggerDriver.prefix) != "containeriid " {
  414. t.Fatal("Values do not match configuration.")
  415. }
  416. message1Time := time.Now()
  417. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  418. t.Fatal(err)
  419. }
  420. message2Time := time.Now()
  421. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  422. t.Fatal(err)
  423. }
  424. err = loggerDriver.Close()
  425. if err != nil {
  426. t.Fatal(err)
  427. }
  428. if len(hec.messages) != 2 {
  429. t.Fatal("Expected two messages")
  430. }
  431. message1 := hec.messages[0]
  432. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  433. message1.Host != hostname ||
  434. message1.Source != "" ||
  435. message1.SourceType != "" ||
  436. message1.Index != "" {
  437. t.Fatalf("Unexpected values of message 1 %v", message1)
  438. }
  439. if event, err := message1.EventAsString(); err != nil {
  440. t.Fatal(err)
  441. } else {
  442. if event != "containeriid {\"a\":\"b\"}" {
  443. t.Fatalf("Unexpected event in message 1 %v", event)
  444. }
  445. }
  446. message2 := hec.messages[1]
  447. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  448. message2.Host != hostname ||
  449. message2.Source != "" ||
  450. message2.SourceType != "" ||
  451. message2.Index != "" {
  452. t.Fatalf("Unexpected values of message 2 %v", message2)
  453. }
  454. if event, err := message2.EventAsString(); err != nil {
  455. t.Fatal(err)
  456. } else {
  457. if event != "containeriid notjson" {
  458. t.Fatalf("Unexpected event in message 1 %v", event)
  459. }
  460. }
  461. err = hec.Close()
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. }
  466. // Verify raw format with labels
  467. func TestRawFormatWithLabels(t *testing.T) {
  468. hec := NewHTTPEventCollectorMock(t)
  469. go hec.Serve()
  470. info := logger.Info{
  471. Config: map[string]string{
  472. splunkURLKey: hec.URL(),
  473. splunkTokenKey: hec.token,
  474. splunkFormatKey: splunkFormatRaw,
  475. labelsKey: "a",
  476. },
  477. ContainerID: "containeriid",
  478. ContainerName: "/container_name",
  479. ContainerImageID: "contaimageid",
  480. ContainerImageName: "container_image_name",
  481. ContainerLabels: map[string]string{
  482. "a": "b",
  483. },
  484. }
  485. hostname, err := info.Hostname()
  486. if err != nil {
  487. t.Fatal(err)
  488. }
  489. loggerDriver, err := New(info)
  490. if err != nil {
  491. t.Fatal(err)
  492. }
  493. if !hec.connectionVerified {
  494. t.Fatal("By default connection should be verified")
  495. }
  496. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  497. if !ok {
  498. t.Fatal("Unexpected Splunk Logging Driver type")
  499. }
  500. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  501. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  502. splunkLoggerDriver.nullMessage.Host != hostname ||
  503. splunkLoggerDriver.nullMessage.Source != "" ||
  504. splunkLoggerDriver.nullMessage.SourceType != "" ||
  505. splunkLoggerDriver.nullMessage.Index != "" ||
  506. splunkLoggerDriver.gzipCompression ||
  507. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  508. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  509. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  510. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  511. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  512. t.Fatal("Values do not match configuration.")
  513. }
  514. message1Time := time.Now()
  515. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  516. t.Fatal(err)
  517. }
  518. message2Time := time.Now()
  519. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  520. t.Fatal(err)
  521. }
  522. err = loggerDriver.Close()
  523. if err != nil {
  524. t.Fatal(err)
  525. }
  526. if len(hec.messages) != 2 {
  527. t.Fatal("Expected two messages")
  528. }
  529. message1 := hec.messages[0]
  530. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  531. message1.Host != hostname ||
  532. message1.Source != "" ||
  533. message1.SourceType != "" ||
  534. message1.Index != "" {
  535. t.Fatalf("Unexpected values of message 1 %v", message1)
  536. }
  537. if event, err := message1.EventAsString(); err != nil {
  538. t.Fatal(err)
  539. } else {
  540. if event != "containeriid a=b {\"a\":\"b\"}" {
  541. t.Fatalf("Unexpected event in message 1 %v", event)
  542. }
  543. }
  544. message2 := hec.messages[1]
  545. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  546. message2.Host != hostname ||
  547. message2.Source != "" ||
  548. message2.SourceType != "" ||
  549. message2.Index != "" {
  550. t.Fatalf("Unexpected values of message 2 %v", message2)
  551. }
  552. if event, err := message2.EventAsString(); err != nil {
  553. t.Fatal(err)
  554. } else {
  555. if event != "containeriid a=b notjson" {
  556. t.Fatalf("Unexpected event in message 2 %v", event)
  557. }
  558. }
  559. err = hec.Close()
  560. if err != nil {
  561. t.Fatal(err)
  562. }
  563. }
  564. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  565. // in the same way we get them in stdout/stderr
  566. func TestRawFormatWithoutTag(t *testing.T) {
  567. hec := NewHTTPEventCollectorMock(t)
  568. go hec.Serve()
  569. info := logger.Info{
  570. Config: map[string]string{
  571. splunkURLKey: hec.URL(),
  572. splunkTokenKey: hec.token,
  573. splunkFormatKey: splunkFormatRaw,
  574. tagKey: "",
  575. },
  576. ContainerID: "containeriid",
  577. ContainerName: "/container_name",
  578. ContainerImageID: "contaimageid",
  579. ContainerImageName: "container_image_name",
  580. }
  581. hostname, err := info.Hostname()
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. loggerDriver, err := New(info)
  586. if err != nil {
  587. t.Fatal(err)
  588. }
  589. if !hec.connectionVerified {
  590. t.Fatal("By default connection should be verified")
  591. }
  592. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  593. if !ok {
  594. t.Fatal("Unexpected Splunk Logging Driver type")
  595. }
  596. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  597. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  598. splunkLoggerDriver.nullMessage.Host != hostname ||
  599. splunkLoggerDriver.nullMessage.Source != "" ||
  600. splunkLoggerDriver.nullMessage.SourceType != "" ||
  601. splunkLoggerDriver.nullMessage.Index != "" ||
  602. splunkLoggerDriver.gzipCompression ||
  603. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  604. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  605. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  606. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  607. string(splunkLoggerDriver.prefix) != "" {
  608. t.Log(string(splunkLoggerDriver.prefix) + "a")
  609. t.Fatal("Values do not match configuration.")
  610. }
  611. message1Time := time.Now()
  612. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  613. t.Fatal(err)
  614. }
  615. message2Time := time.Now()
  616. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  617. t.Fatal(err)
  618. }
  619. message3Time := time.Now()
  620. if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
  621. t.Fatal(err)
  622. }
  623. err = loggerDriver.Close()
  624. if err != nil {
  625. t.Fatal(err)
  626. }
  627. // message3 would have an empty or whitespace only string in the "event" field
  628. // both of which are not acceptable to HEC
  629. // thus here we must expect 2 messages, not 3
  630. if len(hec.messages) != 2 {
  631. t.Fatal("Expected two messages")
  632. }
  633. message1 := hec.messages[0]
  634. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  635. message1.Host != hostname ||
  636. message1.Source != "" ||
  637. message1.SourceType != "" ||
  638. message1.Index != "" {
  639. t.Fatalf("Unexpected values of message 1 %v", message1)
  640. }
  641. if event, err := message1.EventAsString(); err != nil {
  642. t.Fatal(err)
  643. } else {
  644. if event != "{\"a\":\"b\"}" {
  645. t.Fatalf("Unexpected event in message 1 %v", event)
  646. }
  647. }
  648. message2 := hec.messages[1]
  649. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  650. message2.Host != hostname ||
  651. message2.Source != "" ||
  652. message2.SourceType != "" ||
  653. message2.Index != "" {
  654. t.Fatalf("Unexpected values of message 2 %v", message2)
  655. }
  656. if event, err := message2.EventAsString(); err != nil {
  657. t.Fatal(err)
  658. } else {
  659. if event != "notjson" {
  660. t.Fatalf("Unexpected event in message 2 %v", event)
  661. }
  662. }
  663. err = hec.Close()
  664. if err != nil {
  665. t.Fatal(err)
  666. }
  667. }
  668. // Verify that we will send messages in batches with default batching parameters,
  669. // but change frequency to be sure that numOfRequests will match expected 17 requests
  670. func TestBatching(t *testing.T) {
  671. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  672. t.Fatal(err)
  673. }
  674. hec := NewHTTPEventCollectorMock(t)
  675. go hec.Serve()
  676. info := logger.Info{
  677. Config: map[string]string{
  678. splunkURLKey: hec.URL(),
  679. splunkTokenKey: hec.token,
  680. },
  681. ContainerID: "containeriid",
  682. ContainerName: "/container_name",
  683. ContainerImageID: "contaimageid",
  684. ContainerImageName: "container_image_name",
  685. }
  686. loggerDriver, err := New(info)
  687. if err != nil {
  688. t.Fatal(err)
  689. }
  690. for i := 0; i < defaultStreamChannelSize*4; i++ {
  691. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  692. t.Fatal(err)
  693. }
  694. }
  695. err = loggerDriver.Close()
  696. if err != nil {
  697. t.Fatal(err)
  698. }
  699. if len(hec.messages) != defaultStreamChannelSize*4 {
  700. t.Fatal("Not all messages delivered")
  701. }
  702. for i, message := range hec.messages {
  703. if event, err := message.EventAsMap(); err != nil {
  704. t.Fatal(err)
  705. } else {
  706. if event["line"] != fmt.Sprintf("%d", i) {
  707. t.Fatalf("Unexpected event in message %v", event)
  708. }
  709. }
  710. }
  711. // 1 to verify connection and 16 batches
  712. if hec.numOfRequests != 17 {
  713. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  714. }
  715. err = hec.Close()
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  720. t.Fatal(err)
  721. }
  722. }
  723. // Verify that test is using time to fire events not rare than specified frequency
  724. func TestFrequency(t *testing.T) {
  725. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  726. t.Fatal(err)
  727. }
  728. hec := NewHTTPEventCollectorMock(t)
  729. go hec.Serve()
  730. info := logger.Info{
  731. Config: map[string]string{
  732. splunkURLKey: hec.URL(),
  733. splunkTokenKey: hec.token,
  734. },
  735. ContainerID: "containeriid",
  736. ContainerName: "/container_name",
  737. ContainerImageID: "contaimageid",
  738. ContainerImageName: "container_image_name",
  739. }
  740. loggerDriver, err := New(info)
  741. if err != nil {
  742. t.Fatal(err)
  743. }
  744. for i := 0; i < 10; i++ {
  745. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  746. t.Fatal(err)
  747. }
  748. time.Sleep(15 * time.Millisecond)
  749. }
  750. err = loggerDriver.Close()
  751. if err != nil {
  752. t.Fatal(err)
  753. }
  754. if len(hec.messages) != 10 {
  755. t.Fatal("Not all messages delivered")
  756. }
  757. for i, message := range hec.messages {
  758. if event, err := message.EventAsMap(); err != nil {
  759. t.Fatal(err)
  760. } else {
  761. if event["line"] != fmt.Sprintf("%d", i) {
  762. t.Fatalf("Unexpected event in message %v", event)
  763. }
  764. }
  765. }
  766. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  767. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  768. if hec.numOfRequests < 9 {
  769. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  770. }
  771. err = hec.Close()
  772. if err != nil {
  773. t.Fatal(err)
  774. }
  775. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  776. t.Fatal(err)
  777. }
  778. }
  779. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  780. // per request
  781. func TestOneMessagePerRequest(t *testing.T) {
  782. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  783. t.Fatal(err)
  784. }
  785. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  786. t.Fatal(err)
  787. }
  788. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  789. t.Fatal(err)
  790. }
  791. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  792. t.Fatal(err)
  793. }
  794. hec := NewHTTPEventCollectorMock(t)
  795. go hec.Serve()
  796. info := logger.Info{
  797. Config: map[string]string{
  798. splunkURLKey: hec.URL(),
  799. splunkTokenKey: hec.token,
  800. },
  801. ContainerID: "containeriid",
  802. ContainerName: "/container_name",
  803. ContainerImageID: "contaimageid",
  804. ContainerImageName: "container_image_name",
  805. }
  806. loggerDriver, err := New(info)
  807. if err != nil {
  808. t.Fatal(err)
  809. }
  810. for i := 0; i < 10; i++ {
  811. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  812. t.Fatal(err)
  813. }
  814. }
  815. err = loggerDriver.Close()
  816. if err != nil {
  817. t.Fatal(err)
  818. }
  819. if len(hec.messages) != 10 {
  820. t.Fatal("Not all messages delivered")
  821. }
  822. for i, message := range hec.messages {
  823. if event, err := message.EventAsMap(); err != nil {
  824. t.Fatal(err)
  825. } else {
  826. if event["line"] != fmt.Sprintf("%d", i) {
  827. t.Fatalf("Unexpected event in message %v", event)
  828. }
  829. }
  830. }
  831. // 1 to verify connection and 10 messages
  832. if hec.numOfRequests != 11 {
  833. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  834. }
  835. err = hec.Close()
  836. if err != nil {
  837. t.Fatal(err)
  838. }
  839. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  840. t.Fatal(err)
  841. }
  842. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  843. t.Fatal(err)
  844. }
  845. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  846. t.Fatal(err)
  847. }
  848. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  849. t.Fatal(err)
  850. }
  851. }
  852. // Driver should not be created when HEC is unresponsive
  853. func TestVerify(t *testing.T) {
  854. hec := NewHTTPEventCollectorMock(t)
  855. hec.simulateServerError = true
  856. go hec.Serve()
  857. info := logger.Info{
  858. Config: map[string]string{
  859. splunkURLKey: hec.URL(),
  860. splunkTokenKey: hec.token,
  861. },
  862. ContainerID: "containeriid",
  863. ContainerName: "/container_name",
  864. ContainerImageID: "contaimageid",
  865. ContainerImageName: "container_image_name",
  866. }
  867. _, err := New(info)
  868. if err == nil {
  869. t.Fatal("Expecting driver to fail, when server is unresponsive")
  870. }
  871. err = hec.Close()
  872. if err != nil {
  873. t.Fatal(err)
  874. }
  875. }
  876. // Verify that user can specify to skip verification that Splunk HEC is working.
  877. // Also in this test we verify retry logic.
  878. func TestSkipVerify(t *testing.T) {
  879. hec := NewHTTPEventCollectorMock(t)
  880. hec.simulateServerError = true
  881. go hec.Serve()
  882. info := logger.Info{
  883. Config: map[string]string{
  884. splunkURLKey: hec.URL(),
  885. splunkTokenKey: hec.token,
  886. splunkVerifyConnectionKey: "false",
  887. },
  888. ContainerID: "containeriid",
  889. ContainerName: "/container_name",
  890. ContainerImageID: "contaimageid",
  891. ContainerImageName: "container_image_name",
  892. }
  893. loggerDriver, err := New(info)
  894. if err != nil {
  895. t.Fatal(err)
  896. }
  897. if hec.connectionVerified {
  898. t.Fatal("Connection should not be verified")
  899. }
  900. for i := 0; i < defaultStreamChannelSize*2; i++ {
  901. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  902. t.Fatal(err)
  903. }
  904. }
  905. if len(hec.messages) != 0 {
  906. t.Fatal("No messages should be accepted at this point")
  907. }
  908. hec.simulateServerError = false
  909. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  910. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  911. t.Fatal(err)
  912. }
  913. }
  914. err = loggerDriver.Close()
  915. if err != nil {
  916. t.Fatal(err)
  917. }
  918. if len(hec.messages) != defaultStreamChannelSize*4 {
  919. t.Fatal("Not all messages delivered")
  920. }
  921. for i, message := range hec.messages {
  922. if event, err := message.EventAsMap(); err != nil {
  923. t.Fatal(err)
  924. } else {
  925. if event["line"] != fmt.Sprintf("%d", i) {
  926. t.Fatalf("Unexpected event in message %v", event)
  927. }
  928. }
  929. }
  930. err = hec.Close()
  931. if err != nil {
  932. t.Fatal(err)
  933. }
  934. }
  935. // Verify logic for when we filled whole buffer
  936. func TestBufferMaximum(t *testing.T) {
  937. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  938. t.Fatal(err)
  939. }
  940. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  941. t.Fatal(err)
  942. }
  943. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  944. t.Fatal(err)
  945. }
  946. hec := NewHTTPEventCollectorMock(t)
  947. hec.simulateServerError = true
  948. go hec.Serve()
  949. info := logger.Info{
  950. Config: map[string]string{
  951. splunkURLKey: hec.URL(),
  952. splunkTokenKey: hec.token,
  953. splunkVerifyConnectionKey: "false",
  954. },
  955. ContainerID: "containeriid",
  956. ContainerName: "/container_name",
  957. ContainerImageID: "contaimageid",
  958. ContainerImageName: "container_image_name",
  959. }
  960. loggerDriver, err := New(info)
  961. if err != nil {
  962. t.Fatal(err)
  963. }
  964. if hec.connectionVerified {
  965. t.Fatal("Connection should not be verified")
  966. }
  967. for i := 0; i < 11; i++ {
  968. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  969. t.Fatal(err)
  970. }
  971. }
  972. if len(hec.messages) != 0 {
  973. t.Fatal("No messages should be accepted at this point")
  974. }
  975. hec.simulateServerError = false
  976. err = loggerDriver.Close()
  977. if err != nil {
  978. t.Fatal(err)
  979. }
  980. if len(hec.messages) != 9 {
  981. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  982. }
  983. // First 1000 messages are written to daemon log when buffer was full
  984. for i, message := range hec.messages {
  985. if event, err := message.EventAsMap(); err != nil {
  986. t.Fatal(err)
  987. } else {
  988. if event["line"] != fmt.Sprintf("%d", i+2) {
  989. t.Fatalf("Unexpected event in message %v", event)
  990. }
  991. }
  992. }
  993. err = hec.Close()
  994. if err != nil {
  995. t.Fatal(err)
  996. }
  997. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  998. t.Fatal(err)
  999. }
  1000. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1001. t.Fatal(err)
  1002. }
  1003. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1004. t.Fatal(err)
  1005. }
  1006. }
  1007. // Verify that we are not blocking close when HEC is down for the whole time
  1008. func TestServerAlwaysDown(t *testing.T) {
  1009. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1010. t.Fatal(err)
  1011. }
  1012. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1013. t.Fatal(err)
  1014. }
  1015. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1016. t.Fatal(err)
  1017. }
  1018. hec := NewHTTPEventCollectorMock(t)
  1019. hec.simulateServerError = true
  1020. go hec.Serve()
  1021. info := logger.Info{
  1022. Config: map[string]string{
  1023. splunkURLKey: hec.URL(),
  1024. splunkTokenKey: hec.token,
  1025. splunkVerifyConnectionKey: "false",
  1026. },
  1027. ContainerID: "containeriid",
  1028. ContainerName: "/container_name",
  1029. ContainerImageID: "contaimageid",
  1030. ContainerImageName: "container_image_name",
  1031. }
  1032. loggerDriver, err := New(info)
  1033. if err != nil {
  1034. t.Fatal(err)
  1035. }
  1036. if hec.connectionVerified {
  1037. t.Fatal("Connection should not be verified")
  1038. }
  1039. for i := 0; i < 5; i++ {
  1040. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1041. t.Fatal(err)
  1042. }
  1043. }
  1044. err = loggerDriver.Close()
  1045. if err != nil {
  1046. t.Fatal(err)
  1047. }
  1048. if len(hec.messages) != 0 {
  1049. t.Fatal("No messages should be sent")
  1050. }
  1051. err = hec.Close()
  1052. if err != nil {
  1053. t.Fatal(err)
  1054. }
  1055. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1056. t.Fatal(err)
  1057. }
  1058. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1059. t.Fatal(err)
  1060. }
  1061. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1062. t.Fatal(err)
  1063. }
  1064. }
  1065. // Cannot send messages after we close driver
  1066. func TestCannotSendAfterClose(t *testing.T) {
  1067. hec := NewHTTPEventCollectorMock(t)
  1068. go hec.Serve()
  1069. info := logger.Info{
  1070. Config: map[string]string{
  1071. splunkURLKey: hec.URL(),
  1072. splunkTokenKey: hec.token,
  1073. },
  1074. ContainerID: "containeriid",
  1075. ContainerName: "/container_name",
  1076. ContainerImageID: "contaimageid",
  1077. ContainerImageName: "container_image_name",
  1078. }
  1079. loggerDriver, err := New(info)
  1080. if err != nil {
  1081. t.Fatal(err)
  1082. }
  1083. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1084. t.Fatal(err)
  1085. }
  1086. err = loggerDriver.Close()
  1087. if err != nil {
  1088. t.Fatal(err)
  1089. }
  1090. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1091. t.Fatal("Driver should not allow to send messages after close")
  1092. }
  1093. if len(hec.messages) != 1 {
  1094. t.Fatal("Only one message should be sent")
  1095. }
  1096. message := hec.messages[0]
  1097. if event, err := message.EventAsMap(); err != nil {
  1098. t.Fatal(err)
  1099. } else {
  1100. if event["line"] != "message1" {
  1101. t.Fatalf("Unexpected event in message %v", event)
  1102. }
  1103. }
  1104. err = hec.Close()
  1105. if err != nil {
  1106. t.Fatal(err)
  1107. }
  1108. }