splunk_test.go 36 KB


  1. package splunk // import "github.com/docker/docker/daemon/logger/splunk"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "runtime"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "gotest.tools/v3/assert"
  13. "gotest.tools/v3/env"
  14. )
  15. // Validate options
  16. func TestValidateLogOpt(t *testing.T) {
  17. err := ValidateLogOpt(map[string]string{
  18. splunkURLKey: "http://127.0.0.1",
  19. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  20. splunkSourceKey: "mysource",
  21. splunkSourceTypeKey: "mysourcetype",
  22. splunkIndexKey: "myindex",
  23. splunkCAPathKey: "/usr/cert.pem",
  24. splunkCANameKey: "ca_name",
  25. splunkInsecureSkipVerifyKey: "true",
  26. splunkFormatKey: "json",
  27. splunkVerifyConnectionKey: "true",
  28. splunkGzipCompressionKey: "true",
  29. splunkGzipCompressionLevelKey: "1",
  30. envKey: "a",
  31. envRegexKey: "^foo",
  32. labelsKey: "b",
  33. tagKey: "c",
  34. })
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. err = ValidateLogOpt(map[string]string{
  39. "not-supported-option": "a",
  40. })
  41. if err == nil {
  42. t.Fatal("Expecting error on unsupported options")
  43. }
  44. }
  45. // Driver require user to specify required options
  46. func TestNewMissedConfig(t *testing.T) {
  47. info := logger.Info{
  48. Config: map[string]string{},
  49. }
  50. _, err := New(info)
  51. if err == nil {
  52. t.Fatal("Logger driver should fail when no required parameters specified")
  53. }
  54. }
  55. // Driver require user to specify splunk-url
  56. func TestNewMissedUrl(t *testing.T) {
  57. info := logger.Info{
  58. Config: map[string]string{
  59. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  60. },
  61. }
  62. _, err := New(info)
  63. if err.Error() != "splunk: splunk-url is expected" {
  64. t.Fatal("Logger driver should fail when no required parameters specified")
  65. }
  66. }
  67. // Driver require user to specify splunk-token
  68. func TestNewMissedToken(t *testing.T) {
  69. info := logger.Info{
  70. Config: map[string]string{
  71. splunkURLKey: "http://127.0.0.1:8088",
  72. },
  73. }
  74. _, err := New(info)
  75. if err.Error() != "splunk: splunk-token is expected" {
  76. t.Fatal("Logger driver should fail when no required parameters specified")
  77. }
  78. }
  79. func TestNewWithProxy(t *testing.T) {
  80. proxy := "http://proxy.testing:8888"
  81. reset := env.Patch(t, "HTTP_PROXY", proxy)
  82. defer reset()
  83. // must not be localhost
  84. splunkURL := "http://example.com:12345"
  85. logger, err := New(logger.Info{
  86. Config: map[string]string{
  87. splunkURLKey: splunkURL,
  88. splunkTokenKey: "token",
  89. splunkVerifyConnectionKey: "false",
  90. },
  91. ContainerID: "containeriid",
  92. })
  93. assert.NilError(t, err)
  94. splunkLogger := logger.(*splunkLoggerInline)
  95. proxyFunc := splunkLogger.transport.Proxy
  96. assert.Assert(t, proxyFunc != nil)
  97. req, err := http.NewRequest(http.MethodGet, splunkURL, nil)
  98. assert.NilError(t, err)
  99. proxyURL, err := proxyFunc(req)
  100. assert.NilError(t, err)
  101. assert.Assert(t, proxyURL != nil)
  102. assert.Equal(t, proxy, proxyURL.String())
  103. }
  104. // Test default settings
  105. func TestDefault(t *testing.T) {
  106. hec := NewHTTPEventCollectorMock(t)
  107. go hec.Serve()
  108. info := logger.Info{
  109. Config: map[string]string{
  110. splunkURLKey: hec.URL(),
  111. splunkTokenKey: hec.token,
  112. },
  113. ContainerID: "containeriid",
  114. ContainerName: "container_name",
  115. ContainerImageID: "contaimageid",
  116. ContainerImageName: "container_image_name",
  117. }
  118. hostname, err := info.Hostname()
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. loggerDriver, err := New(info)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. if loggerDriver.Name() != driverName {
  127. t.Fatal("Unexpected logger driver name")
  128. }
  129. if !hec.connectionVerified {
  130. t.Fatal("By default connection should be verified")
  131. }
  132. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  133. if !ok {
  134. t.Fatal("Unexpected Splunk Logging Driver type")
  135. }
  136. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  137. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  138. splunkLoggerDriver.nullMessage.Host != hostname ||
  139. splunkLoggerDriver.nullMessage.Source != "" ||
  140. splunkLoggerDriver.nullMessage.SourceType != "" ||
  141. splunkLoggerDriver.nullMessage.Index != "" ||
  142. splunkLoggerDriver.gzipCompression ||
  143. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  144. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  145. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  146. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  147. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  148. }
  149. message1Time := time.Now()
  150. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  151. t.Fatal(err)
  152. }
  153. message2Time := time.Now()
  154. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  155. t.Fatal(err)
  156. }
  157. err = loggerDriver.Close()
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. if len(hec.messages) != 2 {
  162. t.Fatal("Expected two messages")
  163. }
  164. if *hec.gzipEnabled {
  165. t.Fatal("Gzip should not be used")
  166. }
  167. message1 := hec.messages[0]
  168. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  169. message1.Host != hostname ||
  170. message1.Source != "" ||
  171. message1.SourceType != "" ||
  172. message1.Index != "" {
  173. t.Fatalf("Unexpected values of message 1 %v", message1)
  174. }
  175. if event, err := message1.EventAsMap(); err != nil {
  176. t.Fatal(err)
  177. } else {
  178. if event["line"] != "{\"a\":\"b\"}" ||
  179. event["source"] != "stdout" ||
  180. event["tag"] != "containeriid" ||
  181. len(event) != 3 {
  182. t.Fatalf("Unexpected event in message %v", event)
  183. }
  184. }
  185. message2 := hec.messages[1]
  186. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  187. message2.Host != hostname ||
  188. message2.Source != "" ||
  189. message2.SourceType != "" ||
  190. message2.Index != "" {
  191. t.Fatalf("Unexpected values of message 1 %v", message2)
  192. }
  193. if event, err := message2.EventAsMap(); err != nil {
  194. t.Fatal(err)
  195. } else {
  196. if event["line"] != "notajson" ||
  197. event["source"] != "stdout" ||
  198. event["tag"] != "containeriid" ||
  199. len(event) != 3 {
  200. t.Fatalf("Unexpected event in message %v", event)
  201. }
  202. }
  203. err = hec.Close()
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. }
  208. // Verify inline format with a not default settings for most of options
  209. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  210. hec := NewHTTPEventCollectorMock(t)
  211. go hec.Serve()
  212. info := logger.Info{
  213. Config: map[string]string{
  214. splunkURLKey: hec.URL(),
  215. splunkTokenKey: hec.token,
  216. splunkSourceKey: "mysource",
  217. splunkSourceTypeKey: "mysourcetype",
  218. splunkIndexKey: "myindex",
  219. splunkFormatKey: splunkFormatInline,
  220. splunkGzipCompressionKey: "true",
  221. tagKey: "{{.ImageName}}/{{.Name}}",
  222. labelsKey: "a",
  223. envRegexKey: "^foo",
  224. },
  225. ContainerID: "containeriid",
  226. ContainerName: "/container_name",
  227. ContainerImageID: "contaimageid",
  228. ContainerImageName: "container_image_name",
  229. ContainerLabels: map[string]string{
  230. "a": "b",
  231. },
  232. ContainerEnv: []string{"foo_finder=bar"},
  233. }
  234. hostname, err := info.Hostname()
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. loggerDriver, err := New(info)
  239. if err != nil {
  240. t.Fatal(err)
  241. }
  242. if !hec.connectionVerified {
  243. t.Fatal("By default connection should be verified")
  244. }
  245. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  246. if !ok {
  247. t.Fatal("Unexpected Splunk Logging Driver type")
  248. }
  249. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  250. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  251. splunkLoggerDriver.nullMessage.Host != hostname ||
  252. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  253. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  254. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  255. !splunkLoggerDriver.gzipCompression ||
  256. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  257. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  258. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  259. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  260. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  261. t.Fatal("Values do not match configuration.")
  262. }
  263. messageTime := time.Now()
  264. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  265. t.Fatal(err)
  266. }
  267. err = loggerDriver.Close()
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. if len(hec.messages) != 1 {
  272. t.Fatal("Expected one message")
  273. }
  274. if !*hec.gzipEnabled {
  275. t.Fatal("Gzip should be used")
  276. }
  277. message := hec.messages[0]
  278. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  279. message.Host != hostname ||
  280. message.Source != "mysource" ||
  281. message.SourceType != "mysourcetype" ||
  282. message.Index != "myindex" {
  283. t.Fatalf("Unexpected values of message %v", message)
  284. }
  285. if event, err := message.EventAsMap(); err != nil {
  286. t.Fatal(err)
  287. } else {
  288. if event["line"] != "1" ||
  289. event["source"] != "stdout" ||
  290. event["tag"] != "container_image_name/container_name" ||
  291. event["attrs"].(map[string]interface{})["a"] != "b" ||
  292. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  293. len(event) != 4 {
  294. t.Fatalf("Unexpected event in message %v", event)
  295. }
  296. }
  297. err = hec.Close()
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. }
  302. // Verify JSON format
  303. func TestJsonFormat(t *testing.T) {
  304. hec := NewHTTPEventCollectorMock(t)
  305. go hec.Serve()
  306. info := logger.Info{
  307. Config: map[string]string{
  308. splunkURLKey: hec.URL(),
  309. splunkTokenKey: hec.token,
  310. splunkFormatKey: splunkFormatJSON,
  311. splunkGzipCompressionKey: "true",
  312. splunkGzipCompressionLevelKey: "1",
  313. },
  314. ContainerID: "containeriid",
  315. ContainerName: "/container_name",
  316. ContainerImageID: "contaimageid",
  317. ContainerImageName: "container_image_name",
  318. }
  319. hostname, err := info.Hostname()
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. loggerDriver, err := New(info)
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. if !hec.connectionVerified {
  328. t.Fatal("By default connection should be verified")
  329. }
  330. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  331. if !ok {
  332. t.Fatal("Unexpected Splunk Logging Driver type")
  333. }
  334. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  335. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  336. splunkLoggerDriver.nullMessage.Host != hostname ||
  337. splunkLoggerDriver.nullMessage.Source != "" ||
  338. splunkLoggerDriver.nullMessage.SourceType != "" ||
  339. splunkLoggerDriver.nullMessage.Index != "" ||
  340. !splunkLoggerDriver.gzipCompression ||
  341. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  342. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  343. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  344. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  345. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  346. t.Fatal("Values do not match configuration.")
  347. }
  348. message1Time := time.Now()
  349. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  350. t.Fatal(err)
  351. }
  352. message2Time := time.Now()
  353. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  354. t.Fatal(err)
  355. }
  356. err = loggerDriver.Close()
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. if len(hec.messages) != 2 {
  361. t.Fatal("Expected two messages")
  362. }
  363. message1 := hec.messages[0]
  364. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  365. message1.Host != hostname ||
  366. message1.Source != "" ||
  367. message1.SourceType != "" ||
  368. message1.Index != "" {
  369. t.Fatalf("Unexpected values of message 1 %v", message1)
  370. }
  371. if event, err := message1.EventAsMap(); err != nil {
  372. t.Fatal(err)
  373. } else {
  374. if event["line"].(map[string]interface{})["a"] != "b" ||
  375. event["source"] != "stdout" ||
  376. event["tag"] != "containeriid" ||
  377. len(event) != 3 {
  378. t.Fatalf("Unexpected event in message 1 %v", event)
  379. }
  380. }
  381. message2 := hec.messages[1]
  382. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  383. message2.Host != hostname ||
  384. message2.Source != "" ||
  385. message2.SourceType != "" ||
  386. message2.Index != "" {
  387. t.Fatalf("Unexpected values of message 2 %v", message2)
  388. }
  389. // If message cannot be parsed as JSON - it should be sent as a line
  390. if event, err := message2.EventAsMap(); err != nil {
  391. t.Fatal(err)
  392. } else {
  393. if event["line"] != "notjson" ||
  394. event["source"] != "stdout" ||
  395. event["tag"] != "containeriid" ||
  396. len(event) != 3 {
  397. t.Fatalf("Unexpected event in message 2 %v", event)
  398. }
  399. }
  400. err = hec.Close()
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. }
  405. // Verify raw format
  406. func TestRawFormat(t *testing.T) {
  407. hec := NewHTTPEventCollectorMock(t)
  408. go hec.Serve()
  409. info := logger.Info{
  410. Config: map[string]string{
  411. splunkURLKey: hec.URL(),
  412. splunkTokenKey: hec.token,
  413. splunkFormatKey: splunkFormatRaw,
  414. },
  415. ContainerID: "containeriid",
  416. ContainerName: "/container_name",
  417. ContainerImageID: "contaimageid",
  418. ContainerImageName: "container_image_name",
  419. }
  420. hostname, err := info.Hostname()
  421. assert.NilError(t, err)
  422. loggerDriver, err := New(info)
  423. assert.NilError(t, err)
  424. if !hec.connectionVerified {
  425. t.Fatal("By default connection should be verified")
  426. }
  427. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  428. if !ok {
  429. t.Fatal("Unexpected Splunk Logging Driver type")
  430. }
  431. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  432. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  433. splunkLoggerDriver.nullMessage.Host != hostname ||
  434. splunkLoggerDriver.nullMessage.Source != "" ||
  435. splunkLoggerDriver.nullMessage.SourceType != "" ||
  436. splunkLoggerDriver.nullMessage.Index != "" ||
  437. splunkLoggerDriver.gzipCompression ||
  438. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  439. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  440. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  441. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  442. string(splunkLoggerDriver.prefix) != "containeriid " {
  443. t.Fatal("Values do not match configuration.")
  444. }
  445. message1Time := time.Now()
  446. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  447. t.Fatal(err)
  448. }
  449. message2Time := time.Now()
  450. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  451. t.Fatal(err)
  452. }
  453. err = loggerDriver.Close()
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. if len(hec.messages) != 2 {
  458. t.Fatal("Expected two messages")
  459. }
  460. message1 := hec.messages[0]
  461. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  462. message1.Host != hostname ||
  463. message1.Source != "" ||
  464. message1.SourceType != "" ||
  465. message1.Index != "" {
  466. t.Fatalf("Unexpected values of message 1 %v", message1)
  467. }
  468. if event, err := message1.EventAsString(); err != nil {
  469. t.Fatal(err)
  470. } else {
  471. if event != "containeriid {\"a\":\"b\"}" {
  472. t.Fatalf("Unexpected event in message 1 %v", event)
  473. }
  474. }
  475. message2 := hec.messages[1]
  476. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  477. message2.Host != hostname ||
  478. message2.Source != "" ||
  479. message2.SourceType != "" ||
  480. message2.Index != "" {
  481. t.Fatalf("Unexpected values of message 2 %v", message2)
  482. }
  483. if event, err := message2.EventAsString(); err != nil {
  484. t.Fatal(err)
  485. } else {
  486. if event != "containeriid notjson" {
  487. t.Fatalf("Unexpected event in message 1 %v", event)
  488. }
  489. }
  490. err = hec.Close()
  491. if err != nil {
  492. t.Fatal(err)
  493. }
  494. }
  495. // Verify raw format with labels
  496. func TestRawFormatWithLabels(t *testing.T) {
  497. hec := NewHTTPEventCollectorMock(t)
  498. go hec.Serve()
  499. info := logger.Info{
  500. Config: map[string]string{
  501. splunkURLKey: hec.URL(),
  502. splunkTokenKey: hec.token,
  503. splunkFormatKey: splunkFormatRaw,
  504. labelsKey: "a",
  505. },
  506. ContainerID: "containeriid",
  507. ContainerName: "/container_name",
  508. ContainerImageID: "contaimageid",
  509. ContainerImageName: "container_image_name",
  510. ContainerLabels: map[string]string{
  511. "a": "b",
  512. },
  513. }
  514. hostname, err := info.Hostname()
  515. if err != nil {
  516. t.Fatal(err)
  517. }
  518. loggerDriver, err := New(info)
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. if !hec.connectionVerified {
  523. t.Fatal("By default connection should be verified")
  524. }
  525. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  526. if !ok {
  527. t.Fatal("Unexpected Splunk Logging Driver type")
  528. }
  529. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  530. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  531. splunkLoggerDriver.nullMessage.Host != hostname ||
  532. splunkLoggerDriver.nullMessage.Source != "" ||
  533. splunkLoggerDriver.nullMessage.SourceType != "" ||
  534. splunkLoggerDriver.nullMessage.Index != "" ||
  535. splunkLoggerDriver.gzipCompression ||
  536. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  537. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  538. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  539. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  540. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  541. t.Fatal("Values do not match configuration.")
  542. }
  543. message1Time := time.Now()
  544. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  545. t.Fatal(err)
  546. }
  547. message2Time := time.Now()
  548. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  549. t.Fatal(err)
  550. }
  551. err = loggerDriver.Close()
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. if len(hec.messages) != 2 {
  556. t.Fatal("Expected two messages")
  557. }
  558. message1 := hec.messages[0]
  559. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  560. message1.Host != hostname ||
  561. message1.Source != "" ||
  562. message1.SourceType != "" ||
  563. message1.Index != "" {
  564. t.Fatalf("Unexpected values of message 1 %v", message1)
  565. }
  566. if event, err := message1.EventAsString(); err != nil {
  567. t.Fatal(err)
  568. } else {
  569. if event != "containeriid a=b {\"a\":\"b\"}" {
  570. t.Fatalf("Unexpected event in message 1 %v", event)
  571. }
  572. }
  573. message2 := hec.messages[1]
  574. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  575. message2.Host != hostname ||
  576. message2.Source != "" ||
  577. message2.SourceType != "" ||
  578. message2.Index != "" {
  579. t.Fatalf("Unexpected values of message 2 %v", message2)
  580. }
  581. if event, err := message2.EventAsString(); err != nil {
  582. t.Fatal(err)
  583. } else {
  584. if event != "containeriid a=b notjson" {
  585. t.Fatalf("Unexpected event in message 2 %v", event)
  586. }
  587. }
  588. err = hec.Close()
  589. if err != nil {
  590. t.Fatal(err)
  591. }
  592. }
  593. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  594. // in the same way we get them in stdout/stderr
  595. func TestRawFormatWithoutTag(t *testing.T) {
  596. hec := NewHTTPEventCollectorMock(t)
  597. go hec.Serve()
  598. info := logger.Info{
  599. Config: map[string]string{
  600. splunkURLKey: hec.URL(),
  601. splunkTokenKey: hec.token,
  602. splunkFormatKey: splunkFormatRaw,
  603. tagKey: "",
  604. },
  605. ContainerID: "containeriid",
  606. ContainerName: "/container_name",
  607. ContainerImageID: "contaimageid",
  608. ContainerImageName: "container_image_name",
  609. }
  610. hostname, err := info.Hostname()
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. loggerDriver, err := New(info)
  615. if err != nil {
  616. t.Fatal(err)
  617. }
  618. if !hec.connectionVerified {
  619. t.Fatal("By default connection should be verified")
  620. }
  621. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  622. if !ok {
  623. t.Fatal("Unexpected Splunk Logging Driver type")
  624. }
  625. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  626. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  627. splunkLoggerDriver.nullMessage.Host != hostname ||
  628. splunkLoggerDriver.nullMessage.Source != "" ||
  629. splunkLoggerDriver.nullMessage.SourceType != "" ||
  630. splunkLoggerDriver.nullMessage.Index != "" ||
  631. splunkLoggerDriver.gzipCompression ||
  632. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  633. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  634. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  635. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  636. string(splunkLoggerDriver.prefix) != "" {
  637. t.Log(string(splunkLoggerDriver.prefix) + "a")
  638. t.Fatal("Values do not match configuration.")
  639. }
  640. message1Time := time.Now()
  641. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  642. t.Fatal(err)
  643. }
  644. message2Time := time.Now()
  645. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  646. t.Fatal(err)
  647. }
  648. message3Time := time.Now()
  649. if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
  650. t.Fatal(err)
  651. }
  652. err = loggerDriver.Close()
  653. if err != nil {
  654. t.Fatal(err)
  655. }
  656. // message3 would have an empty or whitespace only string in the "event" field
  657. // both of which are not acceptable to HEC
  658. // thus here we must expect 2 messages, not 3
  659. if len(hec.messages) != 2 {
  660. t.Fatal("Expected two messages")
  661. }
  662. message1 := hec.messages[0]
  663. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  664. message1.Host != hostname ||
  665. message1.Source != "" ||
  666. message1.SourceType != "" ||
  667. message1.Index != "" {
  668. t.Fatalf("Unexpected values of message 1 %v", message1)
  669. }
  670. if event, err := message1.EventAsString(); err != nil {
  671. t.Fatal(err)
  672. } else {
  673. if event != "{\"a\":\"b\"}" {
  674. t.Fatalf("Unexpected event in message 1 %v", event)
  675. }
  676. }
  677. message2 := hec.messages[1]
  678. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  679. message2.Host != hostname ||
  680. message2.Source != "" ||
  681. message2.SourceType != "" ||
  682. message2.Index != "" {
  683. t.Fatalf("Unexpected values of message 2 %v", message2)
  684. }
  685. if event, err := message2.EventAsString(); err != nil {
  686. t.Fatal(err)
  687. } else {
  688. if event != "notjson" {
  689. t.Fatalf("Unexpected event in message 2 %v", event)
  690. }
  691. }
  692. err = hec.Close()
  693. if err != nil {
  694. t.Fatal(err)
  695. }
  696. }
  697. // Verify that we will send messages in batches with default batching parameters,
  698. // but change frequency to be sure that numOfRequests will match expected 17 requests
  699. func TestBatching(t *testing.T) {
  700. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  701. t.Fatal(err)
  702. }
  703. hec := NewHTTPEventCollectorMock(t)
  704. go hec.Serve()
  705. info := logger.Info{
  706. Config: map[string]string{
  707. splunkURLKey: hec.URL(),
  708. splunkTokenKey: hec.token,
  709. },
  710. ContainerID: "containeriid",
  711. ContainerName: "/container_name",
  712. ContainerImageID: "contaimageid",
  713. ContainerImageName: "container_image_name",
  714. }
  715. loggerDriver, err := New(info)
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. for i := 0; i < defaultStreamChannelSize*4; i++ {
  720. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  721. t.Fatal(err)
  722. }
  723. }
  724. err = loggerDriver.Close()
  725. if err != nil {
  726. t.Fatal(err)
  727. }
  728. if len(hec.messages) != defaultStreamChannelSize*4 {
  729. t.Fatal("Not all messages delivered")
  730. }
  731. for i, message := range hec.messages {
  732. if event, err := message.EventAsMap(); err != nil {
  733. t.Fatal(err)
  734. } else {
  735. if event["line"] != fmt.Sprintf("%d", i) {
  736. t.Fatalf("Unexpected event in message %v", event)
  737. }
  738. }
  739. }
  740. // 1 to verify connection and 16 batches
  741. if hec.numOfRequests != 17 {
  742. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  743. }
  744. err = hec.Close()
  745. if err != nil {
  746. t.Fatal(err)
  747. }
  748. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  749. t.Fatal(err)
  750. }
  751. }
  752. // Verify that test is using time to fire events not rare than specified frequency
  753. func TestFrequency(t *testing.T) {
  754. if err := os.Setenv(envVarPostMessagesFrequency, "5ms"); err != nil {
  755. t.Fatal(err)
  756. }
  757. hec := NewHTTPEventCollectorMock(t)
  758. go hec.Serve()
  759. info := logger.Info{
  760. Config: map[string]string{
  761. splunkURLKey: hec.URL(),
  762. splunkTokenKey: hec.token,
  763. },
  764. ContainerID: "containeriid",
  765. ContainerName: "/container_name",
  766. ContainerImageID: "contaimageid",
  767. ContainerImageName: "container_image_name",
  768. }
  769. loggerDriver, err := New(info)
  770. if err != nil {
  771. t.Fatal(err)
  772. }
  773. for i := 0; i < 10; i++ {
  774. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  775. t.Fatal(err)
  776. }
  777. time.Sleep(15 * time.Millisecond)
  778. }
  779. err = loggerDriver.Close()
  780. if err != nil {
  781. t.Fatal(err)
  782. }
  783. if len(hec.messages) != 10 {
  784. t.Fatal("Not all messages delivered")
  785. }
  786. for i, message := range hec.messages {
  787. if event, err := message.EventAsMap(); err != nil {
  788. t.Fatal(err)
  789. } else {
  790. if event["line"] != fmt.Sprintf("%d", i) {
  791. t.Fatalf("Unexpected event in message %v", event)
  792. }
  793. }
  794. }
  795. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  796. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  797. expectedRequests := 9
  798. if runtime.GOOS == "windows" {
  799. // sometimes in Windows, this test fails with number of requests showing 8. So be more conservative.
  800. expectedRequests = 7
  801. }
  802. if hec.numOfRequests < expectedRequests {
  803. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  804. }
  805. err = hec.Close()
  806. if err != nil {
  807. t.Fatal(err)
  808. }
  809. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  810. t.Fatal(err)
  811. }
  812. }
  813. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  814. // per request
  815. func TestOneMessagePerRequest(t *testing.T) {
  816. if err := os.Setenv(envVarPostMessagesFrequency, "10h"); err != nil {
  817. t.Fatal(err)
  818. }
  819. if err := os.Setenv(envVarPostMessagesBatchSize, "1"); err != nil {
  820. t.Fatal(err)
  821. }
  822. if err := os.Setenv(envVarBufferMaximum, "1"); err != nil {
  823. t.Fatal(err)
  824. }
  825. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  826. t.Fatal(err)
  827. }
  828. hec := NewHTTPEventCollectorMock(t)
  829. go hec.Serve()
  830. info := logger.Info{
  831. Config: map[string]string{
  832. splunkURLKey: hec.URL(),
  833. splunkTokenKey: hec.token,
  834. },
  835. ContainerID: "containeriid",
  836. ContainerName: "/container_name",
  837. ContainerImageID: "contaimageid",
  838. ContainerImageName: "container_image_name",
  839. }
  840. loggerDriver, err := New(info)
  841. if err != nil {
  842. t.Fatal(err)
  843. }
  844. for i := 0; i < 10; i++ {
  845. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  846. t.Fatal(err)
  847. }
  848. }
  849. err = loggerDriver.Close()
  850. if err != nil {
  851. t.Fatal(err)
  852. }
  853. if len(hec.messages) != 10 {
  854. t.Fatal("Not all messages delivered")
  855. }
  856. for i, message := range hec.messages {
  857. if event, err := message.EventAsMap(); err != nil {
  858. t.Fatal(err)
  859. } else {
  860. if event["line"] != fmt.Sprintf("%d", i) {
  861. t.Fatalf("Unexpected event in message %v", event)
  862. }
  863. }
  864. }
  865. // 1 to verify connection and 10 messages
  866. if hec.numOfRequests != 11 {
  867. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  868. }
  869. err = hec.Close()
  870. if err != nil {
  871. t.Fatal(err)
  872. }
  873. if err := os.Setenv(envVarPostMessagesFrequency, ""); err != nil {
  874. t.Fatal(err)
  875. }
  876. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  877. t.Fatal(err)
  878. }
  879. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  880. t.Fatal(err)
  881. }
  882. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  883. t.Fatal(err)
  884. }
  885. }
  886. // Driver should not be created when HEC is unresponsive
  887. func TestVerify(t *testing.T) {
  888. hec := NewHTTPEventCollectorMock(t)
  889. hec.simulateServerError = true
  890. go hec.Serve()
  891. info := logger.Info{
  892. Config: map[string]string{
  893. splunkURLKey: hec.URL(),
  894. splunkTokenKey: hec.token,
  895. },
  896. ContainerID: "containeriid",
  897. ContainerName: "/container_name",
  898. ContainerImageID: "contaimageid",
  899. ContainerImageName: "container_image_name",
  900. }
  901. _, err := New(info)
  902. if err == nil {
  903. t.Fatal("Expecting driver to fail, when server is unresponsive")
  904. }
  905. err = hec.Close()
  906. if err != nil {
  907. t.Fatal(err)
  908. }
  909. }
  910. // Verify that user can specify to skip verification that Splunk HEC is working.
  911. // Also in this test we verify retry logic.
  912. func TestSkipVerify(t *testing.T) {
  913. hec := NewHTTPEventCollectorMock(t)
  914. hec.simulateServerError = true
  915. go hec.Serve()
  916. info := logger.Info{
  917. Config: map[string]string{
  918. splunkURLKey: hec.URL(),
  919. splunkTokenKey: hec.token,
  920. splunkVerifyConnectionKey: "false",
  921. },
  922. ContainerID: "containeriid",
  923. ContainerName: "/container_name",
  924. ContainerImageID: "contaimageid",
  925. ContainerImageName: "container_image_name",
  926. }
  927. loggerDriver, err := New(info)
  928. if err != nil {
  929. t.Fatal(err)
  930. }
  931. if hec.connectionVerified {
  932. t.Fatal("Connection should not be verified")
  933. }
  934. for i := 0; i < defaultStreamChannelSize*2; i++ {
  935. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  936. t.Fatal(err)
  937. }
  938. }
  939. if len(hec.messages) != 0 {
  940. t.Fatal("No messages should be accepted at this point")
  941. }
  942. hec.simulateErr(false)
  943. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  944. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  945. t.Fatal(err)
  946. }
  947. }
  948. err = loggerDriver.Close()
  949. if err != nil {
  950. t.Fatal(err)
  951. }
  952. if len(hec.messages) != defaultStreamChannelSize*4 {
  953. t.Fatal("Not all messages delivered")
  954. }
  955. for i, message := range hec.messages {
  956. if event, err := message.EventAsMap(); err != nil {
  957. t.Fatal(err)
  958. } else {
  959. if event["line"] != fmt.Sprintf("%d", i) {
  960. t.Fatalf("Unexpected event in message %v", event)
  961. }
  962. }
  963. }
  964. err = hec.Close()
  965. if err != nil {
  966. t.Fatal(err)
  967. }
  968. }
  969. // Verify logic for when we filled whole buffer
  970. func TestBufferMaximum(t *testing.T) {
  971. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  972. t.Fatal(err)
  973. }
  974. if err := os.Setenv(envVarBufferMaximum, "10"); err != nil {
  975. t.Fatal(err)
  976. }
  977. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  978. t.Fatal(err)
  979. }
  980. hec := NewHTTPEventCollectorMock(t)
  981. hec.simulateErr(true)
  982. go hec.Serve()
  983. info := logger.Info{
  984. Config: map[string]string{
  985. splunkURLKey: hec.URL(),
  986. splunkTokenKey: hec.token,
  987. splunkVerifyConnectionKey: "false",
  988. },
  989. ContainerID: "containeriid",
  990. ContainerName: "/container_name",
  991. ContainerImageID: "contaimageid",
  992. ContainerImageName: "container_image_name",
  993. }
  994. loggerDriver, err := New(info)
  995. if err != nil {
  996. t.Fatal(err)
  997. }
  998. if hec.connectionVerified {
  999. t.Fatal("Connection should not be verified")
  1000. }
  1001. for i := 0; i < 11; i++ {
  1002. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1003. t.Fatal(err)
  1004. }
  1005. }
  1006. if len(hec.messages) != 0 {
  1007. t.Fatal("No messages should be accepted at this point")
  1008. }
  1009. hec.simulateServerError = false
  1010. err = loggerDriver.Close()
  1011. if err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. if len(hec.messages) != 9 {
  1015. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  1016. }
  1017. // First 1000 messages are written to daemon log when buffer was full
  1018. for i, message := range hec.messages {
  1019. if event, err := message.EventAsMap(); err != nil {
  1020. t.Fatal(err)
  1021. } else {
  1022. if event["line"] != fmt.Sprintf("%d", i+2) {
  1023. t.Fatalf("Unexpected event in message %v", event)
  1024. }
  1025. }
  1026. }
  1027. err = hec.Close()
  1028. if err != nil {
  1029. t.Fatal(err)
  1030. }
  1031. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1032. t.Fatal(err)
  1033. }
  1034. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1035. t.Fatal(err)
  1036. }
  1037. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1038. t.Fatal(err)
  1039. }
  1040. }
  1041. // Verify that we are not blocking close when HEC is down for the whole time
  1042. func TestServerAlwaysDown(t *testing.T) {
  1043. if err := os.Setenv(envVarPostMessagesBatchSize, "2"); err != nil {
  1044. t.Fatal(err)
  1045. }
  1046. if err := os.Setenv(envVarBufferMaximum, "4"); err != nil {
  1047. t.Fatal(err)
  1048. }
  1049. if err := os.Setenv(envVarStreamChannelSize, "0"); err != nil {
  1050. t.Fatal(err)
  1051. }
  1052. hec := NewHTTPEventCollectorMock(t)
  1053. hec.simulateServerError = true
  1054. go hec.Serve()
  1055. info := logger.Info{
  1056. Config: map[string]string{
  1057. splunkURLKey: hec.URL(),
  1058. splunkTokenKey: hec.token,
  1059. splunkVerifyConnectionKey: "false",
  1060. },
  1061. ContainerID: "containeriid",
  1062. ContainerName: "/container_name",
  1063. ContainerImageID: "contaimageid",
  1064. ContainerImageName: "container_image_name",
  1065. }
  1066. loggerDriver, err := New(info)
  1067. if err != nil {
  1068. t.Fatal(err)
  1069. }
  1070. if hec.connectionVerified {
  1071. t.Fatal("Connection should not be verified")
  1072. }
  1073. for i := 0; i < 5; i++ {
  1074. if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1075. t.Fatal(err)
  1076. }
  1077. }
  1078. err = loggerDriver.Close()
  1079. if err != nil {
  1080. t.Fatal(err)
  1081. }
  1082. if len(hec.messages) != 0 {
  1083. t.Fatal("No messages should be sent")
  1084. }
  1085. err = hec.Close()
  1086. if err != nil {
  1087. t.Fatal(err)
  1088. }
  1089. if err := os.Setenv(envVarPostMessagesBatchSize, ""); err != nil {
  1090. t.Fatal(err)
  1091. }
  1092. if err := os.Setenv(envVarBufferMaximum, ""); err != nil {
  1093. t.Fatal(err)
  1094. }
  1095. if err := os.Setenv(envVarStreamChannelSize, ""); err != nil {
  1096. t.Fatal(err)
  1097. }
  1098. }
  1099. // Cannot send messages after we close driver
  1100. func TestCannotSendAfterClose(t *testing.T) {
  1101. hec := NewHTTPEventCollectorMock(t)
  1102. go hec.Serve()
  1103. info := logger.Info{
  1104. Config: map[string]string{
  1105. splunkURLKey: hec.URL(),
  1106. splunkTokenKey: hec.token,
  1107. },
  1108. ContainerID: "containeriid",
  1109. ContainerName: "/container_name",
  1110. ContainerImageID: "contaimageid",
  1111. ContainerImageName: "container_image_name",
  1112. }
  1113. loggerDriver, err := New(info)
  1114. if err != nil {
  1115. t.Fatal(err)
  1116. }
  1117. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1118. t.Fatal(err)
  1119. }
  1120. err = loggerDriver.Close()
  1121. if err != nil {
  1122. t.Fatal(err)
  1123. }
  1124. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1125. t.Fatal("Driver should not allow to send messages after close")
  1126. }
  1127. if len(hec.messages) != 1 {
  1128. t.Fatal("Only one message should be sent")
  1129. }
  1130. message := hec.messages[0]
  1131. if event, err := message.EventAsMap(); err != nil {
  1132. t.Fatal(err)
  1133. } else {
  1134. if event["line"] != "message1" {
  1135. t.Fatalf("Unexpected event in message %v", event)
  1136. }
  1137. }
  1138. err = hec.Close()
  1139. if err != nil {
  1140. t.Fatal(err)
  1141. }
  1142. }
  1143. func TestDeadlockOnBlockedEndpoint(t *testing.T) {
  1144. hec := NewHTTPEventCollectorMock(t)
  1145. go hec.Serve()
  1146. info := logger.Info{
  1147. Config: map[string]string{
  1148. splunkURLKey: hec.URL(),
  1149. splunkTokenKey: hec.token,
  1150. },
  1151. ContainerID: "containeriid",
  1152. ContainerName: "/container_name",
  1153. ContainerImageID: "contaimageid",
  1154. ContainerImageName: "container_image_name",
  1155. }
  1156. l, err := New(info)
  1157. if err != nil {
  1158. t.Fatal(err)
  1159. }
  1160. ctx, unblock := context.WithCancel(context.Background())
  1161. hec.withBlock(ctx)
  1162. defer unblock()
  1163. batchSendTimeout = 1 * time.Second
  1164. if err := l.Log(&logger.Message{}); err != nil {
  1165. t.Fatal(err)
  1166. }
  1167. done := make(chan struct{})
  1168. go func() {
  1169. l.Close()
  1170. close(done)
  1171. }()
  1172. select {
  1173. case <-time.After(60 * time.Second):
  1174. buf := make([]byte, 1e6)
  1175. buf = buf[:runtime.Stack(buf, true)]
  1176. t.Logf("STACK DUMP: \n\n%s\n\n", string(buf))
  1177. t.Fatal("timeout waiting for close to finish")
  1178. case <-done:
  1179. }
  1180. }