splunk_test.go 33 KB


  1. package splunk
  2. import (
  3. "compress/gzip"
  4. "fmt"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/docker/docker/daemon/logger"
  9. "github.com/stretchr/testify/require"
  10. )
  11. // Validate options
  12. func TestValidateLogOpt(t *testing.T) {
  13. err := ValidateLogOpt(map[string]string{
  14. splunkURLKey: "http://127.0.0.1",
  15. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  16. splunkSourceKey: "mysource",
  17. splunkSourceTypeKey: "mysourcetype",
  18. splunkIndexKey: "myindex",
  19. splunkCAPathKey: "/usr/cert.pem",
  20. splunkCANameKey: "ca_name",
  21. splunkInsecureSkipVerifyKey: "true",
  22. splunkFormatKey: "json",
  23. splunkVerifyConnectionKey: "true",
  24. splunkGzipCompressionKey: "true",
  25. splunkGzipCompressionLevelKey: "1",
  26. envKey: "a",
  27. envRegexKey: "^foo",
  28. labelsKey: "b",
  29. tagKey: "c",
  30. })
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. err = ValidateLogOpt(map[string]string{
  35. "not-supported-option": "a",
  36. })
  37. if err == nil {
  38. t.Fatal("Expecting error on unsupported options")
  39. }
  40. }
  41. // Driver require user to specify required options
  42. func TestNewMissedConfig(t *testing.T) {
  43. info := logger.Info{
  44. Config: map[string]string{},
  45. }
  46. _, err := New(info)
  47. if err == nil {
  48. t.Fatal("Logger driver should fail when no required parameters specified")
  49. }
  50. }
  51. // Driver require user to specify splunk-url
  52. func TestNewMissedUrl(t *testing.T) {
  53. info := logger.Info{
  54. Config: map[string]string{
  55. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  56. },
  57. }
  58. _, err := New(info)
  59. if err.Error() != "splunk: splunk-url is expected" {
  60. t.Fatal("Logger driver should fail when no required parameters specified")
  61. }
  62. }
  63. // Driver require user to specify splunk-token
  64. func TestNewMissedToken(t *testing.T) {
  65. info := logger.Info{
  66. Config: map[string]string{
  67. splunkURLKey: "http://127.0.0.1:8088",
  68. },
  69. }
  70. _, err := New(info)
  71. if err.Error() != "splunk: splunk-token is expected" {
  72. t.Fatal("Logger driver should fail when no required parameters specified")
  73. }
  74. }
  75. // Test default settings
  76. func TestDefault(t *testing.T) {
  77. hec := NewHTTPEventCollectorMock(t)
  78. go hec.Serve()
  79. info := logger.Info{
  80. Config: map[string]string{
  81. splunkURLKey: hec.URL(),
  82. splunkTokenKey: hec.token,
  83. },
  84. ContainerID: "containeriid",
  85. ContainerName: "container_name",
  86. ContainerImageID: "contaimageid",
  87. ContainerImageName: "container_image_name",
  88. }
  89. hostname, err := info.Hostname()
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. loggerDriver, err := New(info)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. if loggerDriver.Name() != driverName {
  98. t.Fatal("Unexpected logger driver name")
  99. }
  100. if !hec.connectionVerified {
  101. t.Fatal("By default connection should be verified")
  102. }
  103. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  104. if !ok {
  105. t.Fatal("Unexpected Splunk Logging Driver type")
  106. }
  107. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  108. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  109. splunkLoggerDriver.nullMessage.Host != hostname ||
  110. splunkLoggerDriver.nullMessage.Source != "" ||
  111. splunkLoggerDriver.nullMessage.SourceType != "" ||
  112. splunkLoggerDriver.nullMessage.Index != "" ||
  113. splunkLoggerDriver.gzipCompression ||
  114. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  115. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  116. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  117. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  118. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  119. }
  120. message1Time := time.Now()
  121. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  122. t.Fatal(err)
  123. }
  124. message2Time := time.Now()
  125. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  126. t.Fatal(err)
  127. }
  128. err = loggerDriver.Close()
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. if len(hec.messages) != 2 {
  133. t.Fatal("Expected two messages")
  134. }
  135. if *hec.gzipEnabled {
  136. t.Fatal("Gzip should not be used")
  137. }
  138. message1 := hec.messages[0]
  139. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  140. message1.Host != hostname ||
  141. message1.Source != "" ||
  142. message1.SourceType != "" ||
  143. message1.Index != "" {
  144. t.Fatalf("Unexpected values of message 1 %v", message1)
  145. }
  146. if event, err := message1.EventAsMap(); err != nil {
  147. t.Fatal(err)
  148. } else {
  149. if event["line"] != "{\"a\":\"b\"}" ||
  150. event["source"] != "stdout" ||
  151. event["tag"] != "containeriid" ||
  152. len(event) != 3 {
  153. t.Fatalf("Unexpected event in message %v", event)
  154. }
  155. }
  156. message2 := hec.messages[1]
  157. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  158. message2.Host != hostname ||
  159. message2.Source != "" ||
  160. message2.SourceType != "" ||
  161. message2.Index != "" {
  162. t.Fatalf("Unexpected values of message 1 %v", message2)
  163. }
  164. if event, err := message2.EventAsMap(); err != nil {
  165. t.Fatal(err)
  166. } else {
  167. if event["line"] != "notajson" ||
  168. event["source"] != "stdout" ||
  169. event["tag"] != "containeriid" ||
  170. len(event) != 3 {
  171. t.Fatalf("Unexpected event in message %v", event)
  172. }
  173. }
  174. err = hec.Close()
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. }
  179. // Verify inline format with a not default settings for most of options
  180. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  181. hec := NewHTTPEventCollectorMock(t)
  182. go hec.Serve()
  183. info := logger.Info{
  184. Config: map[string]string{
  185. splunkURLKey: hec.URL(),
  186. splunkTokenKey: hec.token,
  187. splunkSourceKey: "mysource",
  188. splunkSourceTypeKey: "mysourcetype",
  189. splunkIndexKey: "myindex",
  190. splunkFormatKey: splunkFormatInline,
  191. splunkGzipCompressionKey: "true",
  192. tagKey: "{{.ImageName}}/{{.Name}}",
  193. labelsKey: "a",
  194. envRegexKey: "^foo",
  195. },
  196. ContainerID: "containeriid",
  197. ContainerName: "/container_name",
  198. ContainerImageID: "contaimageid",
  199. ContainerImageName: "container_image_name",
  200. ContainerLabels: map[string]string{
  201. "a": "b",
  202. },
  203. ContainerEnv: []string{"foo_finder=bar"},
  204. }
  205. hostname, err := info.Hostname()
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. loggerDriver, err := New(info)
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. if !hec.connectionVerified {
  214. t.Fatal("By default connection should be verified")
  215. }
  216. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  217. if !ok {
  218. t.Fatal("Unexpected Splunk Logging Driver type")
  219. }
  220. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  221. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  222. splunkLoggerDriver.nullMessage.Host != hostname ||
  223. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  224. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  225. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  226. !splunkLoggerDriver.gzipCompression ||
  227. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  228. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  229. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  230. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  231. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  232. t.Fatal("Values do not match configuration.")
  233. }
  234. messageTime := time.Now()
  235. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  236. t.Fatal(err)
  237. }
  238. err = loggerDriver.Close()
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. if len(hec.messages) != 1 {
  243. t.Fatal("Expected one message")
  244. }
  245. if !*hec.gzipEnabled {
  246. t.Fatal("Gzip should be used")
  247. }
  248. message := hec.messages[0]
  249. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  250. message.Host != hostname ||
  251. message.Source != "mysource" ||
  252. message.SourceType != "mysourcetype" ||
  253. message.Index != "myindex" {
  254. t.Fatalf("Unexpected values of message %v", message)
  255. }
  256. if event, err := message.EventAsMap(); err != nil {
  257. t.Fatal(err)
  258. } else {
  259. if event["line"] != "1" ||
  260. event["source"] != "stdout" ||
  261. event["tag"] != "container_image_name/container_name" ||
  262. event["attrs"].(map[string]interface{})["a"] != "b" ||
  263. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  264. len(event) != 4 {
  265. t.Fatalf("Unexpected event in message %v", event)
  266. }
  267. }
  268. err = hec.Close()
  269. if err != nil {
  270. t.Fatal(err)
  271. }
  272. }
  273. // Verify JSON format
  274. func TestJsonFormat(t *testing.T) {
  275. hec := NewHTTPEventCollectorMock(t)
  276. go hec.Serve()
  277. info := logger.Info{
  278. Config: map[string]string{
  279. splunkURLKey: hec.URL(),
  280. splunkTokenKey: hec.token,
  281. splunkFormatKey: splunkFormatJSON,
  282. splunkGzipCompressionKey: "true",
  283. splunkGzipCompressionLevelKey: "1",
  284. },
  285. ContainerID: "containeriid",
  286. ContainerName: "/container_name",
  287. ContainerImageID: "contaimageid",
  288. ContainerImageName: "container_image_name",
  289. }
  290. hostname, err := info.Hostname()
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. loggerDriver, err := New(info)
  295. if err != nil {
  296. t.Fatal(err)
  297. }
  298. if !hec.connectionVerified {
  299. t.Fatal("By default connection should be verified")
  300. }
  301. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  302. if !ok {
  303. t.Fatal("Unexpected Splunk Logging Driver type")
  304. }
  305. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  306. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  307. splunkLoggerDriver.nullMessage.Host != hostname ||
  308. splunkLoggerDriver.nullMessage.Source != "" ||
  309. splunkLoggerDriver.nullMessage.SourceType != "" ||
  310. splunkLoggerDriver.nullMessage.Index != "" ||
  311. !splunkLoggerDriver.gzipCompression ||
  312. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  313. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  314. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  315. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  316. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  317. t.Fatal("Values do not match configuration.")
  318. }
  319. message1Time := time.Now()
  320. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  321. t.Fatal(err)
  322. }
  323. message2Time := time.Now()
  324. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  325. t.Fatal(err)
  326. }
  327. err = loggerDriver.Close()
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. if len(hec.messages) != 2 {
  332. t.Fatal("Expected two messages")
  333. }
  334. message1 := hec.messages[0]
  335. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  336. message1.Host != hostname ||
  337. message1.Source != "" ||
  338. message1.SourceType != "" ||
  339. message1.Index != "" {
  340. t.Fatalf("Unexpected values of message 1 %v", message1)
  341. }
  342. if event, err := message1.EventAsMap(); err != nil {
  343. t.Fatal(err)
  344. } else {
  345. if event["line"].(map[string]interface{})["a"] != "b" ||
  346. event["source"] != "stdout" ||
  347. event["tag"] != "containeriid" ||
  348. len(event) != 3 {
  349. t.Fatalf("Unexpected event in message 1 %v", event)
  350. }
  351. }
  352. message2 := hec.messages[1]
  353. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  354. message2.Host != hostname ||
  355. message2.Source != "" ||
  356. message2.SourceType != "" ||
  357. message2.Index != "" {
  358. t.Fatalf("Unexpected values of message 2 %v", message2)
  359. }
  360. // If message cannot be parsed as JSON - it should be sent as a line
  361. if event, err := message2.EventAsMap(); err != nil {
  362. t.Fatal(err)
  363. } else {
  364. if event["line"] != "notjson" ||
  365. event["source"] != "stdout" ||
  366. event["tag"] != "containeriid" ||
  367. len(event) != 3 {
  368. t.Fatalf("Unexpected event in message 2 %v", event)
  369. }
  370. }
  371. err = hec.Close()
  372. if err != nil {
  373. t.Fatal(err)
  374. }
  375. }
  376. // Verify raw format
  377. func TestRawFormat(t *testing.T) {
  378. hec := NewHTTPEventCollectorMock(t)
  379. go hec.Serve()
  380. info := logger.Info{
  381. Config: map[string]string{
  382. splunkURLKey: hec.URL(),
  383. splunkTokenKey: hec.token,
  384. splunkFormatKey: splunkFormatRaw,
  385. },
  386. ContainerID: "containeriid",
  387. ContainerName: "/container_name",
  388. ContainerImageID: "contaimageid",
  389. ContainerImageName: "container_image_name",
  390. }
  391. hostname, err := info.Hostname()
  392. require.NoError(t, err)
  393. loggerDriver, err := New(info)
  394. require.NoError(t, err)
  395. if !hec.connectionVerified {
  396. t.Fatal("By default connection should be verified")
  397. }
  398. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  399. if !ok {
  400. t.Fatal("Unexpected Splunk Logging Driver type")
  401. }
  402. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  403. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  404. splunkLoggerDriver.nullMessage.Host != hostname ||
  405. splunkLoggerDriver.nullMessage.Source != "" ||
  406. splunkLoggerDriver.nullMessage.SourceType != "" ||
  407. splunkLoggerDriver.nullMessage.Index != "" ||
  408. splunkLoggerDriver.gzipCompression ||
  409. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  410. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  411. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  412. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  413. string(splunkLoggerDriver.prefix) != "containeriid " {
  414. t.Fatal("Values do not match configuration.")
  415. }
  416. message1Time := time.Now()
  417. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  418. t.Fatal(err)
  419. }
  420. message2Time := time.Now()
  421. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  422. t.Fatal(err)
  423. }
  424. err = loggerDriver.Close()
  425. if err != nil {
  426. t.Fatal(err)
  427. }
  428. if len(hec.messages) != 2 {
  429. t.Fatal("Expected two messages")
  430. }
  431. message1 := hec.messages[0]
  432. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  433. message1.Host != hostname ||
  434. message1.Source != "" ||
  435. message1.SourceType != "" ||
  436. message1.Index != "" {
  437. t.Fatalf("Unexpected values of message 1 %v", message1)
  438. }
  439. if event, err := message1.EventAsString(); err != nil {
  440. t.Fatal(err)
  441. } else {
  442. if event != "containeriid {\"a\":\"b\"}" {
  443. t.Fatalf("Unexpected event in message 1 %v", event)
  444. }
  445. }
  446. message2 := hec.messages[1]
  447. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  448. message2.Host != hostname ||
  449. message2.Source != "" ||
  450. message2.SourceType != "" ||
  451. message2.Index != "" {
  452. t.Fatalf("Unexpected values of message 2 %v", message2)
  453. }
  454. if event, err := message2.EventAsString(); err != nil {
  455. t.Fatal(err)
  456. } else {
  457. if event != "containeriid notjson" {
  458. t.Fatalf("Unexpected event in message 1 %v", event)
  459. }
  460. }
  461. err = hec.Close()
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. }
  466. // Verify raw format with labels
  467. func TestRawFormatWithLabels(t *testing.T) {
  468. hec := NewHTTPEventCollectorMock(t)
  469. go hec.Serve()
  470. info := logger.Info{
  471. Config: map[string]string{
  472. splunkURLKey: hec.URL(),
  473. splunkTokenKey: hec.token,
  474. splunkFormatKey: splunkFormatRaw,
  475. labelsKey: "a",
  476. },
  477. ContainerID: "containeriid",
  478. ContainerName: "/container_name",
  479. ContainerImageID: "contaimageid",
  480. ContainerImageName: "container_image_name",
  481. ContainerLabels: map[string]string{
  482. "a": "b",
  483. },
  484. }
  485. hostname, err := info.Hostname()
  486. if err != nil {
  487. t.Fatal(err)
  488. }
  489. loggerDriver, err := New(info)
  490. if err != nil {
  491. t.Fatal(err)
  492. }
  493. if !hec.connectionVerified {
  494. t.Fatal("By default connection should be verified")
  495. }
  496. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  497. if !ok {
  498. t.Fatal("Unexpected Splunk Logging Driver type")
  499. }
  500. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  501. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  502. splunkLoggerDriver.nullMessage.Host != hostname ||
  503. splunkLoggerDriver.nullMessage.Source != "" ||
  504. splunkLoggerDriver.nullMessage.SourceType != "" ||
  505. splunkLoggerDriver.nullMessage.Index != "" ||
  506. splunkLoggerDriver.gzipCompression ||
  507. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  508. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  509. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  510. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  511. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  512. t.Fatal("Values do not match configuration.")
  513. }
  514. message1Time := time.Now()
  515. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  516. t.Fatal(err)
  517. }
  518. message2Time := time.Now()
  519. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  520. t.Fatal(err)
  521. }
  522. err = loggerDriver.Close()
  523. if err != nil {
  524. t.Fatal(err)
  525. }
  526. if len(hec.messages) != 2 {
  527. t.Fatal("Expected two messages")
  528. }
  529. message1 := hec.messages[0]
  530. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  531. message1.Host != hostname ||
  532. message1.Source != "" ||
  533. message1.SourceType != "" ||
  534. message1.Index != "" {
  535. t.Fatalf("Unexpected values of message 1 %v", message1)
  536. }
  537. if event, err := message1.EventAsString(); err != nil {
  538. t.Fatal(err)
  539. } else {
  540. if event != "containeriid a=b {\"a\":\"b\"}" {
  541. t.Fatalf("Unexpected event in message 1 %v", event)
  542. }
  543. }
  544. message2 := hec.messages[1]
  545. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  546. message2.Host != hostname ||
  547. message2.Source != "" ||
  548. message2.SourceType != "" ||
  549. message2.Index != "" {
  550. t.Fatalf("Unexpected values of message 2 %v", message2)
  551. }
  552. if event, err := message2.EventAsString(); err != nil {
  553. t.Fatal(err)
  554. } else {
  555. if event != "containeriid a=b notjson" {
  556. t.Fatalf("Unexpected event in message 2 %v", event)
  557. }
  558. }
  559. err = hec.Close()
  560. if err != nil {
  561. t.Fatal(err)
  562. }
  563. }
  564. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  565. // in the same way we get them in stdout/stderr
  566. func TestRawFormatWithoutTag(t *testing.T) {
  567. hec := NewHTTPEventCollectorMock(t)
  568. go hec.Serve()
  569. info := logger.Info{
  570. Config: map[string]string{
  571. splunkURLKey: hec.URL(),
  572. splunkTokenKey: hec.token,
  573. splunkFormatKey: splunkFormatRaw,
  574. tagKey: "",
  575. },
  576. ContainerID: "containeriid",
  577. ContainerName: "/container_name",
  578. ContainerImageID: "contaimageid",
  579. ContainerImageName: "container_image_name",
  580. }
  581. hostname, err := info.Hostname()
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. loggerDriver, err := New(info)
  586. if err != nil {
  587. t.Fatal(err)
  588. }
  589. if !hec.connectionVerified {
  590. t.Fatal("By default connection should be verified")
  591. }
  592. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  593. if !ok {
  594. t.Fatal("Unexpected Splunk Logging Driver type")
  595. }
  596. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  597. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  598. splunkLoggerDriver.nullMessage.Host != hostname ||
  599. splunkLoggerDriver.nullMessage.Source != "" ||
  600. splunkLoggerDriver.nullMessage.SourceType != "" ||
  601. splunkLoggerDriver.nullMessage.Index != "" ||
  602. splunkLoggerDriver.gzipCompression ||
  603. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  604. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  605. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  606. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  607. string(splunkLoggerDriver.prefix) != "" {
  608. t.Log(string(splunkLoggerDriver.prefix) + "a")
  609. t.Fatal("Values do not match configuration.")
  610. }
  611. message1Time := time.Now()
  612. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  613. t.Fatal(err)
  614. }
  615. message2Time := time.Now()
  616. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  617. t.Fatal(err)
  618. }
  619. err = loggerDriver.Close()
  620. if err != nil {
  621. t.Fatal(err)
  622. }
  623. if len(hec.messages) != 2 {
  624. t.Fatal("Expected two messages")
  625. }
  626. message1 := hec.messages[0]
  627. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  628. message1.Host != hostname ||
  629. message1.Source != "" ||
  630. message1.SourceType != "" ||
  631. message1.Index != "" {
  632. t.Fatalf("Unexpected values of message 1 %v", message1)
  633. }
  634. if event, err := message1.EventAsString(); err != nil {
  635. t.Fatal(err)
  636. } else {
  637. if event != "{\"a\":\"b\"}" {
  638. t.Fatalf("Unexpected event in message 1 %v", event)
  639. }
  640. }
  641. message2 := hec.messages[1]
  642. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  643. message2.Host != hostname ||
  644. message2.Source != "" ||
  645. message2.SourceType != "" ||
  646. message2.Index != "" {
  647. t.Fatalf("Unexpected values of message 2 %v", message2)
  648. }
  649. if event, err := message2.EventAsString(); err != nil {
  650. t.Fatal(err)
  651. } else {
  652. if event != "notjson" {
  653. t.Fatalf("Unexpected event in message 2 %v", event)
  654. }
  655. }
  656. err = hec.Close()
  657. if err != nil {
  658. t.Fatal(err)
  659. }
  660. }
  661. // Verify that we will send messages in batches with default batching parameters,
  662. // but change frequency to be sure that numOfRequests will match expected 17 requests
  663. func TestBatching(t *testing.T) {
  664. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  665. t.Fatal(err)
  666. }
  667. hec := NewHTTPEventCollectorMock(t)
  668. go hec.Serve()
  669. info := logger.Info{
  670. Config: map[string]string{
  671. splunkURLKey: hec.URL(),
  672. splunkTokenKey: hec.token,
  673. },
  674. ContainerID: "containeriid",
  675. ContainerName: "/container_name",
  676. ContainerImageID: "contaimageid",
  677. ContainerImageName: "container_image_name",
  678. }
  679. loggerDriver, err := New(info)
  680. if err != nil {
  681. t.Fatal(err)
  682. }
  683. for i := 0; i < defaultStreamChannelSize*4; i++ {
  684. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  685. t.Fatal(err)
  686. }
  687. }
  688. err = loggerDriver.Close()
  689. if err != nil {
  690. t.Fatal(err)
  691. }
  692. if len(hec.messages) != defaultStreamChannelSize*4 {
  693. t.Fatal("Not all messages delivered")
  694. }
  695. for i, message := range hec.messages {
  696. if event, err := message.EventAsMap(); err != nil {
  697. t.Fatal(err)
  698. } else {
  699. if event["line"] != fmt.Sprintf("%d", i) {
  700. t.Fatalf("Unexpected event in message %v", event)
  701. }
  702. }
  703. }
  704. // 1 to verify connection and 16 batches
  705. if hec.numOfRequests != 17 {
  706. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  707. }
  708. err = hec.Close()
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  713. t.Fatal(err)
  714. }
  715. }
  716. // Verify that test is using time to fire events not rare than specified frequency
  717. func TestFrequency(t *testing.T) {
  718. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  719. t.Fatal(err)
  720. }
  721. hec := NewHTTPEventCollectorMock(t)
  722. go hec.Serve()
  723. info := logger.Info{
  724. Config: map[string]string{
  725. splunkURLKey: hec.URL(),
  726. splunkTokenKey: hec.token,
  727. },
  728. ContainerID: "containeriid",
  729. ContainerName: "/container_name",
  730. ContainerImageID: "contaimageid",
  731. ContainerImageName: "container_image_name",
  732. }
  733. loggerDriver, err := New(info)
  734. if err != nil {
  735. t.Fatal(err)
  736. }
  737. for i := 0; i < 10; i++ {
  738. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  739. t.Fatal(err)
  740. }
  741. time.Sleep(15 * time.Millisecond)
  742. }
  743. err = loggerDriver.Close()
  744. if err != nil {
  745. t.Fatal(err)
  746. }
  747. if len(hec.messages) != 10 {
  748. t.Fatal("Not all messages delivered")
  749. }
  750. for i, message := range hec.messages {
  751. if event, err := message.EventAsMap(); err != nil {
  752. t.Fatal(err)
  753. } else {
  754. if event["line"] != fmt.Sprintf("%d", i) {
  755. t.Fatalf("Unexpected event in message %v", event)
  756. }
  757. }
  758. }
  759. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  760. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  761. if hec.numOfRequests < 9 {
  762. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  763. }
  764. err = hec.Close()
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  769. t.Fatal(err)
  770. }
  771. }
  772. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  773. // per request
  774. func TestOneMessagePerRequest(t *testing.T) {
  775. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  776. t.Fatal(err)
  777. }
  778. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  779. t.Fatal(err)
  780. }
  781. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  782. t.Fatal(err)
  783. }
  784. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  785. t.Fatal(err)
  786. }
  787. hec := NewHTTPEventCollectorMock(t)
  788. go hec.Serve()
  789. info := logger.Info{
  790. Config: map[string]string{
  791. splunkURLKey: hec.URL(),
  792. splunkTokenKey: hec.token,
  793. },
  794. ContainerID: "containeriid",
  795. ContainerName: "/container_name",
  796. ContainerImageID: "contaimageid",
  797. ContainerImageName: "container_image_name",
  798. }
  799. loggerDriver, err := New(info)
  800. if err != nil {
  801. t.Fatal(err)
  802. }
  803. for i := 0; i < 10; i++ {
  804. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  805. t.Fatal(err)
  806. }
  807. }
  808. err = loggerDriver.Close()
  809. if err != nil {
  810. t.Fatal(err)
  811. }
  812. if len(hec.messages) != 10 {
  813. t.Fatal("Not all messages delivered")
  814. }
  815. for i, message := range hec.messages {
  816. if event, err := message.EventAsMap(); err != nil {
  817. t.Fatal(err)
  818. } else {
  819. if event["line"] != fmt.Sprintf("%d", i) {
  820. t.Fatalf("Unexpected event in message %v", event)
  821. }
  822. }
  823. }
  824. // 1 to verify connection and 10 messages
  825. if hec.numOfRequests != 11 {
  826. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  827. }
  828. err = hec.Close()
  829. if err != nil {
  830. t.Fatal(err)
  831. }
  832. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  833. t.Fatal(err)
  834. }
  835. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  836. t.Fatal(err)
  837. }
  838. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  839. t.Fatal(err)
  840. }
  841. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  842. t.Fatal(err)
  843. }
  844. }
  845. // Driver should not be created when HEC is unresponsive
  846. func TestVerify(t *testing.T) {
  847. hec := NewHTTPEventCollectorMock(t)
  848. hec.simulateServerError = true
  849. go hec.Serve()
  850. info := logger.Info{
  851. Config: map[string]string{
  852. splunkURLKey: hec.URL(),
  853. splunkTokenKey: hec.token,
  854. },
  855. ContainerID: "containeriid",
  856. ContainerName: "/container_name",
  857. ContainerImageID: "contaimageid",
  858. ContainerImageName: "container_image_name",
  859. }
  860. _, err := New(info)
  861. if err == nil {
  862. t.Fatal("Expecting driver to fail, when server is unresponsive")
  863. }
  864. err = hec.Close()
  865. if err != nil {
  866. t.Fatal(err)
  867. }
  868. }
  869. // Verify that user can specify to skip verification that Splunk HEC is working.
  870. // Also in this test we verify retry logic.
  871. func TestSkipVerify(t *testing.T) {
  872. hec := NewHTTPEventCollectorMock(t)
  873. hec.simulateServerError = true
  874. go hec.Serve()
  875. info := logger.Info{
  876. Config: map[string]string{
  877. splunkURLKey: hec.URL(),
  878. splunkTokenKey: hec.token,
  879. splunkVerifyConnectionKey: "false",
  880. },
  881. ContainerID: "containeriid",
  882. ContainerName: "/container_name",
  883. ContainerImageID: "contaimageid",
  884. ContainerImageName: "container_image_name",
  885. }
  886. loggerDriver, err := New(info)
  887. if err != nil {
  888. t.Fatal(err)
  889. }
  890. if hec.connectionVerified {
  891. t.Fatal("Connection should not be verified")
  892. }
  893. for i := 0; i < defaultStreamChannelSize*2; i++ {
  894. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  895. t.Fatal(err)
  896. }
  897. }
  898. if len(hec.messages) != 0 {
  899. t.Fatal("No messages should be accepted at this point")
  900. }
  901. hec.simulateServerError = false
  902. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  903. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  904. t.Fatal(err)
  905. }
  906. }
  907. err = loggerDriver.Close()
  908. if err != nil {
  909. t.Fatal(err)
  910. }
  911. if len(hec.messages) != defaultStreamChannelSize*4 {
  912. t.Fatal("Not all messages delivered")
  913. }
  914. for i, message := range hec.messages {
  915. if event, err := message.EventAsMap(); err != nil {
  916. t.Fatal(err)
  917. } else {
  918. if event["line"] != fmt.Sprintf("%d", i) {
  919. t.Fatalf("Unexpected event in message %v", event)
  920. }
  921. }
  922. }
  923. err = hec.Close()
  924. if err != nil {
  925. t.Fatal(err)
  926. }
  927. }
  928. // Verify logic for when we filled whole buffer
  929. func TestBufferMaximum(t *testing.T) {
  930. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  931. t.Fatal(err)
  932. }
  933. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  934. t.Fatal(err)
  935. }
  936. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  937. t.Fatal(err)
  938. }
  939. hec := NewHTTPEventCollectorMock(t)
  940. hec.simulateServerError = true
  941. go hec.Serve()
  942. info := logger.Info{
  943. Config: map[string]string{
  944. splunkURLKey: hec.URL(),
  945. splunkTokenKey: hec.token,
  946. splunkVerifyConnectionKey: "false",
  947. },
  948. ContainerID: "containeriid",
  949. ContainerName: "/container_name",
  950. ContainerImageID: "contaimageid",
  951. ContainerImageName: "container_image_name",
  952. }
  953. loggerDriver, err := New(info)
  954. if err != nil {
  955. t.Fatal(err)
  956. }
  957. if hec.connectionVerified {
  958. t.Fatal("Connection should not be verified")
  959. }
  960. for i := 0; i < 11; i++ {
  961. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  962. t.Fatal(err)
  963. }
  964. }
  965. if len(hec.messages) != 0 {
  966. t.Fatal("No messages should be accepted at this point")
  967. }
  968. hec.simulateServerError = false
  969. err = loggerDriver.Close()
  970. if err != nil {
  971. t.Fatal(err)
  972. }
  973. if len(hec.messages) != 9 {
  974. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  975. }
  976. // First 1000 messages are written to daemon log when buffer was full
  977. for i, message := range hec.messages {
  978. if event, err := message.EventAsMap(); err != nil {
  979. t.Fatal(err)
  980. } else {
  981. if event["line"] != fmt.Sprintf("%d", i+2) {
  982. t.Fatalf("Unexpected event in message %v", event)
  983. }
  984. }
  985. }
  986. err = hec.Close()
  987. if err != nil {
  988. t.Fatal(err)
  989. }
  990. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  991. t.Fatal(err)
  992. }
  993. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  994. t.Fatal(err)
  995. }
  996. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  997. t.Fatal(err)
  998. }
  999. }
  1000. // Verify that we are not blocking close when HEC is down for the whole time
  1001. func TestServerAlwaysDown(t *testing.T) {
  1002. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1003. t.Fatal(err)
  1004. }
  1005. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1006. t.Fatal(err)
  1007. }
  1008. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1009. t.Fatal(err)
  1010. }
  1011. hec := NewHTTPEventCollectorMock(t)
  1012. hec.simulateServerError = true
  1013. go hec.Serve()
  1014. info := logger.Info{
  1015. Config: map[string]string{
  1016. splunkURLKey: hec.URL(),
  1017. splunkTokenKey: hec.token,
  1018. splunkVerifyConnectionKey: "false",
  1019. },
  1020. ContainerID: "containeriid",
  1021. ContainerName: "/container_name",
  1022. ContainerImageID: "contaimageid",
  1023. ContainerImageName: "container_image_name",
  1024. }
  1025. loggerDriver, err := New(info)
  1026. if err != nil {
  1027. t.Fatal(err)
  1028. }
  1029. if hec.connectionVerified {
  1030. t.Fatal("Connection should not be verified")
  1031. }
  1032. for i := 0; i < 5; i++ {
  1033. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1034. t.Fatal(err)
  1035. }
  1036. }
  1037. err = loggerDriver.Close()
  1038. if err != nil {
  1039. t.Fatal(err)
  1040. }
  1041. if len(hec.messages) != 0 {
  1042. t.Fatal("No messages should be sent")
  1043. }
  1044. err = hec.Close()
  1045. if err != nil {
  1046. t.Fatal(err)
  1047. }
  1048. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1049. t.Fatal(err)
  1050. }
  1051. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1052. t.Fatal(err)
  1053. }
  1054. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1055. t.Fatal(err)
  1056. }
  1057. }
  1058. // Cannot send messages after we close driver
  1059. func TestCannotSendAfterClose(t *testing.T) {
  1060. hec := NewHTTPEventCollectorMock(t)
  1061. go hec.Serve()
  1062. info := logger.Info{
  1063. Config: map[string]string{
  1064. splunkURLKey: hec.URL(),
  1065. splunkTokenKey: hec.token,
  1066. },
  1067. ContainerID: "containeriid",
  1068. ContainerName: "/container_name",
  1069. ContainerImageID: "contaimageid",
  1070. ContainerImageName: "container_image_name",
  1071. }
  1072. loggerDriver, err := New(info)
  1073. if err != nil {
  1074. t.Fatal(err)
  1075. }
  1076. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1077. t.Fatal(err)
  1078. }
  1079. err = loggerDriver.Close()
  1080. if err != nil {
  1081. t.Fatal(err)
  1082. }
  1083. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1084. t.Fatal("Driver should not allow to send messages after close")
  1085. }
  1086. if len(hec.messages) != 1 {
  1087. t.Fatal("Only one message should be sent")
  1088. }
  1089. message := hec.messages[0]
  1090. if event, err := message.EventAsMap(); err != nil {
  1091. t.Fatal(err)
  1092. } else {
  1093. if event["line"] != "message1" {
  1094. t.Fatalf("Unexpected event in message %v", event)
  1095. }
  1096. }
  1097. err = hec.Close()
  1098. if err != nil {
  1099. t.Fatal(err)
  1100. }
  1101. }