splunk_test.go 34 KB


  1. package splunk // import "github.com/docker/docker/daemon/logger/splunk"
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "fmt"
  6. "net/http"
  7. "runtime"
  8. "strconv"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "gotest.tools/v3/assert"
  13. )
  14. // Validate options
  15. func TestValidateLogOpt(t *testing.T) {
  16. err := ValidateLogOpt(map[string]string{
  17. splunkURLKey: "http://127.0.0.1",
  18. splunkTokenKey: "2160C7EF-2CE9-4307-A180-F852B99CF417",
  19. splunkSourceKey: "mysource",
  20. splunkSourceTypeKey: "mysourcetype",
  21. splunkIndexKey: "myindex",
  22. splunkCAPathKey: "/usr/cert.pem",
  23. splunkCANameKey: "ca_name",
  24. splunkInsecureSkipVerifyKey: "true",
  25. splunkFormatKey: "json",
  26. splunkVerifyConnectionKey: "true",
  27. splunkGzipCompressionKey: "true",
  28. splunkGzipCompressionLevelKey: "1",
  29. envKey: "a",
  30. envRegexKey: "^foo",
  31. labelsKey: "b",
  32. tagKey: "c",
  33. })
  34. if err != nil {
  35. t.Fatal(err)
  36. }
  37. err = ValidateLogOpt(map[string]string{
  38. "not-supported-option": "a",
  39. })
  40. if err == nil {
  41. t.Fatal("Expecting error on unsupported options")
  42. }
  43. }
  44. // Driver require user to specify required options
  45. func TestNewMissedConfig(t *testing.T) {
  46. info := logger.Info{
  47. Config: map[string]string{},
  48. }
  49. _, err := New(info)
  50. if err == nil {
  51. t.Fatal("Logger driver should fail when no required parameters specified")
  52. }
  53. }
  54. // Driver require user to specify splunk-url
  55. func TestNewMissedUrl(t *testing.T) {
  56. info := logger.Info{
  57. Config: map[string]string{
  58. splunkTokenKey: "4642492F-D8BD-47F1-A005-0C08AE4657DF",
  59. },
  60. }
  61. _, err := New(info)
  62. if err.Error() != "splunk: splunk-url is expected" {
  63. t.Fatal("Logger driver should fail when no required parameters specified")
  64. }
  65. }
  66. // Driver require user to specify splunk-token
  67. func TestNewMissedToken(t *testing.T) {
  68. info := logger.Info{
  69. Config: map[string]string{
  70. splunkURLKey: "http://127.0.0.1:8088",
  71. },
  72. }
  73. _, err := New(info)
  74. if err.Error() != "splunk: splunk-token is expected" {
  75. t.Fatal("Logger driver should fail when no required parameters specified")
  76. }
  77. }
  78. func TestNewWithProxy(t *testing.T) {
  79. proxy := "http://proxy.testing:8888"
  80. t.Setenv("HTTP_PROXY", proxy)
  81. // must not be localhost
  82. splunkURL := "http://example.com:12345"
  83. logger, err := New(logger.Info{
  84. Config: map[string]string{
  85. splunkURLKey: splunkURL,
  86. splunkTokenKey: "token",
  87. splunkVerifyConnectionKey: "false",
  88. },
  89. ContainerID: "containeriid",
  90. })
  91. assert.NilError(t, err)
  92. splunkLogger := logger.(*splunkLoggerInline)
  93. proxyFunc := splunkLogger.transport.Proxy
  94. assert.Assert(t, proxyFunc != nil)
  95. req, err := http.NewRequest(http.MethodGet, splunkURL, nil)
  96. assert.NilError(t, err)
  97. proxyURL, err := proxyFunc(req)
  98. assert.NilError(t, err)
  99. assert.Assert(t, proxyURL != nil)
  100. assert.Equal(t, proxy, proxyURL.String())
  101. }
  102. // Test default settings
  103. func TestDefault(t *testing.T) {
  104. hec := NewHTTPEventCollectorMock(t)
  105. go hec.Serve()
  106. info := logger.Info{
  107. Config: map[string]string{
  108. splunkURLKey: hec.URL(),
  109. splunkTokenKey: hec.token,
  110. },
  111. ContainerID: "containeriid",
  112. ContainerName: "container_name",
  113. ContainerImageID: "contaimageid",
  114. ContainerImageName: "container_image_name",
  115. }
  116. hostname, err := info.Hostname()
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. loggerDriver, err := New(info)
  121. if err != nil {
  122. t.Fatal(err)
  123. }
  124. if loggerDriver.Name() != driverName {
  125. t.Fatal("Unexpected logger driver name")
  126. }
  127. if !hec.connectionVerified {
  128. t.Fatal("By default connection should be verified")
  129. }
  130. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  131. if !ok {
  132. t.Fatal("Unexpected Splunk Logging Driver type")
  133. }
  134. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  135. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  136. splunkLoggerDriver.nullMessage.Host != hostname ||
  137. splunkLoggerDriver.nullMessage.Source != "" ||
  138. splunkLoggerDriver.nullMessage.SourceType != "" ||
  139. splunkLoggerDriver.nullMessage.Index != "" ||
  140. splunkLoggerDriver.gzipCompression ||
  141. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  142. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  143. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  144. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  145. t.Fatal("Found not default values setup in Splunk Logging Driver.")
  146. }
  147. message1Time := time.Now()
  148. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  149. t.Fatal(err)
  150. }
  151. message2Time := time.Now()
  152. if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  153. t.Fatal(err)
  154. }
  155. err = loggerDriver.Close()
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. if len(hec.messages) != 2 {
  160. t.Fatal("Expected two messages")
  161. }
  162. if *hec.gzipEnabled {
  163. t.Fatal("Gzip should not be used")
  164. }
  165. message1 := hec.messages[0]
  166. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  167. message1.Host != hostname ||
  168. message1.Source != "" ||
  169. message1.SourceType != "" ||
  170. message1.Index != "" {
  171. t.Fatalf("Unexpected values of message 1 %v", message1)
  172. }
  173. if event, err := message1.EventAsMap(); err != nil {
  174. t.Fatal(err)
  175. } else {
  176. if event["line"] != "{\"a\":\"b\"}" ||
  177. event["source"] != "stdout" ||
  178. event["tag"] != "containeriid" ||
  179. len(event) != 3 {
  180. t.Fatalf("Unexpected event in message %v", event)
  181. }
  182. }
  183. message2 := hec.messages[1]
  184. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  185. message2.Host != hostname ||
  186. message2.Source != "" ||
  187. message2.SourceType != "" ||
  188. message2.Index != "" {
  189. t.Fatalf("Unexpected values of message 1 %v", message2)
  190. }
  191. if event, err := message2.EventAsMap(); err != nil {
  192. t.Fatal(err)
  193. } else {
  194. if event["line"] != "notajson" ||
  195. event["source"] != "stdout" ||
  196. event["tag"] != "containeriid" ||
  197. len(event) != 3 {
  198. t.Fatalf("Unexpected event in message %v", event)
  199. }
  200. }
  201. err = hec.Close()
  202. if err != nil {
  203. t.Fatal(err)
  204. }
  205. }
  206. // Verify inline format with a not default settings for most of options
  207. func TestInlineFormatWithNonDefaultOptions(t *testing.T) {
  208. hec := NewHTTPEventCollectorMock(t)
  209. go hec.Serve()
  210. info := logger.Info{
  211. Config: map[string]string{
  212. splunkURLKey: hec.URL(),
  213. splunkTokenKey: hec.token,
  214. splunkSourceKey: "mysource",
  215. splunkSourceTypeKey: "mysourcetype",
  216. splunkIndexKey: "myindex",
  217. splunkFormatKey: splunkFormatInline,
  218. splunkGzipCompressionKey: "true",
  219. tagKey: "{{.ImageName}}/{{.Name}}",
  220. labelsKey: "a",
  221. envRegexKey: "^foo",
  222. },
  223. ContainerID: "containeriid",
  224. ContainerName: "/container_name",
  225. ContainerImageID: "contaimageid",
  226. ContainerImageName: "container_image_name",
  227. ContainerLabels: map[string]string{
  228. "a": "b",
  229. },
  230. ContainerEnv: []string{"foo_finder=bar"},
  231. }
  232. hostname, err := info.Hostname()
  233. if err != nil {
  234. t.Fatal(err)
  235. }
  236. loggerDriver, err := New(info)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. if !hec.connectionVerified {
  241. t.Fatal("By default connection should be verified")
  242. }
  243. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerInline)
  244. if !ok {
  245. t.Fatal("Unexpected Splunk Logging Driver type")
  246. }
  247. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  248. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  249. splunkLoggerDriver.nullMessage.Host != hostname ||
  250. splunkLoggerDriver.nullMessage.Source != "mysource" ||
  251. splunkLoggerDriver.nullMessage.SourceType != "mysourcetype" ||
  252. splunkLoggerDriver.nullMessage.Index != "myindex" ||
  253. !splunkLoggerDriver.gzipCompression ||
  254. splunkLoggerDriver.gzipCompressionLevel != gzip.DefaultCompression ||
  255. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  256. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  257. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  258. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  259. t.Fatal("Values do not match configuration.")
  260. }
  261. messageTime := time.Now()
  262. if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil {
  263. t.Fatal(err)
  264. }
  265. err = loggerDriver.Close()
  266. if err != nil {
  267. t.Fatal(err)
  268. }
  269. if len(hec.messages) != 1 {
  270. t.Fatal("Expected one message")
  271. }
  272. if !*hec.gzipEnabled {
  273. t.Fatal("Gzip should be used")
  274. }
  275. message := hec.messages[0]
  276. if message.Time != fmt.Sprintf("%f", float64(messageTime.UnixNano())/float64(time.Second)) ||
  277. message.Host != hostname ||
  278. message.Source != "mysource" ||
  279. message.SourceType != "mysourcetype" ||
  280. message.Index != "myindex" {
  281. t.Fatalf("Unexpected values of message %v", message)
  282. }
  283. if event, err := message.EventAsMap(); err != nil {
  284. t.Fatal(err)
  285. } else {
  286. if event["line"] != "1" ||
  287. event["source"] != "stdout" ||
  288. event["tag"] != "container_image_name/container_name" ||
  289. event["attrs"].(map[string]interface{})["a"] != "b" ||
  290. event["attrs"].(map[string]interface{})["foo_finder"] != "bar" ||
  291. len(event) != 4 {
  292. t.Fatalf("Unexpected event in message %v", event)
  293. }
  294. }
  295. err = hec.Close()
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. }
  300. // Verify JSON format
  301. func TestJsonFormat(t *testing.T) {
  302. hec := NewHTTPEventCollectorMock(t)
  303. go hec.Serve()
  304. info := logger.Info{
  305. Config: map[string]string{
  306. splunkURLKey: hec.URL(),
  307. splunkTokenKey: hec.token,
  308. splunkFormatKey: splunkFormatJSON,
  309. splunkGzipCompressionKey: "true",
  310. splunkGzipCompressionLevelKey: "1",
  311. },
  312. ContainerID: "containeriid",
  313. ContainerName: "/container_name",
  314. ContainerImageID: "contaimageid",
  315. ContainerImageName: "container_image_name",
  316. }
  317. hostname, err := info.Hostname()
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. loggerDriver, err := New(info)
  322. if err != nil {
  323. t.Fatal(err)
  324. }
  325. if !hec.connectionVerified {
  326. t.Fatal("By default connection should be verified")
  327. }
  328. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerJSON)
  329. if !ok {
  330. t.Fatal("Unexpected Splunk Logging Driver type")
  331. }
  332. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  333. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  334. splunkLoggerDriver.nullMessage.Host != hostname ||
  335. splunkLoggerDriver.nullMessage.Source != "" ||
  336. splunkLoggerDriver.nullMessage.SourceType != "" ||
  337. splunkLoggerDriver.nullMessage.Index != "" ||
  338. !splunkLoggerDriver.gzipCompression ||
  339. splunkLoggerDriver.gzipCompressionLevel != gzip.BestSpeed ||
  340. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  341. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  342. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  343. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize {
  344. t.Fatal("Values do not match configuration.")
  345. }
  346. message1Time := time.Now()
  347. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  348. t.Fatal(err)
  349. }
  350. message2Time := time.Now()
  351. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  352. t.Fatal(err)
  353. }
  354. err = loggerDriver.Close()
  355. if err != nil {
  356. t.Fatal(err)
  357. }
  358. if len(hec.messages) != 2 {
  359. t.Fatal("Expected two messages")
  360. }
  361. message1 := hec.messages[0]
  362. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  363. message1.Host != hostname ||
  364. message1.Source != "" ||
  365. message1.SourceType != "" ||
  366. message1.Index != "" {
  367. t.Fatalf("Unexpected values of message 1 %v", message1)
  368. }
  369. if event, err := message1.EventAsMap(); err != nil {
  370. t.Fatal(err)
  371. } else {
  372. if event["line"].(map[string]interface{})["a"] != "b" ||
  373. event["source"] != "stdout" ||
  374. event["tag"] != "containeriid" ||
  375. len(event) != 3 {
  376. t.Fatalf("Unexpected event in message 1 %v", event)
  377. }
  378. }
  379. message2 := hec.messages[1]
  380. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  381. message2.Host != hostname ||
  382. message2.Source != "" ||
  383. message2.SourceType != "" ||
  384. message2.Index != "" {
  385. t.Fatalf("Unexpected values of message 2 %v", message2)
  386. }
  387. // If message cannot be parsed as JSON - it should be sent as a line
  388. if event, err := message2.EventAsMap(); err != nil {
  389. t.Fatal(err)
  390. } else {
  391. if event["line"] != "notjson" ||
  392. event["source"] != "stdout" ||
  393. event["tag"] != "containeriid" ||
  394. len(event) != 3 {
  395. t.Fatalf("Unexpected event in message 2 %v", event)
  396. }
  397. }
  398. err = hec.Close()
  399. if err != nil {
  400. t.Fatal(err)
  401. }
  402. }
  403. // Verify raw format
  404. func TestRawFormat(t *testing.T) {
  405. hec := NewHTTPEventCollectorMock(t)
  406. go hec.Serve()
  407. info := logger.Info{
  408. Config: map[string]string{
  409. splunkURLKey: hec.URL(),
  410. splunkTokenKey: hec.token,
  411. splunkFormatKey: splunkFormatRaw,
  412. },
  413. ContainerID: "containeriid",
  414. ContainerName: "/container_name",
  415. ContainerImageID: "contaimageid",
  416. ContainerImageName: "container_image_name",
  417. }
  418. hostname, err := info.Hostname()
  419. assert.NilError(t, err)
  420. loggerDriver, err := New(info)
  421. assert.NilError(t, err)
  422. if !hec.connectionVerified {
  423. t.Fatal("By default connection should be verified")
  424. }
  425. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  426. if !ok {
  427. t.Fatal("Unexpected Splunk Logging Driver type")
  428. }
  429. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  430. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  431. splunkLoggerDriver.nullMessage.Host != hostname ||
  432. splunkLoggerDriver.nullMessage.Source != "" ||
  433. splunkLoggerDriver.nullMessage.SourceType != "" ||
  434. splunkLoggerDriver.nullMessage.Index != "" ||
  435. splunkLoggerDriver.gzipCompression ||
  436. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  437. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  438. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  439. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  440. string(splunkLoggerDriver.prefix) != "containeriid " {
  441. t.Fatal("Values do not match configuration.")
  442. }
  443. message1Time := time.Now()
  444. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  445. t.Fatal(err)
  446. }
  447. message2Time := time.Now()
  448. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  449. t.Fatal(err)
  450. }
  451. err = loggerDriver.Close()
  452. if err != nil {
  453. t.Fatal(err)
  454. }
  455. if len(hec.messages) != 2 {
  456. t.Fatal("Expected two messages")
  457. }
  458. message1 := hec.messages[0]
  459. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  460. message1.Host != hostname ||
  461. message1.Source != "" ||
  462. message1.SourceType != "" ||
  463. message1.Index != "" {
  464. t.Fatalf("Unexpected values of message 1 %v", message1)
  465. }
  466. if event, err := message1.EventAsString(); err != nil {
  467. t.Fatal(err)
  468. } else {
  469. if event != "containeriid {\"a\":\"b\"}" {
  470. t.Fatalf("Unexpected event in message 1 %v", event)
  471. }
  472. }
  473. message2 := hec.messages[1]
  474. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  475. message2.Host != hostname ||
  476. message2.Source != "" ||
  477. message2.SourceType != "" ||
  478. message2.Index != "" {
  479. t.Fatalf("Unexpected values of message 2 %v", message2)
  480. }
  481. if event, err := message2.EventAsString(); err != nil {
  482. t.Fatal(err)
  483. } else {
  484. if event != "containeriid notjson" {
  485. t.Fatalf("Unexpected event in message 1 %v", event)
  486. }
  487. }
  488. err = hec.Close()
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. }
  493. // Verify raw format with labels
  494. func TestRawFormatWithLabels(t *testing.T) {
  495. hec := NewHTTPEventCollectorMock(t)
  496. go hec.Serve()
  497. info := logger.Info{
  498. Config: map[string]string{
  499. splunkURLKey: hec.URL(),
  500. splunkTokenKey: hec.token,
  501. splunkFormatKey: splunkFormatRaw,
  502. labelsKey: "a",
  503. },
  504. ContainerID: "containeriid",
  505. ContainerName: "/container_name",
  506. ContainerImageID: "contaimageid",
  507. ContainerImageName: "container_image_name",
  508. ContainerLabels: map[string]string{
  509. "a": "b",
  510. },
  511. }
  512. hostname, err := info.Hostname()
  513. if err != nil {
  514. t.Fatal(err)
  515. }
  516. loggerDriver, err := New(info)
  517. if err != nil {
  518. t.Fatal(err)
  519. }
  520. if !hec.connectionVerified {
  521. t.Fatal("By default connection should be verified")
  522. }
  523. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  524. if !ok {
  525. t.Fatal("Unexpected Splunk Logging Driver type")
  526. }
  527. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  528. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  529. splunkLoggerDriver.nullMessage.Host != hostname ||
  530. splunkLoggerDriver.nullMessage.Source != "" ||
  531. splunkLoggerDriver.nullMessage.SourceType != "" ||
  532. splunkLoggerDriver.nullMessage.Index != "" ||
  533. splunkLoggerDriver.gzipCompression ||
  534. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  535. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  536. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  537. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  538. string(splunkLoggerDriver.prefix) != "containeriid a=b " {
  539. t.Fatal("Values do not match configuration.")
  540. }
  541. message1Time := time.Now()
  542. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  543. t.Fatal(err)
  544. }
  545. message2Time := time.Now()
  546. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  547. t.Fatal(err)
  548. }
  549. err = loggerDriver.Close()
  550. if err != nil {
  551. t.Fatal(err)
  552. }
  553. if len(hec.messages) != 2 {
  554. t.Fatal("Expected two messages")
  555. }
  556. message1 := hec.messages[0]
  557. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  558. message1.Host != hostname ||
  559. message1.Source != "" ||
  560. message1.SourceType != "" ||
  561. message1.Index != "" {
  562. t.Fatalf("Unexpected values of message 1 %v", message1)
  563. }
  564. if event, err := message1.EventAsString(); err != nil {
  565. t.Fatal(err)
  566. } else {
  567. if event != "containeriid a=b {\"a\":\"b\"}" {
  568. t.Fatalf("Unexpected event in message 1 %v", event)
  569. }
  570. }
  571. message2 := hec.messages[1]
  572. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  573. message2.Host != hostname ||
  574. message2.Source != "" ||
  575. message2.SourceType != "" ||
  576. message2.Index != "" {
  577. t.Fatalf("Unexpected values of message 2 %v", message2)
  578. }
  579. if event, err := message2.EventAsString(); err != nil {
  580. t.Fatal(err)
  581. } else {
  582. if event != "containeriid a=b notjson" {
  583. t.Fatalf("Unexpected event in message 2 %v", event)
  584. }
  585. }
  586. err = hec.Close()
  587. if err != nil {
  588. t.Fatal(err)
  589. }
  590. }
  591. // Verify that Splunk Logging Driver can accept tag="" which will allow to send raw messages
  592. // in the same way we get them in stdout/stderr
  593. func TestRawFormatWithoutTag(t *testing.T) {
  594. hec := NewHTTPEventCollectorMock(t)
  595. go hec.Serve()
  596. info := logger.Info{
  597. Config: map[string]string{
  598. splunkURLKey: hec.URL(),
  599. splunkTokenKey: hec.token,
  600. splunkFormatKey: splunkFormatRaw,
  601. tagKey: "",
  602. },
  603. ContainerID: "containeriid",
  604. ContainerName: "/container_name",
  605. ContainerImageID: "contaimageid",
  606. ContainerImageName: "container_image_name",
  607. }
  608. hostname, err := info.Hostname()
  609. if err != nil {
  610. t.Fatal(err)
  611. }
  612. loggerDriver, err := New(info)
  613. if err != nil {
  614. t.Fatal(err)
  615. }
  616. if !hec.connectionVerified {
  617. t.Fatal("By default connection should be verified")
  618. }
  619. splunkLoggerDriver, ok := loggerDriver.(*splunkLoggerRaw)
  620. if !ok {
  621. t.Fatal("Unexpected Splunk Logging Driver type")
  622. }
  623. if splunkLoggerDriver.url != hec.URL()+"/services/collector/event/1.0" ||
  624. splunkLoggerDriver.auth != "Splunk "+hec.token ||
  625. splunkLoggerDriver.nullMessage.Host != hostname ||
  626. splunkLoggerDriver.nullMessage.Source != "" ||
  627. splunkLoggerDriver.nullMessage.SourceType != "" ||
  628. splunkLoggerDriver.nullMessage.Index != "" ||
  629. splunkLoggerDriver.gzipCompression ||
  630. splunkLoggerDriver.postMessagesFrequency != defaultPostMessagesFrequency ||
  631. splunkLoggerDriver.postMessagesBatchSize != defaultPostMessagesBatchSize ||
  632. splunkLoggerDriver.bufferMaximum != defaultBufferMaximum ||
  633. cap(splunkLoggerDriver.stream) != defaultStreamChannelSize ||
  634. string(splunkLoggerDriver.prefix) != "" {
  635. t.Log(string(splunkLoggerDriver.prefix) + "a")
  636. t.Fatal("Values do not match configuration.")
  637. }
  638. message1Time := time.Now()
  639. if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil {
  640. t.Fatal(err)
  641. }
  642. message2Time := time.Now()
  643. if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil {
  644. t.Fatal(err)
  645. }
  646. message3Time := time.Now()
  647. if err := loggerDriver.Log(&logger.Message{Line: []byte(" "), Source: "stdout", Timestamp: message3Time}); err != nil {
  648. t.Fatal(err)
  649. }
  650. err = loggerDriver.Close()
  651. if err != nil {
  652. t.Fatal(err)
  653. }
  654. // message3 would have an empty or whitespace only string in the "event" field
  655. // both of which are not acceptable to HEC
  656. // thus here we must expect 2 messages, not 3
  657. if len(hec.messages) != 2 {
  658. t.Fatal("Expected two messages")
  659. }
  660. message1 := hec.messages[0]
  661. if message1.Time != fmt.Sprintf("%f", float64(message1Time.UnixNano())/float64(time.Second)) ||
  662. message1.Host != hostname ||
  663. message1.Source != "" ||
  664. message1.SourceType != "" ||
  665. message1.Index != "" {
  666. t.Fatalf("Unexpected values of message 1 %v", message1)
  667. }
  668. if event, err := message1.EventAsString(); err != nil {
  669. t.Fatal(err)
  670. } else {
  671. if event != "{\"a\":\"b\"}" {
  672. t.Fatalf("Unexpected event in message 1 %v", event)
  673. }
  674. }
  675. message2 := hec.messages[1]
  676. if message2.Time != fmt.Sprintf("%f", float64(message2Time.UnixNano())/float64(time.Second)) ||
  677. message2.Host != hostname ||
  678. message2.Source != "" ||
  679. message2.SourceType != "" ||
  680. message2.Index != "" {
  681. t.Fatalf("Unexpected values of message 2 %v", message2)
  682. }
  683. if event, err := message2.EventAsString(); err != nil {
  684. t.Fatal(err)
  685. } else {
  686. if event != "notjson" {
  687. t.Fatalf("Unexpected event in message 2 %v", event)
  688. }
  689. }
  690. err = hec.Close()
  691. if err != nil {
  692. t.Fatal(err)
  693. }
  694. }
  695. // Verify that we will send messages in batches with default batching parameters,
  696. // but change frequency to be sure that numOfRequests will match expected 17 requests
  697. func TestBatching(t *testing.T) {
  698. t.Setenv(envVarPostMessagesFrequency, "10h")
  699. hec := NewHTTPEventCollectorMock(t)
  700. go hec.Serve()
  701. info := logger.Info{
  702. Config: map[string]string{
  703. splunkURLKey: hec.URL(),
  704. splunkTokenKey: hec.token,
  705. },
  706. ContainerID: "containeriid",
  707. ContainerName: "/container_name",
  708. ContainerImageID: "contaimageid",
  709. ContainerImageName: "container_image_name",
  710. }
  711. loggerDriver, err := New(info)
  712. if err != nil {
  713. t.Fatal(err)
  714. }
  715. for i := 0; i < defaultStreamChannelSize*4; i++ {
  716. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  717. t.Fatal(err)
  718. }
  719. }
  720. err = loggerDriver.Close()
  721. if err != nil {
  722. t.Fatal(err)
  723. }
  724. if len(hec.messages) != defaultStreamChannelSize*4 {
  725. t.Fatal("Not all messages delivered")
  726. }
  727. for i, message := range hec.messages {
  728. if event, err := message.EventAsMap(); err != nil {
  729. t.Fatal(err)
  730. } else {
  731. if event["line"] != strconv.Itoa(i) {
  732. t.Fatalf("Unexpected event in message %v", event)
  733. }
  734. }
  735. }
  736. // 1 to verify connection and 16 batches
  737. if hec.numOfRequests != 17 {
  738. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  739. }
  740. err = hec.Close()
  741. if err != nil {
  742. t.Fatal(err)
  743. }
  744. }
  745. // Verify that test is using time to fire events not rare than specified frequency
  746. func TestFrequency(t *testing.T) {
  747. t.Setenv(envVarPostMessagesFrequency, "5ms")
  748. hec := NewHTTPEventCollectorMock(t)
  749. go hec.Serve()
  750. info := logger.Info{
  751. Config: map[string]string{
  752. splunkURLKey: hec.URL(),
  753. splunkTokenKey: hec.token,
  754. },
  755. ContainerID: "containeriid",
  756. ContainerName: "/container_name",
  757. ContainerImageID: "contaimageid",
  758. ContainerImageName: "container_image_name",
  759. }
  760. loggerDriver, err := New(info)
  761. if err != nil {
  762. t.Fatal(err)
  763. }
  764. for i := 0; i < 10; i++ {
  765. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  766. t.Fatal(err)
  767. }
  768. time.Sleep(15 * time.Millisecond)
  769. }
  770. err = loggerDriver.Close()
  771. if err != nil {
  772. t.Fatal(err)
  773. }
  774. if len(hec.messages) != 10 {
  775. t.Fatal("Not all messages delivered")
  776. }
  777. for i, message := range hec.messages {
  778. if event, err := message.EventAsMap(); err != nil {
  779. t.Fatal(err)
  780. } else {
  781. if event["line"] != strconv.Itoa(i) {
  782. t.Fatalf("Unexpected event in message %v", event)
  783. }
  784. }
  785. }
  786. // 1 to verify connection and 10 to verify that we have sent messages with required frequency,
  787. // but because frequency is too small (to keep test quick), instead of 11, use 9 if context switches will be slow
  788. expectedRequests := 9
  789. if runtime.GOOS == "windows" {
  790. // sometimes in Windows, this test fails with number of requests showing 8. So be more conservative.
  791. expectedRequests = 7
  792. }
  793. if hec.numOfRequests < expectedRequests {
  794. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  795. }
  796. err = hec.Close()
  797. if err != nil {
  798. t.Fatal(err)
  799. }
  800. }
  801. // Simulate behavior similar to first version of Splunk Logging Driver, when we were sending one message
  802. // per request
  803. func TestOneMessagePerRequest(t *testing.T) {
  804. t.Setenv(envVarPostMessagesFrequency, "10h")
  805. t.Setenv(envVarPostMessagesBatchSize, "1")
  806. t.Setenv(envVarBufferMaximum, "1")
  807. t.Setenv(envVarStreamChannelSize, "0")
  808. hec := NewHTTPEventCollectorMock(t)
  809. go hec.Serve()
  810. info := logger.Info{
  811. Config: map[string]string{
  812. splunkURLKey: hec.URL(),
  813. splunkTokenKey: hec.token,
  814. },
  815. ContainerID: "containeriid",
  816. ContainerName: "/container_name",
  817. ContainerImageID: "contaimageid",
  818. ContainerImageName: "container_image_name",
  819. }
  820. loggerDriver, err := New(info)
  821. if err != nil {
  822. t.Fatal(err)
  823. }
  824. for i := 0; i < 10; i++ {
  825. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  826. t.Fatal(err)
  827. }
  828. }
  829. err = loggerDriver.Close()
  830. if err != nil {
  831. t.Fatal(err)
  832. }
  833. if len(hec.messages) != 10 {
  834. t.Fatal("Not all messages delivered")
  835. }
  836. for i, message := range hec.messages {
  837. if event, err := message.EventAsMap(); err != nil {
  838. t.Fatal(err)
  839. } else {
  840. if event["line"] != strconv.Itoa(i) {
  841. t.Fatalf("Unexpected event in message %v", event)
  842. }
  843. }
  844. }
  845. // 1 to verify connection and 10 messages
  846. if hec.numOfRequests != 11 {
  847. t.Fatalf("Unexpected number of requests %d", hec.numOfRequests)
  848. }
  849. err = hec.Close()
  850. if err != nil {
  851. t.Fatal(err)
  852. }
  853. }
  854. // Driver should not be created when HEC is unresponsive
  855. func TestVerify(t *testing.T) {
  856. hec := NewHTTPEventCollectorMock(t)
  857. hec.simulateServerError = true
  858. go hec.Serve()
  859. info := logger.Info{
  860. Config: map[string]string{
  861. splunkURLKey: hec.URL(),
  862. splunkTokenKey: hec.token,
  863. },
  864. ContainerID: "containeriid",
  865. ContainerName: "/container_name",
  866. ContainerImageID: "contaimageid",
  867. ContainerImageName: "container_image_name",
  868. }
  869. _, err := New(info)
  870. if err == nil {
  871. t.Fatal("Expecting driver to fail, when server is unresponsive")
  872. }
  873. err = hec.Close()
  874. if err != nil {
  875. t.Fatal(err)
  876. }
  877. }
  878. // Verify that user can specify to skip verification that Splunk HEC is working.
  879. // Also in this test we verify retry logic.
  880. func TestSkipVerify(t *testing.T) {
  881. hec := NewHTTPEventCollectorMock(t)
  882. hec.simulateServerError = true
  883. go hec.Serve()
  884. info := logger.Info{
  885. Config: map[string]string{
  886. splunkURLKey: hec.URL(),
  887. splunkTokenKey: hec.token,
  888. splunkVerifyConnectionKey: "false",
  889. },
  890. ContainerID: "containeriid",
  891. ContainerName: "/container_name",
  892. ContainerImageID: "contaimageid",
  893. ContainerImageName: "container_image_name",
  894. }
  895. loggerDriver, err := New(info)
  896. if err != nil {
  897. t.Fatal(err)
  898. }
  899. if hec.connectionVerified {
  900. t.Fatal("Connection should not be verified")
  901. }
  902. for i := 0; i < defaultStreamChannelSize*2; i++ {
  903. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  904. t.Fatal(err)
  905. }
  906. }
  907. if len(hec.messages) != 0 {
  908. t.Fatal("No messages should be accepted at this point")
  909. }
  910. hec.simulateErr(false)
  911. for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ {
  912. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  913. t.Fatal(err)
  914. }
  915. }
  916. err = loggerDriver.Close()
  917. if err != nil {
  918. t.Fatal(err)
  919. }
  920. if len(hec.messages) != defaultStreamChannelSize*4 {
  921. t.Fatal("Not all messages delivered")
  922. }
  923. for i, message := range hec.messages {
  924. if event, err := message.EventAsMap(); err != nil {
  925. t.Fatal(err)
  926. } else {
  927. if event["line"] != strconv.Itoa(i) {
  928. t.Fatalf("Unexpected event in message %v", event)
  929. }
  930. }
  931. }
  932. err = hec.Close()
  933. if err != nil {
  934. t.Fatal(err)
  935. }
  936. }
  937. // Verify logic for when we filled whole buffer
  938. func TestBufferMaximum(t *testing.T) {
  939. t.Setenv(envVarPostMessagesBatchSize, "2")
  940. t.Setenv(envVarBufferMaximum, "10")
  941. t.Setenv(envVarStreamChannelSize, "0")
  942. hec := NewHTTPEventCollectorMock(t)
  943. hec.simulateErr(true)
  944. go hec.Serve()
  945. info := logger.Info{
  946. Config: map[string]string{
  947. splunkURLKey: hec.URL(),
  948. splunkTokenKey: hec.token,
  949. splunkVerifyConnectionKey: "false",
  950. },
  951. ContainerID: "containeriid",
  952. ContainerName: "/container_name",
  953. ContainerImageID: "contaimageid",
  954. ContainerImageName: "container_image_name",
  955. }
  956. loggerDriver, err := New(info)
  957. if err != nil {
  958. t.Fatal(err)
  959. }
  960. if hec.connectionVerified {
  961. t.Fatal("Connection should not be verified")
  962. }
  963. for i := 0; i < 11; i++ {
  964. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  965. t.Fatal(err)
  966. }
  967. }
  968. if len(hec.messages) != 0 {
  969. t.Fatal("No messages should be accepted at this point")
  970. }
  971. hec.simulateServerError = false
  972. err = loggerDriver.Close()
  973. if err != nil {
  974. t.Fatal(err)
  975. }
  976. if len(hec.messages) != 9 {
  977. t.Fatalf("Expected # of messages %d, got %d", 9, len(hec.messages))
  978. }
  979. // First 1000 messages are written to daemon log when buffer was full
  980. for i, message := range hec.messages {
  981. if event, err := message.EventAsMap(); err != nil {
  982. t.Fatal(err)
  983. } else {
  984. if event["line"] != fmt.Sprintf("%d", i+2) {
  985. t.Fatalf("Unexpected event in message %v", event)
  986. }
  987. }
  988. }
  989. err = hec.Close()
  990. if err != nil {
  991. t.Fatal(err)
  992. }
  993. }
  994. // Verify that we are not blocking close when HEC is down for the whole time
  995. func TestServerAlwaysDown(t *testing.T) {
  996. t.Setenv(envVarPostMessagesBatchSize, "2")
  997. t.Setenv(envVarBufferMaximum, "4")
  998. t.Setenv(envVarStreamChannelSize, "0")
  999. hec := NewHTTPEventCollectorMock(t)
  1000. hec.simulateServerError = true
  1001. go hec.Serve()
  1002. info := logger.Info{
  1003. Config: map[string]string{
  1004. splunkURLKey: hec.URL(),
  1005. splunkTokenKey: hec.token,
  1006. splunkVerifyConnectionKey: "false",
  1007. },
  1008. ContainerID: "containeriid",
  1009. ContainerName: "/container_name",
  1010. ContainerImageID: "contaimageid",
  1011. ContainerImageName: "container_image_name",
  1012. }
  1013. loggerDriver, err := New(info)
  1014. if err != nil {
  1015. t.Fatal(err)
  1016. }
  1017. if hec.connectionVerified {
  1018. t.Fatal("Connection should not be verified")
  1019. }
  1020. for i := 0; i < 5; i++ {
  1021. if err := loggerDriver.Log(&logger.Message{Line: []byte(strconv.Itoa(i)), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1022. t.Fatal(err)
  1023. }
  1024. }
  1025. err = loggerDriver.Close()
  1026. if err != nil {
  1027. t.Fatal(err)
  1028. }
  1029. if len(hec.messages) != 0 {
  1030. t.Fatal("No messages should be sent")
  1031. }
  1032. err = hec.Close()
  1033. if err != nil {
  1034. t.Fatal(err)
  1035. }
  1036. }
  1037. // Cannot send messages after we close driver
  1038. func TestCannotSendAfterClose(t *testing.T) {
  1039. hec := NewHTTPEventCollectorMock(t)
  1040. go hec.Serve()
  1041. info := logger.Info{
  1042. Config: map[string]string{
  1043. splunkURLKey: hec.URL(),
  1044. splunkTokenKey: hec.token,
  1045. },
  1046. ContainerID: "containeriid",
  1047. ContainerName: "/container_name",
  1048. ContainerImageID: "contaimageid",
  1049. ContainerImageName: "container_image_name",
  1050. }
  1051. loggerDriver, err := New(info)
  1052. if err != nil {
  1053. t.Fatal(err)
  1054. }
  1055. if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil {
  1056. t.Fatal(err)
  1057. }
  1058. err = loggerDriver.Close()
  1059. if err != nil {
  1060. t.Fatal(err)
  1061. }
  1062. if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil {
  1063. t.Fatal("Driver should not allow to send messages after close")
  1064. }
  1065. if len(hec.messages) != 1 {
  1066. t.Fatal("Only one message should be sent")
  1067. }
  1068. message := hec.messages[0]
  1069. if event, err := message.EventAsMap(); err != nil {
  1070. t.Fatal(err)
  1071. } else {
  1072. if event["line"] != "message1" {
  1073. t.Fatalf("Unexpected event in message %v", event)
  1074. }
  1075. }
  1076. err = hec.Close()
  1077. if err != nil {
  1078. t.Fatal(err)
  1079. }
  1080. }
  1081. func TestDeadlockOnBlockedEndpoint(t *testing.T) {
  1082. hec := NewHTTPEventCollectorMock(t)
  1083. go hec.Serve()
  1084. info := logger.Info{
  1085. Config: map[string]string{
  1086. splunkURLKey: hec.URL(),
  1087. splunkTokenKey: hec.token,
  1088. },
  1089. ContainerID: "containeriid",
  1090. ContainerName: "/container_name",
  1091. ContainerImageID: "contaimageid",
  1092. ContainerImageName: "container_image_name",
  1093. }
  1094. l, err := New(info)
  1095. if err != nil {
  1096. t.Fatal(err)
  1097. }
  1098. ctx, unblock := context.WithCancel(context.Background())
  1099. hec.withBlock(ctx)
  1100. defer unblock()
  1101. batchSendTimeout = 1 * time.Second
  1102. if err := l.Log(&logger.Message{}); err != nil {
  1103. t.Fatal(err)
  1104. }
  1105. done := make(chan struct{})
  1106. go func() {
  1107. l.Close()
  1108. close(done)
  1109. }()
  1110. select {
  1111. case <-time.After(60 * time.Second):
  1112. buf := make([]byte, 1e6)
  1113. buf = buf[:runtime.Stack(buf, true)]
  1114. t.Logf("STACK DUMP: \n\n%s\n\n", string(buf))
  1115. t.Fatal("timeout waiting for close to finish")
  1116. case <-done:
  1117. }
  1118. }