kafka_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package kafkaacquisition
  2. import (
  3. "context"
  4. "net"
  5. "runtime"
  6. "strconv"
  7. "testing"
  8. "time"
  9. "github.com/segmentio/kafka-go"
  10. log "github.com/sirupsen/logrus"
  11. "github.com/stretchr/testify/require"
  12. "gopkg.in/tomb.v2"
  13. "github.com/crowdsecurity/go-cs-lib/cstest"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. )
  16. func TestConfigure(t *testing.T) {
  17. tests := []struct {
  18. config string
  19. expectedErr string
  20. }{
  21. {
  22. config: `
  23. foobar: bla
  24. source: kafka`,
  25. expectedErr: "line 2: field foobar not found in type kafkaacquisition.KafkaConfiguration",
  26. },
  27. {
  28. config: `source: kafka`,
  29. expectedErr: "cannot create a kafka reader with an empty list of broker addresses",
  30. },
  31. {
  32. config: `
  33. source: kafka
  34. brokers:
  35. - bla
  36. timeout: 5`,
  37. expectedErr: "cannot create a kafka reader with am empty topic",
  38. },
  39. {
  40. config: `
  41. source: kafka
  42. brokers:
  43. - bla
  44. topic: toto
  45. timeout: aa`,
  46. expectedErr: "cannot create kafka dialer: strconv.Atoi: parsing \"aa\": invalid syntax",
  47. },
  48. {
  49. config: `
  50. source: kafka
  51. brokers:
  52. - localhost:9092
  53. topic: crowdsec`,
  54. expectedErr: "",
  55. },
  56. {
  57. config: `
  58. source: kafka
  59. brokers:
  60. - localhost:9092
  61. topic: crowdsec
  62. partition: 1
  63. group_id: crowdsec`,
  64. expectedErr: "cannote create kafka reader: cannot specify both group_id and partition",
  65. },
  66. }
  67. subLogger := log.WithFields(log.Fields{
  68. "type": "kafka",
  69. })
  70. for _, test := range tests {
  71. k := KafkaSource{}
  72. err := k.Configure([]byte(test.config), subLogger)
  73. cstest.AssertErrorContains(t, err, test.expectedErr)
  74. }
  75. }
  76. func writeToKafka(w *kafka.Writer, logs []string) {
  77. for idx, log := range logs {
  78. err := w.WriteMessages(context.Background(), kafka.Message{
  79. Key: []byte(strconv.Itoa(idx)),
  80. // create an arbitrary message payload for the value
  81. Value: []byte(log),
  82. })
  83. if err != nil {
  84. panic("could not write message " + err.Error())
  85. }
  86. }
  87. }
  88. func createTopic(topic string, broker string) {
  89. conn, err := kafka.Dial("tcp", broker)
  90. if err != nil {
  91. panic(err)
  92. }
  93. defer conn.Close()
  94. controller, err := conn.Controller()
  95. if err != nil {
  96. panic(err)
  97. }
  98. var controllerConn *kafka.Conn
  99. controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
  100. if err != nil {
  101. panic(err)
  102. }
  103. defer controllerConn.Close()
  104. topicConfigs := []kafka.TopicConfig{
  105. {
  106. Topic: topic,
  107. NumPartitions: 1,
  108. ReplicationFactor: 1,
  109. },
  110. }
  111. err = controllerConn.CreateTopics(topicConfigs...)
  112. if err != nil {
  113. panic(err)
  114. }
  115. }
  116. func TestStreamingAcquisition(t *testing.T) {
  117. if runtime.GOOS == "windows" {
  118. t.Skip("Skipping test on windows")
  119. }
  120. tests := []struct {
  121. name string
  122. logs []string
  123. expectedLines int
  124. expectedErr string
  125. }{
  126. {
  127. name: "valid msgs",
  128. logs: []string{
  129. "message 1",
  130. "message 2",
  131. "message 3",
  132. },
  133. expectedLines: 3,
  134. },
  135. }
  136. subLogger := log.WithFields(log.Fields{
  137. "type": "kafka",
  138. })
  139. createTopic("crowdsecplaintext", "localhost:9092")
  140. w := kafka.NewWriter(kafka.WriterConfig{
  141. Brokers: []string{"localhost:9092"},
  142. Topic: "crowdsecplaintext",
  143. })
  144. if w == nil {
  145. log.Fatalf("Unable to setup a kafka producer")
  146. }
  147. for _, ts := range tests {
  148. ts := ts
  149. t.Run(ts.name, func(t *testing.T) {
  150. k := KafkaSource{}
  151. err := k.Configure([]byte(`
  152. source: kafka
  153. brokers:
  154. - localhost:9092
  155. topic: crowdsecplaintext`), subLogger)
  156. if err != nil {
  157. t.Fatalf("could not configure kafka source : %s", err)
  158. }
  159. tomb := tomb.Tomb{}
  160. out := make(chan types.Event)
  161. err = k.StreamingAcquisition(out, &tomb)
  162. cstest.AssertErrorContains(t, err, ts.expectedErr)
  163. actualLines := 0
  164. go writeToKafka(w, ts.logs)
  165. READLOOP:
  166. for {
  167. select {
  168. case <-out:
  169. actualLines++
  170. case <-time.After(2 * time.Second):
  171. break READLOOP
  172. }
  173. }
  174. require.Equal(t, ts.expectedLines, actualLines)
  175. tomb.Kill(nil)
  176. tomb.Wait()
  177. })
  178. }
  179. }
  180. func TestStreamingAcquisitionWithSSL(t *testing.T) {
  181. if runtime.GOOS == "windows" {
  182. t.Skip("Skipping test on windows")
  183. }
  184. tests := []struct {
  185. name string
  186. logs []string
  187. expectedLines int
  188. expectedErr string
  189. }{
  190. {
  191. name: "valid msgs",
  192. logs: []string{
  193. "message 1",
  194. "message 2",
  195. },
  196. expectedLines: 2,
  197. },
  198. }
  199. subLogger := log.WithFields(log.Fields{
  200. "type": "kafka",
  201. })
  202. createTopic("crowdsecssl", "localhost:9092")
  203. w2 := kafka.NewWriter(kafka.WriterConfig{
  204. Brokers: []string{"localhost:9092"},
  205. Topic: "crowdsecssl",
  206. })
  207. if w2 == nil {
  208. log.Fatalf("Unable to setup a kafka producer")
  209. }
  210. for _, ts := range tests {
  211. ts := ts
  212. t.Run(ts.name, func(t *testing.T) {
  213. k := KafkaSource{}
  214. err := k.Configure([]byte(`
  215. source: kafka
  216. brokers:
  217. - localhost:9093
  218. topic: crowdsecssl
  219. tls:
  220. insecure_skip_verify: true
  221. client_cert: ./testdata/kafkaClient.certificate.pem
  222. client_key: ./testdata/kafkaClient.key
  223. ca_cert: ./testdata/snakeoil-ca-1.crt
  224. `), subLogger)
  225. if err != nil {
  226. t.Fatalf("could not configure kafka source : %s", err)
  227. }
  228. tomb := tomb.Tomb{}
  229. out := make(chan types.Event)
  230. err = k.StreamingAcquisition(out, &tomb)
  231. cstest.AssertErrorContains(t, err, ts.expectedErr)
  232. actualLines := 0
  233. go writeToKafka(w2, ts.logs)
  234. READLOOP:
  235. for {
  236. select {
  237. case <-out:
  238. actualLines++
  239. case <-time.After(2 * time.Second):
  240. break READLOOP
  241. }
  242. }
  243. require.Equal(t, ts.expectedLines, actualLines)
  244. tomb.Kill(nil)
  245. tomb.Wait()
  246. })
  247. }
  248. }