splunk_test.go 34 KB


  1. package splunk
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "fmt"
  6. "os"
  7. "runtime"
  8. "testing"
  9. "time"
  10. "github.com/docker/docker/daemon/logger"
  11. "github.com/stretchr/testify/require"
  12. )
  13. // Validate options
  14. func TestValidateLogOpt(t *testing.T) {
  15. err := ValidateLogOpt(map[string]string{
  16. splunkURLKey: "http://127.0.0.1",
  17. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  18. splunkSourceKey: "mysource",
  19. splunkSourceTypeKey: "mysourcetype",
  20. splunkIndexKey: "myindex",
  21. splunkCAPathKey: "/usr/cert.pem",
  22. splunkCANameKey: "ca_name",
  23. splunkInsecureSkipVerifyKey: "true",
  24. splunkFormatKey: "json",
  25. splunkVerifyConnectionKey: "true",
  26. splunkGzipCompressionKey: "true",
  27. splunkGzipCompressionLevelKey: "1",
  28. envKey: "a",
  29. envRegexKey: "^foo",
  30. labelsKey: "b",
  31. tagKey: "c",
  32. })
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. err = ValidateLogOpt(map[string]string{
  37. "not-supported-option": "a",
  38. })
  39. if err == nil {
  40. t.Fatal("Expecting error on unsupported options")
  41. }
  42. }
  43. // Driver require user to specify required options
  44. func TestNewMissedConfig(t *testing.T) {
  45. info := logger.Info{
  46. Config: map[string]string{},
  47. }
  48. _, err := New(info)
  49. if err == nil {
  50. t.Fatal("Logger driver should fail when no required parameters specified")
  51. }
  52. }
  53. // Driver require user to specify splunk-url
  54. func TestNewMissedUrl(t *testing.T) {
  55. info := logger.Info{
  56. Config: map[string]string{
  57. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  58. },
  59. }
  60. _, err := New(info)
  61. if err.Error() != "splunk: splunk-url is expected" {
  62. t.Fatal("Logger driver should fail when no required parameters specified")
  63. }
  64. }
  65. // Driver require user to specify splunk-token
  66. func TestNewMissedToken(t *testing.T) {
  67. info := logger.Info{
  68. Config: map[string]string{
  69. splunkURLKey: "http://127.0.0.1:8088",
  70. },
  71. }
  72. _, err := New(info)
  73. if err.Error() != "splunk: splunk-token is expected" {
  74. t.Fatal("Logger driver should fail when no required parameters specified")
  75. }
  76. }
  77. // Test default settings
  78. func TestDefault(t *testing.T) {
  79. hec := NewHTTPEventCollectorMock(t)
  80. go hec.Serve()
  81. info := logger.Info{
  82. Config: map[string]string{
  83. splunkURLKey: hec.URL(),
  84. splunkTokenKey: hec.token,
  85. },
  86. ContainerID: "containeriid",
  87. ContainerName: "container_name",
  88. ContainerImageID: "contaimageid",
  89. ContainerImageName: "container_image_name",
  90. }
  91. hostname, err := info.Hostname()
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. loggerDriver, err := New(info)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. if loggerDriver.Name() != driverName {
  100. t.Fatal("Unexpected logger driver name")
  101. }
  102. if !hec.connectionVerified {
  103. t.Fatal("By default connection should be verified")
  104. }
  105. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  106. if !ok {
  107. t.Fatal("Unexpected Splunk Logging Driver type")
  108. }
  109. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  110. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  111. splunkLoggerDriver.nullMessage.Host != hostname ||
  112. splunkLoggerDriver.nullMessage.Source != "" ||
  113. splunkLoggerDriver.nullMessage.SourceType != "" ||
  114. splunkLoggerDriver.nullMessage.Index != "" ||
  115. splunkLoggerDriver.gzipCompression ||
  116. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  117. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  118. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  119. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  120. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  121. }
  122. message1Time := time.Now()
  123. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  124. t.Fatal(err)
  125. }
  126. message2Time := time.Now()
  127. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  128. t.Fatal(err)
  129. }
  130. err = loggerDriver.Close()
  131. if err != nil {
  132. t.Fatal(err)
  133. }
  134. if len(hec.messages) != 2 {
  135. t.Fatal("Expected two messages")
  136. }
  137. if *hec.gzipEnabled {
  138. t.Fatal("Gzip should not be used")
  139. }
  140. message1 := hec.messages[0]
  141. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  142. message1.Host != hostname ||
  143. message1.Source != "" ||
  144. message1.SourceType != "" ||
  145. message1.Index != "" {
  146. t.Fatalf("Unexpected values of message 1 %v", message1)
  147. }
  148. if event, err := message1.EventAsMap(); err != nil {
  149. t.Fatal(err)
  150. } else {
  151. if event["line"] != "{\"a\":\"b\"}" ||
  152. event["source"] != "stdout" ||
  153. event["tag"] != "containeriid" ||
  154. len(event) != 3 {
  155. t.Fatalf("Unexpected event in message %v", event)
  156. }
  157. }
  158. message2 := hec.messages[1]
  159. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  160. message2.Host != hostname ||
  161. message2.Source != "" ||
  162. message2.SourceType != "" ||
  163. message2.Index != "" {
  164. t.Fatalf("Unexpected values of message 1 %v", message2)
  165. }
  166. if event, err := message2.EventAsMap(); err != nil {
  167. t.Fatal(err)
  168. } else {
  169. if event["line"] != "notajson" ||
  170. event["source"] != "stdout" ||
  171. event["tag"] != "containeriid" ||
  172. len(event) != 3 {
  173. t.Fatalf("Unexpected event in message %v", event)
  174. }
  175. }
  176. err = hec.Close()
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. }
  181. // Verify inline format with a not default settings for most of options
  182. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  183. hec := NewHTTPEventCollectorMock(t)
  184. go hec.Serve()
  185. info := logger.Info{
  186. Config: map[string]string{
  187. splunkURLKey: hec.URL(),
  188. splunkTokenKey: hec.token,
  189. splunkSourceKey: "mysource",
  190. splunkSourceTypeKey: "mysourcetype",
  191. splunkIndexKey: "myindex",
  192. splunkFormatKey: splunkFormatInline,
  193. splunkGzipCompressionKey: "true",
  194. tagKey: "{{.ImageName}}/{{.Name}}",
  195. labelsKey: "a",
  196. envRegexKey: "^foo",
  197. },
  198. ContainerID: "containeriid",
  199. ContainerName: "/container_name",
  200. ContainerImageID: "contaimageid",
  201. ContainerImageName: "container_image_name",
  202. ContainerLabels: map[string]string{
  203. "a": "b",
  204. },
  205. ContainerEnv: []string{"foo_finder=bar"},
  206. }
  207. hostname, err := info.Hostname()
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. loggerDriver, err := New(info)
  212. if err != nil {
  213. t.Fatal(err)
  214. }
  215. if !hec.connectionVerified {
  216. t.Fatal("By default connection should be verified")
  217. }
  218. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  219. if !ok {
  220. t.Fatal("Unexpected Splunk Logging Driver type")
  221. }
  222. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  223. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  224. splunkLoggerDriver.nullMessage.Host != hostname ||
  225. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  226. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  227. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  228. !splunkLoggerDriver.gzipCompression ||
  229. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  230. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  231. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  232. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  233. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  234. t.Fatal("Values do not match configuration.")
  235. }
  236. messageTime := time.Now()
  237. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  238. t.Fatal(err)
  239. }
  240. err = loggerDriver.Close()
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. if len(hec.messages) != 1 {
  245. t.Fatal("Expected one message")
  246. }
  247. if !*hec.gzipEnabled {
  248. t.Fatal("Gzip should be used")
  249. }
  250. message := hec.messages[0]
  251. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  252. message.Host != hostname ||
  253. message.Source != "mysource" ||
  254. message.SourceType != "mysourcetype" ||
  255. message.Index != "myindex" {
  256. t.Fatalf("Unexpected values of message %v", message)
  257. }
  258. if event, err := message.EventAsMap(); err != nil {
  259. t.Fatal(err)
  260. } else {
  261. if event["line"] != "1" ||
  262. event["source"] != "stdout" ||
  263. event["tag"] != "container_image_name/container_name" ||
  264. event["attrs"].(map[string]interface{})["a"] != "b" ||
  265. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  266. len(event) != 4 {
  267. t.Fatalf("Unexpected event in message %v", event)
  268. }
  269. }
  270. err = hec.Close()
  271. if err != nil {
  272. t.Fatal(err)
  273. }
  274. }
  275. // Verify JSON format
  276. func TestJsonFormat(t *testing.T) {
  277. hec := NewHTTPEventCollectorMock(t)
  278. go hec.Serve()
  279. info := logger.Info{
  280. Config: map[string]string{
  281. splunkURLKey: hec.URL(),
  282. splunkTokenKey: hec.token,
  283. splunkFormatKey: splunkFormatJSON,
  284. splunkGzipCompressionKey: "true",
  285. splunkGzipCompressionLevelKey: "1",
  286. },
  287. ContainerID: "containeriid",
  288. ContainerName: "/container_name",
  289. ContainerImageID: "contaimageid",
  290. ContainerImageName: "container_image_name",
  291. }
  292. hostname, err := info.Hostname()
  293. if err != nil {
  294. t.Fatal(err)
  295. }
  296. loggerDriver, err := New(info)
  297. if err != nil {
  298. t.Fatal(err)
  299. }
  300. if !hec.connectionVerified {
  301. t.Fatal("By default connection should be verified")
  302. }
  303. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  304. if !ok {
  305. t.Fatal("Unexpected Splunk Logging Driver type")
  306. }
  307. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  308. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  309. splunkLoggerDriver.nullMessage.Host != hostname ||
  310. splunkLoggerDriver.nullMessage.Source != "" ||
  311. splunkLoggerDriver.nullMessage.SourceType != "" ||
  312. splunkLoggerDriver.nullMessage.Index != "" ||
  313. !splunkLoggerDriver.gzipCompression ||
  314. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  315. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  316. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  317. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  318. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  319. t.Fatal("Values do not match configuration.")
  320. }
  321. message1Time := time.Now()
  322. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  323. t.Fatal(err)
  324. }
  325. message2Time := time.Now()
  326. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  327. t.Fatal(err)
  328. }
  329. err = loggerDriver.Close()
  330. if err != nil {
  331. t.Fatal(err)
  332. }
  333. if len(hec.messages) != 2 {
  334. t.Fatal("Expected two messages")
  335. }
  336. message1 := hec.messages[0]
  337. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  338. message1.Host != hostname ||
  339. message1.Source != "" ||
  340. message1.SourceType != "" ||
  341. message1.Index != "" {
  342. t.Fatalf("Unexpected values of message 1 %v", message1)
  343. }
  344. if event, err := message1.EventAsMap(); err != nil {
  345. t.Fatal(err)
  346. } else {
  347. if event["line"].(map[string]interface{})["a"] != "b" ||
  348. event["source"] != "stdout" ||
  349. event["tag"] != "containeriid" ||
  350. len(event) != 3 {
  351. t.Fatalf("Unexpected event in message 1 %v", event)
  352. }
  353. }
  354. message2 := hec.messages[1]
  355. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  356. message2.Host != hostname ||
  357. message2.Source != "" ||
  358. message2.SourceType != "" ||
  359. message2.Index != "" {
  360. t.Fatalf("Unexpected values of message 2 %v", message2)
  361. }
  362. // If message cannot be parsed as JSON - it should be sent as a line
  363. if event, err := message2.EventAsMap(); err != nil {
  364. t.Fatal(err)
  365. } else {
  366. if event["line"] != "notjson" ||
  367. event["source"] != "stdout" ||
  368. event["tag"] != "containeriid" ||
  369. len(event) != 3 {
  370. t.Fatalf("Unexpected event in message 2 %v", event)
  371. }
  372. }
  373. err = hec.Close()
  374. if err != nil {
  375. t.Fatal(err)
  376. }
  377. }
  378. // Verify raw format
  379. func TestRawFormat(t *testing.T) {
  380. hec := NewHTTPEventCollectorMock(t)
  381. go hec.Serve()
  382. info := logger.Info{
  383. Config: map[string]string{
  384. splunkURLKey: hec.URL(),
  385. splunkTokenKey: hec.token,
  386. splunkFormatKey: splunkFormatRaw,
  387. },
  388. ContainerID: "containeriid",
  389. ContainerName: "/container_name",
  390. ContainerImageID: "contaimageid",
  391. ContainerImageName: "container_image_name",
  392. }
  393. hostname, err := info.Hostname()
  394. require.NoError(t, err)
  395. loggerDriver, err := New(info)
  396. require.NoError(t, err)
  397. if !hec.connectionVerified {
  398. t.Fatal("By default connection should be verified")
  399. }
  400. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  401. if !ok {
  402. t.Fatal("Unexpected Splunk Logging Driver type")
  403. }
  404. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  405. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  406. splunkLoggerDriver.nullMessage.Host != hostname ||
  407. splunkLoggerDriver.nullMessage.Source != "" ||
  408. splunkLoggerDriver.nullMessage.SourceType != "" ||
  409. splunkLoggerDriver.nullMessage.Index != "" ||
  410. splunkLoggerDriver.gzipCompression ||
  411. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  412. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  413. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  414. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  415. string(splunkLoggerDriver.prefix) != "containeriid " {
  416. t.Fatal("Values do not match configuration.")
  417. }
  418. message1Time := time.Now()
  419. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  420. t.Fatal(err)
  421. }
  422. message2Time := time.Now()
  423. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  424. t.Fatal(err)
  425. }
  426. err = loggerDriver.Close()
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. if len(hec.messages) != 2 {
  431. t.Fatal("Expected two messages")
  432. }
  433. message1 := hec.messages[0]
  434. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  435. message1.Host != hostname ||
  436. message1.Source != "" ||
  437. message1.SourceType != "" ||
  438. message1.Index != "" {
  439. t.Fatalf("Unexpected values of message 1 %v", message1)
  440. }
  441. if event, err := message1.EventAsString(); err != nil {
  442. t.Fatal(err)
  443. } else {
  444. if event != "containeriid {\"a\":\"b\"}" {
  445. t.Fatalf("Unexpected event in message 1 %v", event)
  446. }
  447. }
  448. message2 := hec.messages[1]
  449. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  450. message2.Host != hostname ||
  451. message2.Source != "" ||
  452. message2.SourceType != "" ||
  453. message2.Index != "" {
  454. t.Fatalf("Unexpected values of message 2 %v", message2)
  455. }
  456. if event, err := message2.EventAsString(); err != nil {
  457. t.Fatal(err)
  458. } else {
  459. if event != "containeriid notjson" {
  460. t.Fatalf("Unexpected event in message 1 %v", event)
  461. }
  462. }
  463. err = hec.Close()
  464. if err != nil {
  465. t.Fatal(err)
  466. }
  467. }
  468. // Verify raw format with labels
  469. func TestRawFormatWithLabels(t *testing.T) {
  470. hec := NewHTTPEventCollectorMock(t)
  471. go hec.Serve()
  472. info := logger.Info{
  473. Config: map[string]string{
  474. splunkURLKey: hec.URL(),
  475. splunkTokenKey: hec.token,
  476. splunkFormatKey: splunkFormatRaw,
  477. labelsKey: "a",
  478. },
  479. ContainerID: "containeriid",
  480. ContainerName: "/container_name",
  481. ContainerImageID: "contaimageid",
  482. ContainerImageName: "container_image_name",
  483. ContainerLabels: map[string]string{
  484. "a": "b",
  485. },
  486. }
  487. hostname, err := info.Hostname()
  488. if err != nil {
  489. t.Fatal(err)
  490. }
  491. loggerDriver, err := New(info)
  492. if err != nil {
  493. t.Fatal(err)
  494. }
  495. if !hec.connectionVerified {
  496. t.Fatal("By default connection should be verified")
  497. }
  498. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  499. if !ok {
  500. t.Fatal("Unexpected Splunk Logging Driver type")
  501. }
  502. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  503. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  504. splunkLoggerDriver.nullMessage.Host != hostname ||
  505. splunkLoggerDriver.nullMessage.Source != "" ||
  506. splunkLoggerDriver.nullMessage.SourceType != "" ||
  507. splunkLoggerDriver.nullMessage.Index != "" ||
  508. splunkLoggerDriver.gzipCompression ||
  509. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  510. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  511. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  512. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  513. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  514. t.Fatal("Values do not match configuration.")
  515. }
  516. message1Time := time.Now()
  517. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  518. t.Fatal(err)
  519. }
  520. message2Time := time.Now()
  521. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  522. t.Fatal(err)
  523. }
  524. err = loggerDriver.Close()
  525. if err != nil {
  526. t.Fatal(err)
  527. }
  528. if len(hec.messages) != 2 {
  529. t.Fatal("Expected two messages")
  530. }
  531. message1 := hec.messages[0]
  532. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  533. message1.Host != hostname ||
  534. message1.Source != "" ||
  535. message1.SourceType != "" ||
  536. message1.Index != "" {
  537. t.Fatalf("Unexpected values of message 1 %v", message1)
  538. }
  539. if event, err := message1.EventAsString(); err != nil {
  540. t.Fatal(err)
  541. } else {
  542. if event != "containeriid a=b {\"a\":\"b\"}" {
  543. t.Fatalf("Unexpected event in message 1 %v", event)
  544. }
  545. }
  546. message2 := hec.messages[1]
  547. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  548. message2.Host != hostname ||
  549. message2.Source != "" ||
  550. message2.SourceType != "" ||
  551. message2.Index != "" {
  552. t.Fatalf("Unexpected values of message 2 %v", message2)
  553. }
  554. if event, err := message2.EventAsString(); err != nil {
  555. t.Fatal(err)
  556. } else {
  557. if event != "containeriid a=b notjson" {
  558. t.Fatalf("Unexpected event in message 2 %v", event)
  559. }
  560. }
  561. err = hec.Close()
  562. if err != nil {
  563. t.Fatal(err)
  564. }
  565. }
  566. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  567. // in the same way we get them in stdout/stderr
  568. func TestRawFormatWithoutTag(t *testing.T) {
  569. hec := NewHTTPEventCollectorMock(t)
  570. go hec.Serve()
  571. info := logger.Info{
  572. Config: map[string]string{
  573. splunkURLKey: hec.URL(),
  574. splunkTokenKey: hec.token,
  575. splunkFormatKey: splunkFormatRaw,
  576. tagKey: "",
  577. },
  578. ContainerID: "containeriid",
  579. ContainerName: "/container_name",
  580. ContainerImageID: "contaimageid",
  581. ContainerImageName: "container_image_name",
  582. }
  583. hostname, err := info.Hostname()
  584. if err != nil {
  585. t.Fatal(err)
  586. }
  587. loggerDriver, err := New(info)
  588. if err != nil {
  589. t.Fatal(err)
  590. }
  591. if !hec.connectionVerified {
  592. t.Fatal("By default connection should be verified")
  593. }
  594. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  595. if !ok {
  596. t.Fatal("Unexpected Splunk Logging Driver type")
  597. }
  598. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  599. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  600. splunkLoggerDriver.nullMessage.Host != hostname ||
  601. splunkLoggerDriver.nullMessage.Source != "" ||
  602. splunkLoggerDriver.nullMessage.SourceType != "" ||
  603. splunkLoggerDriver.nullMessage.Index != "" ||
  604. splunkLoggerDriver.gzipCompression ||
  605. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  606. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  607. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  608. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  609. string(splunkLoggerDriver.prefix) != "" {
  610. t.Log(string(splunkLoggerDriver.prefix) + "a")
  611. t.Fatal("Values do not match configuration.")
  612. }
  613. message1Time := time.Now()
  614. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  615. t.Fatal(err)
  616. }
  617. message2Time := time.Now()
  618. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  619. t.Fatal(err)
  620. }
  621. message3Time := time.Now()
  622. if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
  623. t.Fatal(err)
  624. }
  625. err = loggerDriver.Close()
  626. if err != nil {
  627. t.Fatal(err)
  628. }
  629. // message3 would have an empty or whitespace only string in the "event" field
  630. // both of which are not acceptable to HEC
  631. // thus here we must expect 2 messages, not 3
  632. if len(hec.messages) != 2 {
  633. t.Fatal("Expected two messages")
  634. }
  635. message1 := hec.messages[0]
  636. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  637. message1.Host != hostname ||
  638. message1.Source != "" ||
  639. message1.SourceType != "" ||
  640. message1.Index != "" {
  641. t.Fatalf("Unexpected values of message 1 %v", message1)
  642. }
  643. if event, err := message1.EventAsString(); err != nil {
  644. t.Fatal(err)
  645. } else {
  646. if event != "{\"a\":\"b\"}" {
  647. t.Fatalf("Unexpected event in message 1 %v", event)
  648. }
  649. }
  650. message2 := hec.messages[1]
  651. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  652. message2.Host != hostname ||
  653. message2.Source != "" ||
  654. message2.SourceType != "" ||
  655. message2.Index != "" {
  656. t.Fatalf("Unexpected values of message 2 %v", message2)
  657. }
  658. if event, err := message2.EventAsString(); err != nil {
  659. t.Fatal(err)
  660. } else {
  661. if event != "notjson" {
  662. t.Fatalf("Unexpected event in message 2 %v", event)
  663. }
  664. }
  665. err = hec.Close()
  666. if err != nil {
  667. t.Fatal(err)
  668. }
  669. }
  670. // Verify that we will send messages in batches with default batching parameters,
  671. // but change frequency to be sure that numOfRequests will match expected 17 requests
  672. func TestBatching(t *testing.T) {
  673. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  674. t.Fatal(err)
  675. }
  676. hec := NewHTTPEventCollectorMock(t)
  677. go hec.Serve()
  678. info := logger.Info{
  679. Config: map[string]string{
  680. splunkURLKey: hec.URL(),
  681. splunkTokenKey: hec.token,
  682. },
  683. ContainerID: "containeriid",
  684. ContainerName: "/container_name",
  685. ContainerImageID: "contaimageid",
  686. ContainerImageName: "container_image_name",
  687. }
  688. loggerDriver, err := New(info)
  689. if err != nil {
  690. t.Fatal(err)
  691. }
  692. for i := 0; i < defaultStreamChannelSize*4; i++ {
  693. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  694. t.Fatal(err)
  695. }
  696. }
  697. err = loggerDriver.Close()
  698. if err != nil {
  699. t.Fatal(err)
  700. }
  701. if len(hec.messages) != defaultStreamChannelSize*4 {
  702. t.Fatal("Not all messages delivered")
  703. }
  704. for i, message := range hec.messages {
  705. if event, err := message.EventAsMap(); err != nil {
  706. t.Fatal(err)
  707. } else {
  708. if event["line"] != fmt.Sprintf("%d", i) {
  709. t.Fatalf("Unexpected event in message %v", event)
  710. }
  711. }
  712. }
  713. // 1 to verify connection and 16 batches
  714. if hec.numOfRequests != 17 {
  715. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  716. }
  717. err = hec.Close()
  718. if err != nil {
  719. t.Fatal(err)
  720. }
  721. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  722. t.Fatal(err)
  723. }
  724. }
  725. // Verify that test is using time to fire events not rare than specified frequency
  726. func TestFrequency(t *testing.T) {
  727. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  728. t.Fatal(err)
  729. }
  730. hec := NewHTTPEventCollectorMock(t)
  731. go hec.Serve()
  732. info := logger.Info{
  733. Config: map[string]string{
  734. splunkURLKey: hec.URL(),
  735. splunkTokenKey: hec.token,
  736. },
  737. ContainerID: "containeriid",
  738. ContainerName: "/container_name",
  739. ContainerImageID: "contaimageid",
  740. ContainerImageName: "container_image_name",
  741. }
  742. loggerDriver, err := New(info)
  743. if err != nil {
  744. t.Fatal(err)
  745. }
  746. for i := 0; i < 10; i++ {
  747. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  748. t.Fatal(err)
  749. }
  750. time.Sleep(15 * time.Millisecond)
  751. }
  752. err = loggerDriver.Close()
  753. if err != nil {
  754. t.Fatal(err)
  755. }
  756. if len(hec.messages) != 10 {
  757. t.Fatal("Not all messages delivered")
  758. }
  759. for i, message := range hec.messages {
  760. if event, err := message.EventAsMap(); err != nil {
  761. t.Fatal(err)
  762. } else {
  763. if event["line"] != fmt.Sprintf("%d", i) {
  764. t.Fatalf("Unexpected event in message %v", event)
  765. }
  766. }
  767. }
  768. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  769. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  770. if hec.numOfRequests < 9 {
  771. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  772. }
  773. err = hec.Close()
  774. if err != nil {
  775. t.Fatal(err)
  776. }
  777. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  778. t.Fatal(err)
  779. }
  780. }
  781. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  782. // per request
  783. func TestOneMessagePerRequest(t *testing.T) {
  784. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  785. t.Fatal(err)
  786. }
  787. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  788. t.Fatal(err)
  789. }
  790. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  791. t.Fatal(err)
  792. }
  793. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  794. t.Fatal(err)
  795. }
  796. hec := NewHTTPEventCollectorMock(t)
  797. go hec.Serve()
  798. info := logger.Info{
  799. Config: map[string]string{
  800. splunkURLKey: hec.URL(),
  801. splunkTokenKey: hec.token,
  802. },
  803. ContainerID: "containeriid",
  804. ContainerName: "/container_name",
  805. ContainerImageID: "contaimageid",
  806. ContainerImageName: "container_image_name",
  807. }
  808. loggerDriver, err := New(info)
  809. if err != nil {
  810. t.Fatal(err)
  811. }
  812. for i := 0; i < 10; i++ {
  813. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  814. t.Fatal(err)
  815. }
  816. }
  817. err = loggerDriver.Close()
  818. if err != nil {
  819. t.Fatal(err)
  820. }
  821. if len(hec.messages) != 10 {
  822. t.Fatal("Not all messages delivered")
  823. }
  824. for i, message := range hec.messages {
  825. if event, err := message.EventAsMap(); err != nil {
  826. t.Fatal(err)
  827. } else {
  828. if event["line"] != fmt.Sprintf("%d", i) {
  829. t.Fatalf("Unexpected event in message %v", event)
  830. }
  831. }
  832. }
  833. // 1 to verify connection and 10 messages
  834. if hec.numOfRequests != 11 {
  835. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  836. }
  837. err = hec.Close()
  838. if err != nil {
  839. t.Fatal(err)
  840. }
  841. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  842. t.Fatal(err)
  843. }
  844. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  845. t.Fatal(err)
  846. }
  847. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  848. t.Fatal(err)
  849. }
  850. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  851. t.Fatal(err)
  852. }
  853. }
  854. // Driver should not be created when HEC is unresponsive
  855. func TestVerify(t *testing.T) {
  856. hec := NewHTTPEventCollectorMock(t)
  857. hec.simulateServerError = true
  858. go hec.Serve()
  859. info := logger.Info{
  860. Config: map[string]string{
  861. splunkURLKey: hec.URL(),
  862. splunkTokenKey: hec.token,
  863. },
  864. ContainerID: "containeriid",
  865. ContainerName: "/container_name",
  866. ContainerImageID: "contaimageid",
  867. ContainerImageName: "container_image_name",
  868. }
  869. _, err := New(info)
  870. if err == nil {
  871. t.Fatal("Expecting driver to fail, when server is unresponsive")
  872. }
  873. err = hec.Close()
  874. if err != nil {
  875. t.Fatal(err)
  876. }
  877. }
  878. // Verify that user can specify to skip verification that Splunk HEC is working.
  879. // Also in this test we verify retry logic.
  880. func TestSkipVerify(t *testing.T) {
  881. hec := NewHTTPEventCollectorMock(t)
  882. hec.simulateServerError = true
  883. go hec.Serve()
  884. info := logger.Info{
  885. Config: map[string]string{
  886. splunkURLKey: hec.URL(),
  887. splunkTokenKey: hec.token,
  888. splunkVerifyConnectionKey: "false",
  889. },
  890. ContainerID: "containeriid",
  891. ContainerName: "/container_name",
  892. ContainerImageID: "contaimageid",
  893. ContainerImageName: "container_image_name",
  894. }
  895. loggerDriver, err := New(info)
  896. if err != nil {
  897. t.Fatal(err)
  898. }
  899. if hec.connectionVerified {
  900. t.Fatal("Connection should not be verified")
  901. }
  902. for i := 0; i < defaultStreamChannelSize*2; i++ {
  903. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  904. t.Fatal(err)
  905. }
  906. }
  907. if len(hec.messages) != 0 {
  908. t.Fatal("No messages should be accepted at this point")
  909. }
  910. hec.simulateErr(false)
  911. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  912. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  913. t.Fatal(err)
  914. }
  915. }
  916. err = loggerDriver.Close()
  917. if err != nil {
  918. t.Fatal(err)
  919. }
  920. if len(hec.messages) != defaultStreamChannelSize*4 {
  921. t.Fatal("Not all messages delivered")
  922. }
  923. for i, message := range hec.messages {
  924. if event, err := message.EventAsMap(); err != nil {
  925. t.Fatal(err)
  926. } else {
  927. if event["line"] != fmt.Sprintf("%d", i) {
  928. t.Fatalf("Unexpected event in message %v", event)
  929. }
  930. }
  931. }
  932. err = hec.Close()
  933. if err != nil {
  934. t.Fatal(err)
  935. }
  936. }
  937. // Verify logic for when we filled whole buffer
  938. func TestBufferMaximum(t *testing.T) {
  939. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  940. t.Fatal(err)
  941. }
  942. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  943. t.Fatal(err)
  944. }
  945. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  946. t.Fatal(err)
  947. }
  948. hec := NewHTTPEventCollectorMock(t)
  949. hec.simulateErr(true)
  950. go hec.Serve()
  951. info := logger.Info{
  952. Config: map[string]string{
  953. splunkURLKey: hec.URL(),
  954. splunkTokenKey: hec.token,
  955. splunkVerifyConnectionKey: "false",
  956. },
  957. ContainerID: "containeriid",
  958. ContainerName: "/container_name",
  959. ContainerImageID: "contaimageid",
  960. ContainerImageName: "container_image_name",
  961. }
  962. loggerDriver, err := New(info)
  963. if err != nil {
  964. t.Fatal(err)
  965. }
  966. if hec.connectionVerified {
  967. t.Fatal("Connection should not be verified")
  968. }
  969. for i := 0; i < 11; i++ {
  970. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  971. t.Fatal(err)
  972. }
  973. }
  974. if len(hec.messages) != 0 {
  975. t.Fatal("No messages should be accepted at this point")
  976. }
  977. hec.simulateServerError = false
  978. err = loggerDriver.Close()
  979. if err != nil {
  980. t.Fatal(err)
  981. }
  982. if len(hec.messages) != 9 {
  983. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  984. }
  985. // First 1000 messages are written to daemon log when buffer was full
  986. for i, message := range hec.messages {
  987. if event, err := message.EventAsMap(); err != nil {
  988. t.Fatal(err)
  989. } else {
  990. if event["line"] != fmt.Sprintf("%d", i+2) {
  991. t.Fatalf("Unexpected event in message %v", event)
  992. }
  993. }
  994. }
  995. err = hec.Close()
  996. if err != nil {
  997. t.Fatal(err)
  998. }
  999. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1000. t.Fatal(err)
  1001. }
  1002. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1003. t.Fatal(err)
  1004. }
  1005. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1006. t.Fatal(err)
  1007. }
  1008. }
  1009. // Verify that we are not blocking close when HEC is down for the whole time
  1010. func TestServerAlwaysDown(t *testing.T) {
  1011. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1015. t.Fatal(err)
  1016. }
  1017. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1018. t.Fatal(err)
  1019. }
  1020. hec := NewHTTPEventCollectorMock(t)
  1021. hec.simulateServerError = true
  1022. go hec.Serve()
  1023. info := logger.Info{
  1024. Config: map[string]string{
  1025. splunkURLKey: hec.URL(),
  1026. splunkTokenKey: hec.token,
  1027. splunkVerifyConnectionKey: "false",
  1028. },
  1029. ContainerID: "containeriid",
  1030. ContainerName: "/container_name",
  1031. ContainerImageID: "contaimageid",
  1032. ContainerImageName: "container_image_name",
  1033. }
  1034. loggerDriver, err := New(info)
  1035. if err != nil {
  1036. t.Fatal(err)
  1037. }
  1038. if hec.connectionVerified {
  1039. t.Fatal("Connection should not be verified")
  1040. }
  1041. for i := 0; i < 5; i++ {
  1042. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1043. t.Fatal(err)
  1044. }
  1045. }
  1046. err = loggerDriver.Close()
  1047. if err != nil {
  1048. t.Fatal(err)
  1049. }
  1050. if len(hec.messages) != 0 {
  1051. t.Fatal("No messages should be sent")
  1052. }
  1053. err = hec.Close()
  1054. if err != nil {
  1055. t.Fatal(err)
  1056. }
  1057. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1058. t.Fatal(err)
  1059. }
  1060. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1061. t.Fatal(err)
  1062. }
  1063. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1064. t.Fatal(err)
  1065. }
  1066. }
  1067. // Cannot send messages after we close driver
  1068. func TestCannotSendAfterClose(t *testing.T) {
  1069. hec := NewHTTPEventCollectorMock(t)
  1070. go hec.Serve()
  1071. info := logger.Info{
  1072. Config: map[string]string{
  1073. splunkURLKey: hec.URL(),
  1074. splunkTokenKey: hec.token,
  1075. },
  1076. ContainerID: "containeriid",
  1077. ContainerName: "/container_name",
  1078. ContainerImageID: "contaimageid",
  1079. ContainerImageName: "container_image_name",
  1080. }
  1081. loggerDriver, err := New(info)
  1082. if err != nil {
  1083. t.Fatal(err)
  1084. }
  1085. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1086. t.Fatal(err)
  1087. }
  1088. err = loggerDriver.Close()
  1089. if err != nil {
  1090. t.Fatal(err)
  1091. }
  1092. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1093. t.Fatal("Driver should not allow to send messages after close")
  1094. }
  1095. if len(hec.messages) != 1 {
  1096. t.Fatal("Only one message should be sent")
  1097. }
  1098. message := hec.messages[0]
  1099. if event, err := message.EventAsMap(); err != nil {
  1100. t.Fatal(err)
  1101. } else {
  1102. if event["line"] != "message1" {
  1103. t.Fatalf("Unexpected event in message %v", event)
  1104. }
  1105. }
  1106. err = hec.Close()
  1107. if err != nil {
  1108. t.Fatal(err)
  1109. }
  1110. }
  1111. func TestDeadlockOnBlockedEndpoint(t *testing.T) {
  1112. hec := NewHTTPEventCollectorMock(t)
  1113. go hec.Serve()
  1114. info := logger.Info{
  1115. Config: map[string]string{
  1116. splunkURLKey: hec.URL(),
  1117. splunkTokenKey: hec.token,
  1118. },
  1119. ContainerID: "containeriid",
  1120. ContainerName: "/container_name",
  1121. ContainerImageID: "contaimageid",
  1122. ContainerImageName: "container_image_name",
  1123. }
  1124. l, err := New(info)
  1125. if err != nil {
  1126. t.Fatal(err)
  1127. }
  1128. ctx, unblock := context.WithCancel(context.Background())
  1129. hec.withBlock(ctx)
  1130. defer unblock()
  1131. batchSendTimeout = 1 * time.Second
  1132. if err := l.Log(&logger.Message{}); err != nil {
  1133. t.Fatal(err)
  1134. }
  1135. done := make(chan struct{})
  1136. go func() {
  1137. l.Close()
  1138. close(done)
  1139. }()
  1140. select {
  1141. case <-time.After(60 * time.Second):
  1142. buf := make([]byte, 1e6)
  1143. buf = buf[:runtime.Stack(buf, true)]
  1144. t.Logf("STACK DUMP: \n\n%s\n\n", string(buf))
  1145. t.Fatal("timeout waiting for close to finish")
  1146. case <-done:
  1147. }
  1148. }