splunk_test.go 35 KB


  1. package splunk // import "github.com/docker/docker/daemon/logger/splunk"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "runtime"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "gotest.tools/assert"
  13. "gotest.tools/env"
  14. )
  15. // Validate options
  16. func TestValidateLogOpt(t *testing.T) {
  17. err := ValidateLogOpt(map[string]string{
  18. splunkURLKey: "http://127.0.0.1",
  19. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  20. splunkSourceKey: "mysource",
  21. splunkSourceTypeKey: "mysourcetype",
  22. splunkIndexKey: "myindex",
  23. splunkCAPathKey: "/usr/cert.pem",
  24. splunkCANameKey: "ca_name",
  25. splunkInsecureSkipVerifyKey: "true",
  26. splunkFormatKey: "json",
  27. splunkVerifyConnectionKey: "true",
  28. splunkGzipCompressionKey: "true",
  29. splunkGzipCompressionLevelKey: "1",
  30. envKey: "a",
  31. envRegexKey: "^foo",
  32. labelsKey: "b",
  33. tagKey: "c",
  34. })
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. err = ValidateLogOpt(map[string]string{
  39. "not-supported-option": "a",
  40. })
  41. if err == nil {
  42. t.Fatal("Expecting error on unsupported options")
  43. }
  44. }
  45. // Driver require user to specify required options
  46. func TestNewMissedConfig(t *testing.T) {
  47. info := logger.Info{
  48. Config: map[string]string{},
  49. }
  50. _, err := New(info)
  51. if err == nil {
  52. t.Fatal("Logger driver should fail when no required parameters specified")
  53. }
  54. }
  55. // Driver require user to specify splunk-url
  56. func TestNewMissedUrl(t *testing.T) {
  57. info := logger.Info{
  58. Config: map[string]string{
  59. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  60. },
  61. }
  62. _, err := New(info)
  63. if err.Error() != "splunk: splunk-url is expected" {
  64. t.Fatal("Logger driver should fail when no required parameters specified")
  65. }
  66. }
  67. // Driver require user to specify splunk-token
  68. func TestNewMissedToken(t *testing.T) {
  69. info := logger.Info{
  70. Config: map[string]string{
  71. splunkURLKey: "http://127.0.0.1:8088",
  72. },
  73. }
  74. _, err := New(info)
  75. if err.Error() != "splunk: splunk-token is expected" {
  76. t.Fatal("Logger driver should fail when no required parameters specified")
  77. }
  78. }
  79. func TestNewWithProxy(t *testing.T) {
  80. proxy := "http://proxy.testing:8888"
  81. reset := env.Patch(t, "HTTP_PROXY", proxy)
  82. defer reset()
  83. // must not be localhost
  84. splunkURL := "http://example.com:12345"
  85. logger, err := New(logger.Info{
  86. Config: map[string]string{
  87. splunkURLKey: splunkURL,
  88. splunkTokenKey: "token",
  89. splunkVerifyConnectionKey: "false",
  90. },
  91. ContainerID: "containeriid",
  92. })
  93. assert.NilError(t, err)
  94. splunkLogger := logger.(*splunkLoggerInline)
  95. proxyFunc := splunkLogger.transport.Proxy
  96. assert.Assert(t, proxyFunc != nil)
  97. req, err := http.NewRequest("GET", splunkURL, nil)
  98. assert.NilError(t, err)
  99. proxyURL, err := proxyFunc(req)
  100. assert.NilError(t, err)
  101. assert.Assert(t, proxyURL != nil)
  102. assert.Equal(t, proxy, proxyURL.String())
  103. }
  104. // Test default settings
  105. func TestDefault(t *testing.T) {
  106. hec := NewHTTPEventCollectorMock(t)
  107. go hec.Serve()
  108. info := logger.Info{
  109. Config: map[string]string{
  110. splunkURLKey: hec.URL(),
  111. splunkTokenKey: hec.token,
  112. },
  113. ContainerID: "containeriid",
  114. ContainerName: "container_name",
  115. ContainerImageID: "contaimageid",
  116. ContainerImageName: "container_image_name",
  117. }
  118. hostname, err := info.Hostname()
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. loggerDriver, err := New(info)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. if loggerDriver.Name() != driverName {
  127. t.Fatal("Unexpected logger driver name")
  128. }
  129. if !hec.connectionVerified {
  130. t.Fatal("By default connection should be verified")
  131. }
  132. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  133. if !ok {
  134. t.Fatal("Unexpected Splunk Logging Driver type")
  135. }
  136. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  137. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  138. splunkLoggerDriver.nullMessage.Host != hostname ||
  139. splunkLoggerDriver.nullMessage.Source != "" ||
  140. splunkLoggerDriver.nullMessage.SourceType != "" ||
  141. splunkLoggerDriver.nullMessage.Index != "" ||
  142. splunkLoggerDriver.gzipCompression ||
  143. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  144. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  145. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  146. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  147. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  148. }
  149. message1Time := time.Now()
  150. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  151. t.Fatal(err)
  152. }
  153. message2Time := time.Now()
  154. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  155. t.Fatal(err)
  156. }
  157. err = loggerDriver.Close()
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. if len(hec.messages) != 2 {
  162. t.Fatal("Expected two messages")
  163. }
  164. if *hec.gzipEnabled {
  165. t.Fatal("Gzip should not be used")
  166. }
  167. message1 := hec.messages[0]
  168. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  169. message1.Host != hostname ||
  170. message1.Source != "" ||
  171. message1.SourceType != "" ||
  172. message1.Index != "" {
  173. t.Fatalf("Unexpected values of message 1 %v", message1)
  174. }
  175. if event, err := message1.EventAsMap(); err != nil {
  176. t.Fatal(err)
  177. } else {
  178. if event["line"] != "{\"a\":\"b\"}" ||
  179. event["source"] != "stdout" ||
  180. event["tag"] != "containeriid" ||
  181. len(event) != 3 {
  182. t.Fatalf("Unexpected event in message %v", event)
  183. }
  184. }
  185. message2 := hec.messages[1]
  186. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  187. message2.Host != hostname ||
  188. message2.Source != "" ||
  189. message2.SourceType != "" ||
  190. message2.Index != "" {
  191. t.Fatalf("Unexpected values of message 1 %v", message2)
  192. }
  193. if event, err := message2.EventAsMap(); err != nil {
  194. t.Fatal(err)
  195. } else {
  196. if event["line"] != "notajson" ||
  197. event["source"] != "stdout" ||
  198. event["tag"] != "containeriid" ||
  199. len(event) != 3 {
  200. t.Fatalf("Unexpected event in message %v", event)
  201. }
  202. }
  203. err = hec.Close()
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. }
  208. // Verify inline format with a not default settings for most of options
  209. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  210. hec := NewHTTPEventCollectorMock(t)
  211. go hec.Serve()
  212. info := logger.Info{
  213. Config: map[string]string{
  214. splunkURLKey: hec.URL(),
  215. splunkTokenKey: hec.token,
  216. splunkSourceKey: "mysource",
  217. splunkSourceTypeKey: "mysourcetype",
  218. splunkIndexKey: "myindex",
  219. splunkFormatKey: splunkFormatInline,
  220. splunkGzipCompressionKey: "true",
  221. tagKey: "{{.ImageName}}/{{.Name}}",
  222. labelsKey: "a",
  223. envRegexKey: "^foo",
  224. },
  225. ContainerID: "containeriid",
  226. ContainerName: "/container_name",
  227. ContainerImageID: "contaimageid",
  228. ContainerImageName: "container_image_name",
  229. ContainerLabels: map[string]string{
  230. "a": "b",
  231. },
  232. ContainerEnv: []string{"foo_finder=bar"},
  233. }
  234. hostname, err := info.Hostname()
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. loggerDriver, err := New(info)
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. if !hec.connectionVerified {
  243. t.Fatal("By default connection should be verified")
  244. }
  245. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  246. if !ok {
  247. t.Fatal("Unexpected Splunk Logging Driver type")
  248. }
  249. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  250. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  251. splunkLoggerDriver.nullMessage.Host != hostname ||
  252. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  253. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  254. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  255. !splunkLoggerDriver.gzipCompression ||
  256. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  257. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  258. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  259. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  260. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  261. t.Fatal("Values do not match configuration.")
  262. }
  263. messageTime := time.Now()
  264. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  265. t.Fatal(err)
  266. }
  267. err = loggerDriver.Close()
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. if len(hec.messages) != 1 {
  272. t.Fatal("Expected one message")
  273. }
  274. if !*hec.gzipEnabled {
  275. t.Fatal("Gzip should be used")
  276. }
  277. message := hec.messages[0]
  278. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  279. message.Host != hostname ||
  280. message.Source != "mysource" ||
  281. message.SourceType != "mysourcetype" ||
  282. message.Index != "myindex" {
  283. t.Fatalf("Unexpected values of message %v", message)
  284. }
  285. if event, err := message.EventAsMap(); err != nil {
  286. t.Fatal(err)
  287. } else {
  288. if event["line"] != "1" ||
  289. event["source"] != "stdout" ||
  290. event["tag"] != "container_image_name/container_name" ||
  291. event["attrs"].(map[string]interface{})["a"] != "b" ||
  292. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  293. len(event) != 4 {
  294. t.Fatalf("Unexpected event in message %v", event)
  295. }
  296. }
  297. err = hec.Close()
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. }
  302. // Verify JSON format
  303. func TestJsonFormat(t *testing.T) {
  304. hec := NewHTTPEventCollectorMock(t)
  305. go hec.Serve()
  306. info := logger.Info{
  307. Config: map[string]string{
  308. splunkURLKey: hec.URL(),
  309. splunkTokenKey: hec.token,
  310. splunkFormatKey: splunkFormatJSON,
  311. splunkGzipCompressionKey: "true",
  312. splunkGzipCompressionLevelKey: "1",
  313. },
  314. ContainerID: "containeriid",
  315. ContainerName: "/container_name",
  316. ContainerImageID: "contaimageid",
  317. ContainerImageName: "container_image_name",
  318. }
  319. hostname, err := info.Hostname()
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. loggerDriver, err := New(info)
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. if !hec.connectionVerified {
  328. t.Fatal("By default connection should be verified")
  329. }
  330. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  331. if !ok {
  332. t.Fatal("Unexpected Splunk Logging Driver type")
  333. }
  334. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  335. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  336. splunkLoggerDriver.nullMessage.Host != hostname ||
  337. splunkLoggerDriver.nullMessage.Source != "" ||
  338. splunkLoggerDriver.nullMessage.SourceType != "" ||
  339. splunkLoggerDriver.nullMessage.Index != "" ||
  340. !splunkLoggerDriver.gzipCompression ||
  341. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  342. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  343. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  344. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  345. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  346. t.Fatal("Values do not match configuration.")
  347. }
  348. message1Time := time.Now()
  349. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  350. t.Fatal(err)
  351. }
  352. message2Time := time.Now()
  353. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  354. t.Fatal(err)
  355. }
  356. err = loggerDriver.Close()
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. if len(hec.messages) != 2 {
  361. t.Fatal("Expected two messages")
  362. }
  363. message1 := hec.messages[0]
  364. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  365. message1.Host != hostname ||
  366. message1.Source != "" ||
  367. message1.SourceType != "" ||
  368. message1.Index != "" {
  369. t.Fatalf("Unexpected values of message 1 %v", message1)
  370. }
  371. if event, err := message1.EventAsMap(); err != nil {
  372. t.Fatal(err)
  373. } else {
  374. if event["line"].(map[string]interface{})["a"] != "b" ||
  375. event["source"] != "stdout" ||
  376. event["tag"] != "containeriid" ||
  377. len(event) != 3 {
  378. t.Fatalf("Unexpected event in message 1 %v", event)
  379. }
  380. }
  381. message2 := hec.messages[1]
  382. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  383. message2.Host != hostname ||
  384. message2.Source != "" ||
  385. message2.SourceType != "" ||
  386. message2.Index != "" {
  387. t.Fatalf("Unexpected values of message 2 %v", message2)
  388. }
  389. // If message cannot be parsed as JSON - it should be sent as a line
  390. if event, err := message2.EventAsMap(); err != nil {
  391. t.Fatal(err)
  392. } else {
  393. if event["line"] != "notjson" ||
  394. event["source"] != "stdout" ||
  395. event["tag"] != "containeriid" ||
  396. len(event) != 3 {
  397. t.Fatalf("Unexpected event in message 2 %v", event)
  398. }
  399. }
  400. err = hec.Close()
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. }
  405. // Verify raw format
  406. func TestRawFormat(t *testing.T) {
  407. hec := NewHTTPEventCollectorMock(t)
  408. go hec.Serve()
  409. info := logger.Info{
  410. Config: map[string]string{
  411. splunkURLKey: hec.URL(),
  412. splunkTokenKey: hec.token,
  413. splunkFormatKey: splunkFormatRaw,
  414. },
  415. ContainerID: "containeriid",
  416. ContainerName: "/container_name",
  417. ContainerImageID: "contaimageid",
  418. ContainerImageName: "container_image_name",
  419. }
  420. hostname, err := info.Hostname()
  421. assert.NilError(t, err)
  422. loggerDriver, err := New(info)
  423. assert.NilError(t, err)
  424. if !hec.connectionVerified {
  425. t.Fatal("By default connection should be verified")
  426. }
  427. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  428. if !ok {
  429. t.Fatal("Unexpected Splunk Logging Driver type")
  430. }
  431. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  432. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  433. splunkLoggerDriver.nullMessage.Host != hostname ||
  434. splunkLoggerDriver.nullMessage.Source != "" ||
  435. splunkLoggerDriver.nullMessage.SourceType != "" ||
  436. splunkLoggerDriver.nullMessage.Index != "" ||
  437. splunkLoggerDriver.gzipCompression ||
  438. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  439. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  440. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  441. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  442. string(splunkLoggerDriver.prefix) != "containeriid " {
  443. t.Fatal("Values do not match configuration.")
  444. }
  445. message1Time := time.Now()
  446. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  447. t.Fatal(err)
  448. }
  449. message2Time := time.Now()
  450. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  451. t.Fatal(err)
  452. }
  453. err = loggerDriver.Close()
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. if len(hec.messages) != 2 {
  458. t.Fatal("Expected two messages")
  459. }
  460. message1 := hec.messages[0]
  461. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  462. message1.Host != hostname ||
  463. message1.Source != "" ||
  464. message1.SourceType != "" ||
  465. message1.Index != "" {
  466. t.Fatalf("Unexpected values of message 1 %v", message1)
  467. }
  468. if event, err := message1.EventAsString(); err != nil {
  469. t.Fatal(err)
  470. } else {
  471. if event != "containeriid {\"a\":\"b\"}" {
  472. t.Fatalf("Unexpected event in message 1 %v", event)
  473. }
  474. }
  475. message2 := hec.messages[1]
  476. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  477. message2.Host != hostname ||
  478. message2.Source != "" ||
  479. message2.SourceType != "" ||
  480. message2.Index != "" {
  481. t.Fatalf("Unexpected values of message 2 %v", message2)
  482. }
  483. if event, err := message2.EventAsString(); err != nil {
  484. t.Fatal(err)
  485. } else {
  486. if event != "containeriid notjson" {
  487. t.Fatalf("Unexpected event in message 1 %v", event)
  488. }
  489. }
  490. err = hec.Close()
  491. if err != nil {
  492. t.Fatal(err)
  493. }
  494. }
  495. // Verify raw format with labels
  496. func TestRawFormatWithLabels(t *testing.T) {
  497. hec := NewHTTPEventCollectorMock(t)
  498. go hec.Serve()
  499. info := logger.Info{
  500. Config: map[string]string{
  501. splunkURLKey: hec.URL(),
  502. splunkTokenKey: hec.token,
  503. splunkFormatKey: splunkFormatRaw,
  504. labelsKey: "a",
  505. },
  506. ContainerID: "containeriid",
  507. ContainerName: "/container_name",
  508. ContainerImageID: "contaimageid",
  509. ContainerImageName: "container_image_name",
  510. ContainerLabels: map[string]string{
  511. "a": "b",
  512. },
  513. }
  514. hostname, err := info.Hostname()
  515. if err != nil {
  516. t.Fatal(err)
  517. }
  518. loggerDriver, err := New(info)
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. if !hec.connectionVerified {
  523. t.Fatal("By default connection should be verified")
  524. }
  525. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  526. if !ok {
  527. t.Fatal("Unexpected Splunk Logging Driver type")
  528. }
  529. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  530. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  531. splunkLoggerDriver.nullMessage.Host != hostname ||
  532. splunkLoggerDriver.nullMessage.Source != "" ||
  533. splunkLoggerDriver.nullMessage.SourceType != "" ||
  534. splunkLoggerDriver.nullMessage.Index != "" ||
  535. splunkLoggerDriver.gzipCompression ||
  536. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  537. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  538. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  539. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  540. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  541. t.Fatal("Values do not match configuration.")
  542. }
  543. message1Time := time.Now()
  544. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  545. t.Fatal(err)
  546. }
  547. message2Time := time.Now()
  548. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  549. t.Fatal(err)
  550. }
  551. err = loggerDriver.Close()
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. if len(hec.messages) != 2 {
  556. t.Fatal("Expected two messages")
  557. }
  558. message1 := hec.messages[0]
  559. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  560. message1.Host != hostname ||
  561. message1.Source != "" ||
  562. message1.SourceType != "" ||
  563. message1.Index != "" {
  564. t.Fatalf("Unexpected values of message 1 %v", message1)
  565. }
  566. if event, err := message1.EventAsString(); err != nil {
  567. t.Fatal(err)
  568. } else {
  569. if event != "containeriid a=b {\"a\":\"b\"}" {
  570. t.Fatalf("Unexpected event in message 1 %v", event)
  571. }
  572. }
  573. message2 := hec.messages[1]
  574. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  575. message2.Host != hostname ||
  576. message2.Source != "" ||
  577. message2.SourceType != "" ||
  578. message2.Index != "" {
  579. t.Fatalf("Unexpected values of message 2 %v", message2)
  580. }
  581. if event, err := message2.EventAsString(); err != nil {
  582. t.Fatal(err)
  583. } else {
  584. if event != "containeriid a=b notjson" {
  585. t.Fatalf("Unexpected event in message 2 %v", event)
  586. }
  587. }
  588. err = hec.Close()
  589. if err != nil {
  590. t.Fatal(err)
  591. }
  592. }
  593. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  594. // in the same way we get them in stdout/stderr
  595. func TestRawFormatWithoutTag(t *testing.T) {
  596. hec := NewHTTPEventCollectorMock(t)
  597. go hec.Serve()
  598. info := logger.Info{
  599. Config: map[string]string{
  600. splunkURLKey: hec.URL(),
  601. splunkTokenKey: hec.token,
  602. splunkFormatKey: splunkFormatRaw,
  603. tagKey: "",
  604. },
  605. ContainerID: "containeriid",
  606. ContainerName: "/container_name",
  607. ContainerImageID: "contaimageid",
  608. ContainerImageName: "container_image_name",
  609. }
  610. hostname, err := info.Hostname()
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. loggerDriver, err := New(info)
  615. if err != nil {
  616. t.Fatal(err)
  617. }
  618. if !hec.connectionVerified {
  619. t.Fatal("By default connection should be verified")
  620. }
  621. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  622. if !ok {
  623. t.Fatal("Unexpected Splunk Logging Driver type")
  624. }
  625. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  626. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  627. splunkLoggerDriver.nullMessage.Host != hostname ||
  628. splunkLoggerDriver.nullMessage.Source != "" ||
  629. splunkLoggerDriver.nullMessage.SourceType != "" ||
  630. splunkLoggerDriver.nullMessage.Index != "" ||
  631. splunkLoggerDriver.gzipCompression ||
  632. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  633. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  634. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  635. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  636. string(splunkLoggerDriver.prefix) != "" {
  637. t.Log(string(splunkLoggerDriver.prefix) + "a")
  638. t.Fatal("Values do not match configuration.")
  639. }
  640. message1Time := time.Now()
  641. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  642. t.Fatal(err)
  643. }
  644. message2Time := time.Now()
  645. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  646. t.Fatal(err)
  647. }
  648. message3Time := time.Now()
  649. if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
  650. t.Fatal(err)
  651. }
  652. err = loggerDriver.Close()
  653. if err != nil {
  654. t.Fatal(err)
  655. }
  656. // message3 would have an empty or whitespace only string in the "event" field
  657. // both of which are not acceptable to HEC
  658. // thus here we must expect 2 messages, not 3
  659. if len(hec.messages) != 2 {
  660. t.Fatal("Expected two messages")
  661. }
  662. message1 := hec.messages[0]
  663. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  664. message1.Host != hostname ||
  665. message1.Source != "" ||
  666. message1.SourceType != "" ||
  667. message1.Index != "" {
  668. t.Fatalf("Unexpected values of message 1 %v", message1)
  669. }
  670. if event, err := message1.EventAsString(); err != nil {
  671. t.Fatal(err)
  672. } else {
  673. if event != "{\"a\":\"b\"}" {
  674. t.Fatalf("Unexpected event in message 1 %v", event)
  675. }
  676. }
  677. message2 := hec.messages[1]
  678. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  679. message2.Host != hostname ||
  680. message2.Source != "" ||
  681. message2.SourceType != "" ||
  682. message2.Index != "" {
  683. t.Fatalf("Unexpected values of message 2 %v", message2)
  684. }
  685. if event, err := message2.EventAsString(); err != nil {
  686. t.Fatal(err)
  687. } else {
  688. if event != "notjson" {
  689. t.Fatalf("Unexpected event in message 2 %v", event)
  690. }
  691. }
  692. err = hec.Close()
  693. if err != nil {
  694. t.Fatal(err)
  695. }
  696. }
  697. // Verify that we will send messages in batches with default batching parameters,
  698. // but change frequency to be sure that numOfRequests will match expected 17 requests
  699. func TestBatching(t *testing.T) {
  700. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  701. t.Fatal(err)
  702. }
  703. hec := NewHTTPEventCollectorMock(t)
  704. go hec.Serve()
  705. info := logger.Info{
  706. Config: map[string]string{
  707. splunkURLKey: hec.URL(),
  708. splunkTokenKey: hec.token,
  709. },
  710. ContainerID: "containeriid",
  711. ContainerName: "/container_name",
  712. ContainerImageID: "contaimageid",
  713. ContainerImageName: "container_image_name",
  714. }
  715. loggerDriver, err := New(info)
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. for i := 0; i < defaultStreamChannelSize*4; i++ {
  720. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  721. t.Fatal(err)
  722. }
  723. }
  724. err = loggerDriver.Close()
  725. if err != nil {
  726. t.Fatal(err)
  727. }
  728. if len(hec.messages) != defaultStreamChannelSize*4 {
  729. t.Fatal("Not all messages delivered")
  730. }
  731. for i, message := range hec.messages {
  732. if event, err := message.EventAsMap(); err != nil {
  733. t.Fatal(err)
  734. } else {
  735. if event["line"] != fmt.Sprintf("%d", i) {
  736. t.Fatalf("Unexpected event in message %v", event)
  737. }
  738. }
  739. }
  740. // 1 to verify connection and 16 batches
  741. if hec.numOfRequests != 17 {
  742. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  743. }
  744. err = hec.Close()
  745. if err != nil {
  746. t.Fatal(err)
  747. }
  748. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  749. t.Fatal(err)
  750. }
  751. }
  752. // Verify that test is using time to fire events not rare than specified frequency
  753. func TestFrequency(t *testing.T) {
  754. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  755. t.Fatal(err)
  756. }
  757. hec := NewHTTPEventCollectorMock(t)
  758. go hec.Serve()
  759. info := logger.Info{
  760. Config: map[string]string{
  761. splunkURLKey: hec.URL(),
  762. splunkTokenKey: hec.token,
  763. },
  764. ContainerID: "containeriid",
  765. ContainerName: "/container_name",
  766. ContainerImageID: "contaimageid",
  767. ContainerImageName: "container_image_name",
  768. }
  769. loggerDriver, err := New(info)
  770. if err != nil {
  771. t.Fatal(err)
  772. }
  773. for i := 0; i < 10; i++ {
  774. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  775. t.Fatal(err)
  776. }
  777. time.Sleep(15 * time.Millisecond)
  778. }
  779. err = loggerDriver.Close()
  780. if err != nil {
  781. t.Fatal(err)
  782. }
  783. if len(hec.messages) != 10 {
  784. t.Fatal("Not all messages delivered")
  785. }
  786. for i, message := range hec.messages {
  787. if event, err := message.EventAsMap(); err != nil {
  788. t.Fatal(err)
  789. } else {
  790. if event["line"] != fmt.Sprintf("%d", i) {
  791. t.Fatalf("Unexpected event in message %v", event)
  792. }
  793. }
  794. }
  795. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  796. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  797. if hec.numOfRequests < 9 {
  798. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  799. }
  800. err = hec.Close()
  801. if err != nil {
  802. t.Fatal(err)
  803. }
  804. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  805. t.Fatal(err)
  806. }
  807. }
  808. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  809. // per request
  810. func TestOneMessagePerRequest(t *testing.T) {
  811. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  812. t.Fatal(err)
  813. }
  814. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  815. t.Fatal(err)
  816. }
  817. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  818. t.Fatal(err)
  819. }
  820. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  821. t.Fatal(err)
  822. }
  823. hec := NewHTTPEventCollectorMock(t)
  824. go hec.Serve()
  825. info := logger.Info{
  826. Config: map[string]string{
  827. splunkURLKey: hec.URL(),
  828. splunkTokenKey: hec.token,
  829. },
  830. ContainerID: "containeriid",
  831. ContainerName: "/container_name",
  832. ContainerImageID: "contaimageid",
  833. ContainerImageName: "container_image_name",
  834. }
  835. loggerDriver, err := New(info)
  836. if err != nil {
  837. t.Fatal(err)
  838. }
  839. for i := 0; i < 10; i++ {
  840. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  841. t.Fatal(err)
  842. }
  843. }
  844. err = loggerDriver.Close()
  845. if err != nil {
  846. t.Fatal(err)
  847. }
  848. if len(hec.messages) != 10 {
  849. t.Fatal("Not all messages delivered")
  850. }
  851. for i, message := range hec.messages {
  852. if event, err := message.EventAsMap(); err != nil {
  853. t.Fatal(err)
  854. } else {
  855. if event["line"] != fmt.Sprintf("%d", i) {
  856. t.Fatalf("Unexpected event in message %v", event)
  857. }
  858. }
  859. }
  860. // 1 to verify connection and 10 messages
  861. if hec.numOfRequests != 11 {
  862. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  863. }
  864. err = hec.Close()
  865. if err != nil {
  866. t.Fatal(err)
  867. }
  868. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  869. t.Fatal(err)
  870. }
  871. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  872. t.Fatal(err)
  873. }
  874. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  875. t.Fatal(err)
  876. }
  877. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  878. t.Fatal(err)
  879. }
  880. }
  881. // Driver should not be created when HEC is unresponsive
  882. func TestVerify(t *testing.T) {
  883. hec := NewHTTPEventCollectorMock(t)
  884. hec.simulateServerError = true
  885. go hec.Serve()
  886. info := logger.Info{
  887. Config: map[string]string{
  888. splunkURLKey: hec.URL(),
  889. splunkTokenKey: hec.token,
  890. },
  891. ContainerID: "containeriid",
  892. ContainerName: "/container_name",
  893. ContainerImageID: "contaimageid",
  894. ContainerImageName: "container_image_name",
  895. }
  896. _, err := New(info)
  897. if err == nil {
  898. t.Fatal("Expecting driver to fail, when server is unresponsive")
  899. }
  900. err = hec.Close()
  901. if err != nil {
  902. t.Fatal(err)
  903. }
  904. }
  905. // Verify that user can specify to skip verification that Splunk HEC is working.
  906. // Also in this test we verify retry logic.
  907. func TestSkipVerify(t *testing.T) {
  908. hec := NewHTTPEventCollectorMock(t)
  909. hec.simulateServerError = true
  910. go hec.Serve()
  911. info := logger.Info{
  912. Config: map[string]string{
  913. splunkURLKey: hec.URL(),
  914. splunkTokenKey: hec.token,
  915. splunkVerifyConnectionKey: "false",
  916. },
  917. ContainerID: "containeriid",
  918. ContainerName: "/container_name",
  919. ContainerImageID: "contaimageid",
  920. ContainerImageName: "container_image_name",
  921. }
  922. loggerDriver, err := New(info)
  923. if err != nil {
  924. t.Fatal(err)
  925. }
  926. if hec.connectionVerified {
  927. t.Fatal("Connection should not be verified")
  928. }
  929. for i := 0; i < defaultStreamChannelSize*2; i++ {
  930. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  931. t.Fatal(err)
  932. }
  933. }
  934. if len(hec.messages) != 0 {
  935. t.Fatal("No messages should be accepted at this point")
  936. }
  937. hec.simulateErr(false)
  938. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  939. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  940. t.Fatal(err)
  941. }
  942. }
  943. err = loggerDriver.Close()
  944. if err != nil {
  945. t.Fatal(err)
  946. }
  947. if len(hec.messages) != defaultStreamChannelSize*4 {
  948. t.Fatal("Not all messages delivered")
  949. }
  950. for i, message := range hec.messages {
  951. if event, err := message.EventAsMap(); err != nil {
  952. t.Fatal(err)
  953. } else {
  954. if event["line"] != fmt.Sprintf("%d", i) {
  955. t.Fatalf("Unexpected event in message %v", event)
  956. }
  957. }
  958. }
  959. err = hec.Close()
  960. if err != nil {
  961. t.Fatal(err)
  962. }
  963. }
  964. // Verify logic for when we filled whole buffer
  965. func TestBufferMaximum(t *testing.T) {
  966. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  967. t.Fatal(err)
  968. }
  969. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  970. t.Fatal(err)
  971. }
  972. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  973. t.Fatal(err)
  974. }
  975. hec := NewHTTPEventCollectorMock(t)
  976. hec.simulateErr(true)
  977. go hec.Serve()
  978. info := logger.Info{
  979. Config: map[string]string{
  980. splunkURLKey: hec.URL(),
  981. splunkTokenKey: hec.token,
  982. splunkVerifyConnectionKey: "false",
  983. },
  984. ContainerID: "containeriid",
  985. ContainerName: "/container_name",
  986. ContainerImageID: "contaimageid",
  987. ContainerImageName: "container_image_name",
  988. }
  989. loggerDriver, err := New(info)
  990. if err != nil {
  991. t.Fatal(err)
  992. }
  993. if hec.connectionVerified {
  994. t.Fatal("Connection should not be verified")
  995. }
  996. for i := 0; i < 11; i++ {
  997. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  998. t.Fatal(err)
  999. }
  1000. }
  1001. if len(hec.messages) != 0 {
  1002. t.Fatal("No messages should be accepted at this point")
  1003. }
  1004. hec.simulateServerError = false
  1005. err = loggerDriver.Close()
  1006. if err != nil {
  1007. t.Fatal(err)
  1008. }
  1009. if len(hec.messages) != 9 {
  1010. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  1011. }
  1012. // First 1000 messages are written to daemon log when buffer was full
  1013. for i, message := range hec.messages {
  1014. if event, err := message.EventAsMap(); err != nil {
  1015. t.Fatal(err)
  1016. } else {
  1017. if event["line"] != fmt.Sprintf("%d", i+2) {
  1018. t.Fatalf("Unexpected event in message %v", event)
  1019. }
  1020. }
  1021. }
  1022. err = hec.Close()
  1023. if err != nil {
  1024. t.Fatal(err)
  1025. }
  1026. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1027. t.Fatal(err)
  1028. }
  1029. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1030. t.Fatal(err)
  1031. }
  1032. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1033. t.Fatal(err)
  1034. }
  1035. }
  1036. // Verify that we are not blocking close when HEC is down for the whole time
  1037. func TestServerAlwaysDown(t *testing.T) {
  1038. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1039. t.Fatal(err)
  1040. }
  1041. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1042. t.Fatal(err)
  1043. }
  1044. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1045. t.Fatal(err)
  1046. }
  1047. hec := NewHTTPEventCollectorMock(t)
  1048. hec.simulateServerError = true
  1049. go hec.Serve()
  1050. info := logger.Info{
  1051. Config: map[string]string{
  1052. splunkURLKey: hec.URL(),
  1053. splunkTokenKey: hec.token,
  1054. splunkVerifyConnectionKey: "false",
  1055. },
  1056. ContainerID: "containeriid",
  1057. ContainerName: "/container_name",
  1058. ContainerImageID: "contaimageid",
  1059. ContainerImageName: "container_image_name",
  1060. }
  1061. loggerDriver, err := New(info)
  1062. if err != nil {
  1063. t.Fatal(err)
  1064. }
  1065. if hec.connectionVerified {
  1066. t.Fatal("Connection should not be verified")
  1067. }
  1068. for i := 0; i < 5; i++ {
  1069. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1070. t.Fatal(err)
  1071. }
  1072. }
  1073. err = loggerDriver.Close()
  1074. if err != nil {
  1075. t.Fatal(err)
  1076. }
  1077. if len(hec.messages) != 0 {
  1078. t.Fatal("No messages should be sent")
  1079. }
  1080. err = hec.Close()
  1081. if err != nil {
  1082. t.Fatal(err)
  1083. }
  1084. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1085. t.Fatal(err)
  1086. }
  1087. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1088. t.Fatal(err)
  1089. }
  1090. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1091. t.Fatal(err)
  1092. }
  1093. }
  1094. // Cannot send messages after we close driver
  1095. func TestCannotSendAfterClose(t *testing.T) {
  1096. hec := NewHTTPEventCollectorMock(t)
  1097. go hec.Serve()
  1098. info := logger.Info{
  1099. Config: map[string]string{
  1100. splunkURLKey: hec.URL(),
  1101. splunkTokenKey: hec.token,
  1102. },
  1103. ContainerID: "containeriid",
  1104. ContainerName: "/container_name",
  1105. ContainerImageID: "contaimageid",
  1106. ContainerImageName: "container_image_name",
  1107. }
  1108. loggerDriver, err := New(info)
  1109. if err != nil {
  1110. t.Fatal(err)
  1111. }
  1112. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1113. t.Fatal(err)
  1114. }
  1115. err = loggerDriver.Close()
  1116. if err != nil {
  1117. t.Fatal(err)
  1118. }
  1119. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1120. t.Fatal("Driver should not allow to send messages after close")
  1121. }
  1122. if len(hec.messages) != 1 {
  1123. t.Fatal("Only one message should be sent")
  1124. }
  1125. message := hec.messages[0]
  1126. if event, err := message.EventAsMap(); err != nil {
  1127. t.Fatal(err)
  1128. } else {
  1129. if event["line"] != "message1" {
  1130. t.Fatalf("Unexpected event in message %v", event)
  1131. }
  1132. }
  1133. err = hec.Close()
  1134. if err != nil {
  1135. t.Fatal(err)
  1136. }
  1137. }
  1138. func TestDeadlockOnBlockedEndpoint(t *testing.T) {
  1139. hec := NewHTTPEventCollectorMock(t)
  1140. go hec.Serve()
  1141. info := logger.Info{
  1142. Config: map[string]string{
  1143. splunkURLKey: hec.URL(),
  1144. splunkTokenKey: hec.token,
  1145. },
  1146. ContainerID: "containeriid",
  1147. ContainerName: "/container_name",
  1148. ContainerImageID: "contaimageid",
  1149. ContainerImageName: "container_image_name",
  1150. }
  1151. l, err := New(info)
  1152. if err != nil {
  1153. t.Fatal(err)
  1154. }
  1155. ctx, unblock := context.WithCancel(context.Background())
  1156. hec.withBlock(ctx)
  1157. defer unblock()
  1158. batchSendTimeout = 1 * time.Second
  1159. if err := l.Log(&logger.Message{}); err != nil {
  1160. t.Fatal(err)
  1161. }
  1162. done := make(chan struct{})
  1163. go func() {
  1164. l.Close()
  1165. close(done)
  1166. }()
  1167. select {
  1168. case <-time.After(60 * time.Second):
  1169. buf := make([]byte, 1e6)
  1170. buf = buf[:runtime.Stack(buf, true)]
  1171. t.Logf("STACK DUMP: \n\n%s\n\n", string(buf))
  1172. t.Fatal("timeout waiting for close to finish")
  1173. case <-done:
  1174. }
  1175. }