kafka_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package kafkaacquisition
  2. import (
  3. "context"
  4. "net"
  5. "runtime"
  6. "strconv"
  7. "testing"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/cstest"
  10. "github.com/crowdsecurity/crowdsec/pkg/types"
  11. "github.com/segmentio/kafka-go"
  12. log "github.com/sirupsen/logrus"
  13. "gopkg.in/tomb.v2"
  14. "gotest.tools/v3/assert"
  15. )
  16. func TestConfigure(t *testing.T) {
  17. tests := []struct {
  18. config string
  19. expectedErr string
  20. }{
  21. {
  22. config: `
  23. foobar: bla
  24. source: kafka`,
  25. expectedErr: "line 2: field foobar not found in type kafkaacquisition.KafkaConfiguration",
  26. },
  27. {
  28. config: `source: kafka`,
  29. expectedErr: "cannot create a kafka reader with an empty list of broker addresses",
  30. },
  31. {
  32. config: `
  33. source: kafka
  34. brokers:
  35. - bla
  36. timeout: 5`,
  37. expectedErr: "cannot create a kafka reader with am empty topic",
  38. },
  39. {
  40. config: `
  41. source: kafka
  42. brokers:
  43. - bla
  44. topic: toto
  45. timeout: aa`,
  46. expectedErr: "cannot create kafka dialer: strconv.Atoi: parsing \"aa\": invalid syntax",
  47. },
  48. {
  49. config: `
  50. source: kafka
  51. brokers:
  52. - localhost:9092
  53. topic: crowdsec`,
  54. expectedErr: "",
  55. },
  56. }
  57. subLogger := log.WithFields(log.Fields{
  58. "type": "kafka",
  59. })
  60. for _, test := range tests {
  61. k := KafkaSource{}
  62. err := k.Configure([]byte(test.config), subLogger)
  63. cstest.AssertErrorContains(t, err, test.expectedErr)
  64. }
  65. }
  66. func writeToKafka(w *kafka.Writer, logs []string) {
  67. for idx, log := range logs {
  68. err := w.WriteMessages(context.Background(), kafka.Message{
  69. Key: []byte(strconv.Itoa(idx)),
  70. // create an arbitrary message payload for the value
  71. Value: []byte(log),
  72. })
  73. if err != nil {
  74. panic("could not write message " + err.Error())
  75. }
  76. }
  77. }
  78. func createTopic(topic string, broker string) {
  79. conn, err := kafka.Dial("tcp", broker)
  80. if err != nil {
  81. panic(err.Error())
  82. }
  83. defer conn.Close()
  84. controller, err := conn.Controller()
  85. if err != nil {
  86. panic(err.Error())
  87. }
  88. var controllerConn *kafka.Conn
  89. controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
  90. if err != nil {
  91. panic(err.Error())
  92. }
  93. defer controllerConn.Close()
  94. topicConfigs := []kafka.TopicConfig{
  95. {
  96. Topic: topic,
  97. NumPartitions: 1,
  98. ReplicationFactor: 1,
  99. },
  100. }
  101. err = controllerConn.CreateTopics(topicConfigs...)
  102. if err != nil {
  103. panic(err.Error())
  104. }
  105. }
  106. func TestStreamingAcquisition(t *testing.T) {
  107. if runtime.GOOS == "windows" {
  108. t.Skip("Skipping test on windows")
  109. }
  110. tests := []struct {
  111. name string
  112. logs []string
  113. expectedLines int
  114. expectedErr string
  115. }{
  116. {
  117. name: "valid msgs",
  118. logs: []string{
  119. "message 1",
  120. "message 2",
  121. "message 3",
  122. },
  123. expectedLines: 3,
  124. },
  125. }
  126. subLogger := log.WithFields(log.Fields{
  127. "type": "kafka",
  128. })
  129. createTopic("crowdsecplaintext", "localhost:9092")
  130. w := kafka.NewWriter(kafka.WriterConfig{
  131. Brokers: []string{"localhost:9092"},
  132. Topic: "crowdsecplaintext",
  133. })
  134. if w == nil {
  135. log.Fatalf("Unable to setup a kafka producer")
  136. }
  137. for _, ts := range tests {
  138. ts := ts
  139. t.Run(ts.name, func(t *testing.T) {
  140. k := KafkaSource{}
  141. err := k.Configure([]byte(`
  142. source: kafka
  143. brokers:
  144. - localhost:9092
  145. topic: crowdsecplaintext`), subLogger)
  146. if err != nil {
  147. t.Fatalf("could not configure kafka source : %s", err)
  148. }
  149. tomb := tomb.Tomb{}
  150. out := make(chan types.Event)
  151. err = k.StreamingAcquisition(out, &tomb)
  152. cstest.AssertErrorContains(t, err, ts.expectedErr)
  153. actualLines := 0
  154. go writeToKafka(w, ts.logs)
  155. READLOOP:
  156. for {
  157. select {
  158. case <-out:
  159. actualLines++
  160. case <-time.After(2 * time.Second):
  161. break READLOOP
  162. }
  163. }
  164. assert.Equal(t, ts.expectedLines, actualLines)
  165. tomb.Kill(nil)
  166. tomb.Wait()
  167. })
  168. }
  169. }
  170. func TestStreamingAcquisitionWithSSL(t *testing.T) {
  171. if runtime.GOOS == "windows" {
  172. t.Skip("Skipping test on windows")
  173. }
  174. tests := []struct {
  175. name string
  176. logs []string
  177. expectedLines int
  178. expectedErr string
  179. }{
  180. {
  181. name: "valid msgs",
  182. logs: []string{
  183. "message 1",
  184. "message 2",
  185. },
  186. expectedLines: 2,
  187. },
  188. }
  189. subLogger := log.WithFields(log.Fields{
  190. "type": "kafka",
  191. })
  192. createTopic("crowdsecssl", "localhost:9092")
  193. w2 := kafka.NewWriter(kafka.WriterConfig{
  194. Brokers: []string{"localhost:9092"},
  195. Topic: "crowdsecssl",
  196. })
  197. if w2 == nil {
  198. log.Fatalf("Unable to setup a kafka producer")
  199. }
  200. for _, ts := range tests {
  201. ts := ts
  202. t.Run(ts.name, func(t *testing.T) {
  203. k := KafkaSource{}
  204. err := k.Configure([]byte(`
  205. source: kafka
  206. brokers:
  207. - localhost:9093
  208. topic: crowdsecssl
  209. tls:
  210. insecure_skip_verify: true
  211. client_cert: ./testdata/kafkaClient.certificate.pem
  212. client_key: ./testdata/kafkaClient.key
  213. ca_cert: ./testdata/snakeoil-ca-1.crt
  214. `), subLogger)
  215. if err != nil {
  216. t.Fatalf("could not configure kafka source : %s", err)
  217. }
  218. tomb := tomb.Tomb{}
  219. out := make(chan types.Event)
  220. err = k.StreamingAcquisition(out, &tomb)
  221. cstest.AssertErrorContains(t, err, ts.expectedErr)
  222. actualLines := 0
  223. go writeToKafka(w2, ts.logs)
  224. READLOOP:
  225. for {
  226. select {
  227. case <-out:
  228. actualLines++
  229. case <-time.After(2 * time.Second):
  230. break READLOOP
  231. }
  232. }
  233. assert.Equal(t, ts.expectedLines, actualLines)
  234. tomb.Kill(nil)
  235. tomb.Wait()
  236. })
  237. }
  238. }