splunk_test.go 33 KB


  1. package splunk
  2. import (
  3. "compress/gzip"
  4. "fmt"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/docker/docker/daemon/logger"
  9. )
  10. // Validate options
  11. func TestValidateLogOpt(t *testing.T) {
  12. err := ValidateLogOpt(map[string]string{
  13. splunkURLKey: "http://127.0.0.1",
  14. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  15. splunkSourceKey: "mysource",
  16. splunkSourceTypeKey: "mysourcetype",
  17. splunkIndexKey: "myindex",
  18. splunkCAPathKey: "/usr/cert.pem",
  19. splunkCANameKey: "ca_name",
  20. splunkInsecureSkipVerifyKey: "true",
  21. splunkFormatKey: "json",
  22. splunkVerifyConnectionKey: "true",
  23. splunkGzipCompressionKey: "true",
  24. splunkGzipCompressionLevelKey: "1",
  25. envKey: "a",
  26. labelsKey: "b",
  27. tagKey: "c",
  28. })
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. err = ValidateLogOpt(map[string]string{
  33. "not-supported-option": "a",
  34. })
  35. if err == nil {
  36. t.Fatal("Expecting error on unsupported options")
  37. }
  38. }
  39. // Driver require user to specify required options
  40. func TestNewMissedConfig(t *testing.T) {
  41. ctx := logger.Context{
  42. Config: map[string]string{},
  43. }
  44. _, err := New(ctx)
  45. if err == nil {
  46. t.Fatal("Logger driver should fail when no required parameters specified")
  47. }
  48. }
  49. // Driver require user to specify splunk-url
  50. func TestNewMissedUrl(t *testing.T) {
  51. ctx := logger.Context{
  52. Config: map[string]string{
  53. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  54. },
  55. }
  56. _, err := New(ctx)
  57. if err.Error() != "splunk: splunk-url is expected" {
  58. t.Fatal("Logger driver should fail when no required parameters specified")
  59. }
  60. }
  61. // Driver require user to specify splunk-token
  62. func TestNewMissedToken(t *testing.T) {
  63. ctx := logger.Context{
  64. Config: map[string]string{
  65. splunkURLKey: "http://127.0.0.1:8088",
  66. },
  67. }
  68. _, err := New(ctx)
  69. if err.Error() != "splunk: splunk-token is expected" {
  70. t.Fatal("Logger driver should fail when no required parameters specified")
  71. }
  72. }
  73. // Test default settings
  74. func TestDefault(t *testing.T) {
  75. hec := NewHTTPEventCollectorMock(t)
  76. go hec.Serve()
  77. ctx := logger.Context{
  78. Config: map[string]string{
  79. splunkURLKey: hec.URL(),
  80. splunkTokenKey: hec.token,
  81. },
  82. ContainerID: "containeriid",
  83. ContainerName: "container_name",
  84. ContainerImageID: "contaimageid",
  85. ContainerImageName: "container_image_name",
  86. }
  87. hostname, err := ctx.Hostname()
  88. if err != nil {
  89. t.Fatal(err)
  90. }
  91. loggerDriver, err := New(ctx)
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. if loggerDriver.Name() != driverName {
  96. t.Fatal("Unexpected logger driver name")
  97. }
  98. if !hec.connectionVerified {
  99. t.Fatal("By default connection should be verified")
  100. }
  101. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  102. if !ok {
  103. t.Fatal("Unexpected Splunk Logging Driver type")
  104. }
  105. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  106. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  107. splunkLoggerDriver.nullMessage.Host != hostname ||
  108. splunkLoggerDriver.nullMessage.Source != "" ||
  109. splunkLoggerDriver.nullMessage.SourceType != "" ||
  110. splunkLoggerDriver.nullMessage.Index != "" ||
  111. splunkLoggerDriver.gzipCompression != false ||
  112. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  113. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  114. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  115. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  116. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  117. }
  118. message1Time := time.Now()
  119. if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
  120. t.Fatal(err)
  121. }
  122. message2Time := time.Now()
  123. if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil {
  124. t.Fatal(err)
  125. }
  126. err = loggerDriver.Close()
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. if len(hec.messages) != 2 {
  131. t.Fatal("Expected two messages")
  132. }
  133. if *hec.gzipEnabled {
  134. t.Fatal("Gzip should not be used")
  135. }
  136. message1 := hec.messages[0]
  137. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  138. message1.Host != hostname ||
  139. message1.Source != "" ||
  140. message1.SourceType != "" ||
  141. message1.Index != "" {
  142. t.Fatalf("Unexpected values of message 1 %v", message1)
  143. }
  144. if event, err := message1.EventAsMap(); err != nil {
  145. t.Fatal(err)
  146. } else {
  147. if event["line"] != "{\"a\":\"b\"}" ||
  148. event["source"] != "stdout" ||
  149. event["tag"] != "containeriid" ||
  150. len(event) != 3 {
  151. t.Fatalf("Unexpected event in message %v", event)
  152. }
  153. }
  154. message2 := hec.messages[1]
  155. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  156. message2.Host != hostname ||
  157. message2.Source != "" ||
  158. message2.SourceType != "" ||
  159. message2.Index != "" {
  160. t.Fatalf("Unexpected values of message 1 %v", message2)
  161. }
  162. if event, err := message2.EventAsMap(); err != nil {
  163. t.Fatal(err)
  164. } else {
  165. if event["line"] != "notajson" ||
  166. event["source"] != "stdout" ||
  167. event["tag"] != "containeriid" ||
  168. len(event) != 3 {
  169. t.Fatalf("Unexpected event in message %v", event)
  170. }
  171. }
  172. err = hec.Close()
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. }
  177. // Verify inline format with a not default settings for most of options
  178. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  179. hec := NewHTTPEventCollectorMock(t)
  180. go hec.Serve()
  181. ctx := logger.Context{
  182. Config: map[string]string{
  183. splunkURLKey: hec.URL(),
  184. splunkTokenKey: hec.token,
  185. splunkSourceKey: "mysource",
  186. splunkSourceTypeKey: "mysourcetype",
  187. splunkIndexKey: "myindex",
  188. splunkFormatKey: splunkFormatInline,
  189. splunkGzipCompressionKey: "true",
  190. tagKey: "{{.ImageName}}/{{.Name}}",
  191. labelsKey: "a",
  192. },
  193. ContainerID: "containeriid",
  194. ContainerName: "/container_name",
  195. ContainerImageID: "contaimageid",
  196. ContainerImageName: "container_image_name",
  197. ContainerLabels: map[string]string{
  198. "a": "b",
  199. },
  200. }
  201. hostname, err := ctx.Hostname()
  202. if err != nil {
  203. t.Fatal(err)
  204. }
  205. loggerDriver, err := New(ctx)
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. if !hec.connectionVerified {
  210. t.Fatal("By default connection should be verified")
  211. }
  212. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  213. if !ok {
  214. t.Fatal("Unexpected Splunk Logging Driver type")
  215. }
  216. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  217. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  218. splunkLoggerDriver.nullMessage.Host != hostname ||
  219. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  220. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  221. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  222. splunkLoggerDriver.gzipCompression != true ||
  223. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  224. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  225. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  226. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  227. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  228. t.Fatal("Values do not match configuration.")
  229. }
  230. messageTime := time.Now()
  231. if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil {
  232. t.Fatal(err)
  233. }
  234. err = loggerDriver.Close()
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. if len(hec.messages) != 1 {
  239. t.Fatal("Expected one message")
  240. }
  241. if !*hec.gzipEnabled {
  242. t.Fatal("Gzip should be used")
  243. }
  244. message := hec.messages[0]
  245. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  246. message.Host != hostname ||
  247. message.Source != "mysource" ||
  248. message.SourceType != "mysourcetype" ||
  249. message.Index != "myindex" {
  250. t.Fatalf("Unexpected values of message %v", message)
  251. }
  252. if event, err := message.EventAsMap(); err != nil {
  253. t.Fatal(err)
  254. } else {
  255. if event["line"] != "1" ||
  256. event["source"] != "stdout" ||
  257. event["tag"] != "container_image_name/container_name" ||
  258. event["attrs"].(map[string]interface{})["a"] != "b" ||
  259. len(event) != 4 {
  260. t.Fatalf("Unexpected event in message %v", event)
  261. }
  262. }
  263. err = hec.Close()
  264. if err != nil {
  265. t.Fatal(err)
  266. }
  267. }
  268. // Verify JSON format
  269. func TestJsonFormat(t *testing.T) {
  270. hec := NewHTTPEventCollectorMock(t)
  271. go hec.Serve()
  272. ctx := logger.Context{
  273. Config: map[string]string{
  274. splunkURLKey: hec.URL(),
  275. splunkTokenKey: hec.token,
  276. splunkFormatKey: splunkFormatJSON,
  277. splunkGzipCompressionKey: "true",
  278. splunkGzipCompressionLevelKey: "1",
  279. },
  280. ContainerID: "containeriid",
  281. ContainerName: "/container_name",
  282. ContainerImageID: "contaimageid",
  283. ContainerImageName: "container_image_name",
  284. }
  285. hostname, err := ctx.Hostname()
  286. if err != nil {
  287. t.Fatal(err)
  288. }
  289. loggerDriver, err := New(ctx)
  290. if err != nil {
  291. t.Fatal(err)
  292. }
  293. if !hec.connectionVerified {
  294. t.Fatal("By default connection should be verified")
  295. }
  296. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  297. if !ok {
  298. t.Fatal("Unexpected Splunk Logging Driver type")
  299. }
  300. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  301. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  302. splunkLoggerDriver.nullMessage.Host != hostname ||
  303. splunkLoggerDriver.nullMessage.Source != "" ||
  304. splunkLoggerDriver.nullMessage.SourceType != "" ||
  305. splunkLoggerDriver.nullMessage.Index != "" ||
  306. splunkLoggerDriver.gzipCompression != true ||
  307. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  308. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  309. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  310. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  311. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  312. t.Fatal("Values do not match configuration.")
  313. }
  314. message1Time := time.Now()
  315. if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
  316. t.Fatal(err)
  317. }
  318. message2Time := time.Now()
  319. if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
  320. t.Fatal(err)
  321. }
  322. err = loggerDriver.Close()
  323. if err != nil {
  324. t.Fatal(err)
  325. }
  326. if len(hec.messages) != 2 {
  327. t.Fatal("Expected two messages")
  328. }
  329. message1 := hec.messages[0]
  330. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  331. message1.Host != hostname ||
  332. message1.Source != "" ||
  333. message1.SourceType != "" ||
  334. message1.Index != "" {
  335. t.Fatalf("Unexpected values of message 1 %v", message1)
  336. }
  337. if event, err := message1.EventAsMap(); err != nil {
  338. t.Fatal(err)
  339. } else {
  340. if event["line"].(map[string]interface{})["a"] != "b" ||
  341. event["source"] != "stdout" ||
  342. event["tag"] != "containeriid" ||
  343. len(event) != 3 {
  344. t.Fatalf("Unexpected event in message 1 %v", event)
  345. }
  346. }
  347. message2 := hec.messages[1]
  348. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  349. message2.Host != hostname ||
  350. message2.Source != "" ||
  351. message2.SourceType != "" ||
  352. message2.Index != "" {
  353. t.Fatalf("Unexpected values of message 2 %v", message2)
  354. }
  355. // If message cannot be parsed as JSON - it should be sent as a line
  356. if event, err := message2.EventAsMap(); err != nil {
  357. t.Fatal(err)
  358. } else {
  359. if event["line"] != "notjson" ||
  360. event["source"] != "stdout" ||
  361. event["tag"] != "containeriid" ||
  362. len(event) != 3 {
  363. t.Fatalf("Unexpected event in message 2 %v", event)
  364. }
  365. }
  366. err = hec.Close()
  367. if err != nil {
  368. t.Fatal(err)
  369. }
  370. }
  371. // Verify raw format
  372. func TestRawFormat(t *testing.T) {
  373. hec := NewHTTPEventCollectorMock(t)
  374. go hec.Serve()
  375. ctx := logger.Context{
  376. Config: map[string]string{
  377. splunkURLKey: hec.URL(),
  378. splunkTokenKey: hec.token,
  379. splunkFormatKey: splunkFormatRaw,
  380. },
  381. ContainerID: "containeriid",
  382. ContainerName: "/container_name",
  383. ContainerImageID: "contaimageid",
  384. ContainerImageName: "container_image_name",
  385. }
  386. hostname, err := ctx.Hostname()
  387. if err != nil {
  388. t.Fatal(err)
  389. }
  390. loggerDriver, err := New(ctx)
  391. if err != nil {
  392. t.Fatal(err)
  393. }
  394. if !hec.connectionVerified {
  395. t.Fatal("By default connection should be verified")
  396. }
  397. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  398. if !ok {
  399. t.Fatal("Unexpected Splunk Logging Driver type")
  400. }
  401. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  402. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  403. splunkLoggerDriver.nullMessage.Host != hostname ||
  404. splunkLoggerDriver.nullMessage.Source != "" ||
  405. splunkLoggerDriver.nullMessage.SourceType != "" ||
  406. splunkLoggerDriver.nullMessage.Index != "" ||
  407. splunkLoggerDriver.gzipCompression != false ||
  408. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  409. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  410. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  411. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  412. string(splunkLoggerDriver.prefix) != "containeriid " {
  413. t.Fatal("Values do not match configuration.")
  414. }
  415. message1Time := time.Now()
  416. if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
  417. t.Fatal(err)
  418. }
  419. message2Time := time.Now()
  420. if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
  421. t.Fatal(err)
  422. }
  423. err = loggerDriver.Close()
  424. if err != nil {
  425. t.Fatal(err)
  426. }
  427. if len(hec.messages) != 2 {
  428. t.Fatal("Expected two messages")
  429. }
  430. message1 := hec.messages[0]
  431. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  432. message1.Host != hostname ||
  433. message1.Source != "" ||
  434. message1.SourceType != "" ||
  435. message1.Index != "" {
  436. t.Fatalf("Unexpected values of message 1 %v", message1)
  437. }
  438. if event, err := message1.EventAsString(); err != nil {
  439. t.Fatal(err)
  440. } else {
  441. if event != "containeriid {\"a\":\"b\"}" {
  442. t.Fatalf("Unexpected event in message 1 %v", event)
  443. }
  444. }
  445. message2 := hec.messages[1]
  446. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  447. message2.Host != hostname ||
  448. message2.Source != "" ||
  449. message2.SourceType != "" ||
  450. message2.Index != "" {
  451. t.Fatalf("Unexpected values of message 2 %v", message2)
  452. }
  453. if event, err := message2.EventAsString(); err != nil {
  454. t.Fatal(err)
  455. } else {
  456. if event != "containeriid notjson" {
  457. t.Fatalf("Unexpected event in message 1 %v", event)
  458. }
  459. }
  460. err = hec.Close()
  461. if err != nil {
  462. t.Fatal(err)
  463. }
  464. }
  465. // Verify raw format with labels
  466. func TestRawFormatWithLabels(t *testing.T) {
  467. hec := NewHTTPEventCollectorMock(t)
  468. go hec.Serve()
  469. ctx := logger.Context{
  470. Config: map[string]string{
  471. splunkURLKey: hec.URL(),
  472. splunkTokenKey: hec.token,
  473. splunkFormatKey: splunkFormatRaw,
  474. labelsKey: "a",
  475. },
  476. ContainerID: "containeriid",
  477. ContainerName: "/container_name",
  478. ContainerImageID: "contaimageid",
  479. ContainerImageName: "container_image_name",
  480. ContainerLabels: map[string]string{
  481. "a": "b",
  482. },
  483. }
  484. hostname, err := ctx.Hostname()
  485. if err != nil {
  486. t.Fatal(err)
  487. }
  488. loggerDriver, err := New(ctx)
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. if !hec.connectionVerified {
  493. t.Fatal("By default connection should be verified")
  494. }
  495. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  496. if !ok {
  497. t.Fatal("Unexpected Splunk Logging Driver type")
  498. }
  499. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  500. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  501. splunkLoggerDriver.nullMessage.Host != hostname ||
  502. splunkLoggerDriver.nullMessage.Source != "" ||
  503. splunkLoggerDriver.nullMessage.SourceType != "" ||
  504. splunkLoggerDriver.nullMessage.Index != "" ||
  505. splunkLoggerDriver.gzipCompression != false ||
  506. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  507. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  508. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  509. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  510. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  511. t.Fatal("Values do not match configuration.")
  512. }
  513. message1Time := time.Now()
  514. if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
  515. t.Fatal(err)
  516. }
  517. message2Time := time.Now()
  518. if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
  519. t.Fatal(err)
  520. }
  521. err = loggerDriver.Close()
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. if len(hec.messages) != 2 {
  526. t.Fatal("Expected two messages")
  527. }
  528. message1 := hec.messages[0]
  529. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  530. message1.Host != hostname ||
  531. message1.Source != "" ||
  532. message1.SourceType != "" ||
  533. message1.Index != "" {
  534. t.Fatalf("Unexpected values of message 1 %v", message1)
  535. }
  536. if event, err := message1.EventAsString(); err != nil {
  537. t.Fatal(err)
  538. } else {
  539. if event != "containeriid a=b {\"a\":\"b\"}" {
  540. t.Fatalf("Unexpected event in message 1 %v", event)
  541. }
  542. }
  543. message2 := hec.messages[1]
  544. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  545. message2.Host != hostname ||
  546. message2.Source != "" ||
  547. message2.SourceType != "" ||
  548. message2.Index != "" {
  549. t.Fatalf("Unexpected values of message 2 %v", message2)
  550. }
  551. if event, err := message2.EventAsString(); err != nil {
  552. t.Fatal(err)
  553. } else {
  554. if event != "containeriid a=b notjson" {
  555. t.Fatalf("Unexpected event in message 2 %v", event)
  556. }
  557. }
  558. err = hec.Close()
  559. if err != nil {
  560. t.Fatal(err)
  561. }
  562. }
  563. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  564. // in the same way we get them in stdout/stderr
  565. func TestRawFormatWithoutTag(t *testing.T) {
  566. hec := NewHTTPEventCollectorMock(t)
  567. go hec.Serve()
  568. ctx := logger.Context{
  569. Config: map[string]string{
  570. splunkURLKey: hec.URL(),
  571. splunkTokenKey: hec.token,
  572. splunkFormatKey: splunkFormatRaw,
  573. tagKey: "",
  574. },
  575. ContainerID: "containeriid",
  576. ContainerName: "/container_name",
  577. ContainerImageID: "contaimageid",
  578. ContainerImageName: "container_image_name",
  579. }
  580. hostname, err := ctx.Hostname()
  581. if err != nil {
  582. t.Fatal(err)
  583. }
  584. loggerDriver, err := New(ctx)
  585. if err != nil {
  586. t.Fatal(err)
  587. }
  588. if !hec.connectionVerified {
  589. t.Fatal("By default connection should be verified")
  590. }
  591. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  592. if !ok {
  593. t.Fatal("Unexpected Splunk Logging Driver type")
  594. }
  595. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  596. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  597. splunkLoggerDriver.nullMessage.Host != hostname ||
  598. splunkLoggerDriver.nullMessage.Source != "" ||
  599. splunkLoggerDriver.nullMessage.SourceType != "" ||
  600. splunkLoggerDriver.nullMessage.Index != "" ||
  601. splunkLoggerDriver.gzipCompression != false ||
  602. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  603. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  604. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  605. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  606. string(splunkLoggerDriver.prefix) != "" {
  607. t.Log(string(splunkLoggerDriver.prefix) + "a")
  608. t.Fatal("Values do not match configuration.")
  609. }
  610. message1Time := time.Now()
  611. if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil {
  612. t.Fatal(err)
  613. }
  614. message2Time := time.Now()
  615. if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil {
  616. t.Fatal(err)
  617. }
  618. err = loggerDriver.Close()
  619. if err != nil {
  620. t.Fatal(err)
  621. }
  622. if len(hec.messages) != 2 {
  623. t.Fatal("Expected two messages")
  624. }
  625. message1 := hec.messages[0]
  626. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  627. message1.Host != hostname ||
  628. message1.Source != "" ||
  629. message1.SourceType != "" ||
  630. message1.Index != "" {
  631. t.Fatalf("Unexpected values of message 1 %v", message1)
  632. }
  633. if event, err := message1.EventAsString(); err != nil {
  634. t.Fatal(err)
  635. } else {
  636. if event != "{\"a\":\"b\"}" {
  637. t.Fatalf("Unexpected event in message 1 %v", event)
  638. }
  639. }
  640. message2 := hec.messages[1]
  641. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  642. message2.Host != hostname ||
  643. message2.Source != "" ||
  644. message2.SourceType != "" ||
  645. message2.Index != "" {
  646. t.Fatalf("Unexpected values of message 2 %v", message2)
  647. }
  648. if event, err := message2.EventAsString(); err != nil {
  649. t.Fatal(err)
  650. } else {
  651. if event != "notjson" {
  652. t.Fatalf("Unexpected event in message 2 %v", event)
  653. }
  654. }
  655. err = hec.Close()
  656. if err != nil {
  657. t.Fatal(err)
  658. }
  659. }
  660. // Verify that we will send messages in batches with default batching parameters,
  661. // but change frequency to be sure that numOfRequests will match expected 17 requests
  662. func TestBatching(t *testing.T) {
  663. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  664. t.Fatal(err)
  665. }
  666. hec := NewHTTPEventCollectorMock(t)
  667. go hec.Serve()
  668. ctx := logger.Context{
  669. Config: map[string]string{
  670. splunkURLKey: hec.URL(),
  671. splunkTokenKey: hec.token,
  672. },
  673. ContainerID: "containeriid",
  674. ContainerName: "/container_name",
  675. ContainerImageID: "contaimageid",
  676. ContainerImageName: "container_image_name",
  677. }
  678. loggerDriver, err := New(ctx)
  679. if err != nil {
  680. t.Fatal(err)
  681. }
  682. for i := 0; i < defaultStreamChannelSize*4; i++ {
  683. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  684. t.Fatal(err)
  685. }
  686. }
  687. err = loggerDriver.Close()
  688. if err != nil {
  689. t.Fatal(err)
  690. }
  691. if len(hec.messages) != defaultStreamChannelSize*4 {
  692. t.Fatal("Not all messages delivered")
  693. }
  694. for i, message := range hec.messages {
  695. if event, err := message.EventAsMap(); err != nil {
  696. t.Fatal(err)
  697. } else {
  698. if event["line"] != fmt.Sprintf("%d", i) {
  699. t.Fatalf("Unexpected event in message %v", event)
  700. }
  701. }
  702. }
  703. // 1 to verify connection and 16 batches
  704. if hec.numOfRequests != 17 {
  705. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  706. }
  707. err = hec.Close()
  708. if err != nil {
  709. t.Fatal(err)
  710. }
  711. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  712. t.Fatal(err)
  713. }
  714. }
  715. // Verify that test is using time to fire events not rare than specified frequency
  716. func TestFrequency(t *testing.T) {
  717. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  718. t.Fatal(err)
  719. }
  720. hec := NewHTTPEventCollectorMock(t)
  721. go hec.Serve()
  722. ctx := logger.Context{
  723. Config: map[string]string{
  724. splunkURLKey: hec.URL(),
  725. splunkTokenKey: hec.token,
  726. },
  727. ContainerID: "containeriid",
  728. ContainerName: "/container_name",
  729. ContainerImageID: "contaimageid",
  730. ContainerImageName: "container_image_name",
  731. }
  732. loggerDriver, err := New(ctx)
  733. if err != nil {
  734. t.Fatal(err)
  735. }
  736. for i := 0; i < 10; i++ {
  737. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  738. t.Fatal(err)
  739. }
  740. time.Sleep(15 * time.Millisecond)
  741. }
  742. err = loggerDriver.Close()
  743. if err != nil {
  744. t.Fatal(err)
  745. }
  746. if len(hec.messages) != 10 {
  747. t.Fatal("Not all messages delivered")
  748. }
  749. for i, message := range hec.messages {
  750. if event, err := message.EventAsMap(); err != nil {
  751. t.Fatal(err)
  752. } else {
  753. if event["line"] != fmt.Sprintf("%d", i) {
  754. t.Fatalf("Unexpected event in message %v", event)
  755. }
  756. }
  757. }
  758. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  759. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  760. if hec.numOfRequests < 9 {
  761. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  762. }
  763. err = hec.Close()
  764. if err != nil {
  765. t.Fatal(err)
  766. }
  767. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  768. t.Fatal(err)
  769. }
  770. }
  771. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  772. // per request
  773. func TestOneMessagePerRequest(t *testing.T) {
  774. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  775. t.Fatal(err)
  776. }
  777. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  778. t.Fatal(err)
  779. }
  780. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  781. t.Fatal(err)
  782. }
  783. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  784. t.Fatal(err)
  785. }
  786. hec := NewHTTPEventCollectorMock(t)
  787. go hec.Serve()
  788. ctx := logger.Context{
  789. Config: map[string]string{
  790. splunkURLKey: hec.URL(),
  791. splunkTokenKey: hec.token,
  792. },
  793. ContainerID: "containeriid",
  794. ContainerName: "/container_name",
  795. ContainerImageID: "contaimageid",
  796. ContainerImageName: "container_image_name",
  797. }
  798. loggerDriver, err := New(ctx)
  799. if err != nil {
  800. t.Fatal(err)
  801. }
  802. for i := 0; i < 10; i++ {
  803. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  804. t.Fatal(err)
  805. }
  806. }
  807. err = loggerDriver.Close()
  808. if err != nil {
  809. t.Fatal(err)
  810. }
  811. if len(hec.messages) != 10 {
  812. t.Fatal("Not all messages delivered")
  813. }
  814. for i, message := range hec.messages {
  815. if event, err := message.EventAsMap(); err != nil {
  816. t.Fatal(err)
  817. } else {
  818. if event["line"] != fmt.Sprintf("%d", i) {
  819. t.Fatalf("Unexpected event in message %v", event)
  820. }
  821. }
  822. }
  823. // 1 to verify connection and 10 messages
  824. if hec.numOfRequests != 11 {
  825. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  826. }
  827. err = hec.Close()
  828. if err != nil {
  829. t.Fatal(err)
  830. }
  831. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  832. t.Fatal(err)
  833. }
  834. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  835. t.Fatal(err)
  836. }
  837. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  838. t.Fatal(err)
  839. }
  840. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  841. t.Fatal(err)
  842. }
  843. }
  844. // Driver should not be created when HEC is unresponsive
  845. func TestVerify(t *testing.T) {
  846. hec := NewHTTPEventCollectorMock(t)
  847. hec.simulateServerError = true
  848. go hec.Serve()
  849. ctx := logger.Context{
  850. Config: map[string]string{
  851. splunkURLKey: hec.URL(),
  852. splunkTokenKey: hec.token,
  853. },
  854. ContainerID: "containeriid",
  855. ContainerName: "/container_name",
  856. ContainerImageID: "contaimageid",
  857. ContainerImageName: "container_image_name",
  858. }
  859. _, err := New(ctx)
  860. if err == nil {
  861. t.Fatal("Expecting driver to fail, when server is unresponsive")
  862. }
  863. err = hec.Close()
  864. if err != nil {
  865. t.Fatal(err)
  866. }
  867. }
  868. // Verify that user can specify to skip verification that Splunk HEC is working.
  869. // Also in this test we verify retry logic.
  870. func TestSkipVerify(t *testing.T) {
  871. hec := NewHTTPEventCollectorMock(t)
  872. hec.simulateServerError = true
  873. go hec.Serve()
  874. ctx := logger.Context{
  875. Config: map[string]string{
  876. splunkURLKey: hec.URL(),
  877. splunkTokenKey: hec.token,
  878. splunkVerifyConnectionKey: "false",
  879. },
  880. ContainerID: "containeriid",
  881. ContainerName: "/container_name",
  882. ContainerImageID: "contaimageid",
  883. ContainerImageName: "container_image_name",
  884. }
  885. loggerDriver, err := New(ctx)
  886. if err != nil {
  887. t.Fatal(err)
  888. }
  889. if hec.connectionVerified {
  890. t.Fatal("Connection should not be verified")
  891. }
  892. for i := 0; i < defaultStreamChannelSize*2; i++ {
  893. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  894. t.Fatal(err)
  895. }
  896. }
  897. if len(hec.messages) != 0 {
  898. t.Fatal("No messages should be accepted at this point")
  899. }
  900. hec.simulateServerError = false
  901. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  902. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  903. t.Fatal(err)
  904. }
  905. }
  906. err = loggerDriver.Close()
  907. if err != nil {
  908. t.Fatal(err)
  909. }
  910. if len(hec.messages) != defaultStreamChannelSize*4 {
  911. t.Fatal("Not all messages delivered")
  912. }
  913. for i, message := range hec.messages {
  914. if event, err := message.EventAsMap(); err != nil {
  915. t.Fatal(err)
  916. } else {
  917. if event["line"] != fmt.Sprintf("%d", i) {
  918. t.Fatalf("Unexpected event in message %v", event)
  919. }
  920. }
  921. }
  922. err = hec.Close()
  923. if err != nil {
  924. t.Fatal(err)
  925. }
  926. }
  927. // Verify logic for when we filled whole buffer
  928. func TestBufferMaximum(t *testing.T) {
  929. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  930. t.Fatal(err)
  931. }
  932. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  933. t.Fatal(err)
  934. }
  935. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  936. t.Fatal(err)
  937. }
  938. hec := NewHTTPEventCollectorMock(t)
  939. hec.simulateServerError = true
  940. go hec.Serve()
  941. ctx := logger.Context{
  942. Config: map[string]string{
  943. splunkURLKey: hec.URL(),
  944. splunkTokenKey: hec.token,
  945. splunkVerifyConnectionKey: "false",
  946. },
  947. ContainerID: "containeriid",
  948. ContainerName: "/container_name",
  949. ContainerImageID: "contaimageid",
  950. ContainerImageName: "container_image_name",
  951. }
  952. loggerDriver, err := New(ctx)
  953. if err != nil {
  954. t.Fatal(err)
  955. }
  956. if hec.connectionVerified {
  957. t.Fatal("Connection should not be verified")
  958. }
  959. for i := 0; i < 11; i++ {
  960. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  961. t.Fatal(err)
  962. }
  963. }
  964. if len(hec.messages) != 0 {
  965. t.Fatal("No messages should be accepted at this point")
  966. }
  967. hec.simulateServerError = false
  968. err = loggerDriver.Close()
  969. if err != nil {
  970. t.Fatal(err)
  971. }
  972. if len(hec.messages) != 9 {
  973. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  974. }
  975. // First 1000 messages are written to daemon log when buffer was full
  976. for i, message := range hec.messages {
  977. if event, err := message.EventAsMap(); err != nil {
  978. t.Fatal(err)
  979. } else {
  980. if event["line"] != fmt.Sprintf("%d", i+2) {
  981. t.Fatalf("Unexpected event in message %v", event)
  982. }
  983. }
  984. }
  985. err = hec.Close()
  986. if err != nil {
  987. t.Fatal(err)
  988. }
  989. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  990. t.Fatal(err)
  991. }
  992. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  993. t.Fatal(err)
  994. }
  995. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  996. t.Fatal(err)
  997. }
  998. }
  999. // Verify that we are not blocking close when HEC is down for the whole time
  1000. func TestServerAlwaysDown(t *testing.T) {
  1001. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1002. t.Fatal(err)
  1003. }
  1004. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1005. t.Fatal(err)
  1006. }
  1007. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1008. t.Fatal(err)
  1009. }
  1010. hec := NewHTTPEventCollectorMock(t)
  1011. hec.simulateServerError = true
  1012. go hec.Serve()
  1013. ctx := logger.Context{
  1014. Config: map[string]string{
  1015. splunkURLKey: hec.URL(),
  1016. splunkTokenKey: hec.token,
  1017. splunkVerifyConnectionKey: "false",
  1018. },
  1019. ContainerID: "containeriid",
  1020. ContainerName: "/container_name",
  1021. ContainerImageID: "contaimageid",
  1022. ContainerImageName: "container_image_name",
  1023. }
  1024. loggerDriver, err := New(ctx)
  1025. if err != nil {
  1026. t.Fatal(err)
  1027. }
  1028. if hec.connectionVerified {
  1029. t.Fatal("Connection should not be verified")
  1030. }
  1031. for i := 0; i < 5; i++ {
  1032. if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil {
  1033. t.Fatal(err)
  1034. }
  1035. }
  1036. err = loggerDriver.Close()
  1037. if err != nil {
  1038. t.Fatal(err)
  1039. }
  1040. if len(hec.messages) != 0 {
  1041. t.Fatal("No messages should be sent")
  1042. }
  1043. err = hec.Close()
  1044. if err != nil {
  1045. t.Fatal(err)
  1046. }
  1047. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1048. t.Fatal(err)
  1049. }
  1050. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1051. t.Fatal(err)
  1052. }
  1053. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1054. t.Fatal(err)
  1055. }
  1056. }
  1057. // Cannot send messages after we close driver
  1058. func TestCannotSendAfterClose(t *testing.T) {
  1059. hec := NewHTTPEventCollectorMock(t)
  1060. go hec.Serve()
  1061. ctx := logger.Context{
  1062. Config: map[string]string{
  1063. splunkURLKey: hec.URL(),
  1064. splunkTokenKey: hec.token,
  1065. },
  1066. ContainerID: "containeriid",
  1067. ContainerName: "/container_name",
  1068. ContainerImageID: "contaimageid",
  1069. ContainerImageName: "container_image_name",
  1070. }
  1071. loggerDriver, err := New(ctx)
  1072. if err != nil {
  1073. t.Fatal(err)
  1074. }
  1075. if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil {
  1076. t.Fatal(err)
  1077. }
  1078. err = loggerDriver.Close()
  1079. if err != nil {
  1080. t.Fatal(err)
  1081. }
  1082. if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil {
  1083. t.Fatal("Driver should not allow to send messages after close")
  1084. }
  1085. if len(hec.messages) != 1 {
  1086. t.Fatal("Only one message should be sent")
  1087. }
  1088. message := hec.messages[0]
  1089. if event, err := message.EventAsMap(); err != nil {
  1090. t.Fatal(err)
  1091. } else {
  1092. if event["line"] != "message1" {
  1093. t.Fatalf("Unexpected event in message %v", event)
  1094. }
  1095. }
  1096. err = hec.Close()
  1097. if err != nil {
  1098. t.Fatal(err)
  1099. }
  1100. }