123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package kafkaacquisition
- import (
- "context"
- "net"
- "runtime"
- "strconv"
- "testing"
- "time"
- "github.com/segmentio/kafka-go"
- log "github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gopkg.in/tomb.v2"
- "github.com/crowdsecurity/go-cs-lib/cstest"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- )
- func TestConfigure(t *testing.T) {
- tests := []struct {
- config string
- expectedErr string
- }{
- {
- config: `
- foobar: bla
- source: kafka`,
- expectedErr: "line 2: field foobar not found in type kafkaacquisition.KafkaConfiguration",
- },
- {
- config: `source: kafka`,
- expectedErr: "cannot create a kafka reader with an empty list of broker addresses",
- },
- {
- config: `
- source: kafka
- brokers:
- - bla
- timeout: 5`,
- expectedErr: "cannot create a kafka reader with am empty topic",
- },
- {
- config: `
- source: kafka
- brokers:
- - bla
- topic: toto
- timeout: aa`,
- expectedErr: "cannot create kafka dialer: strconv.Atoi: parsing \"aa\": invalid syntax",
- },
- {
- config: `
- source: kafka
- brokers:
- - localhost:9092
- topic: crowdsec`,
- expectedErr: "",
- },
- {
- config: `
- source: kafka
- brokers:
- - localhost:9092
- topic: crowdsec
- partition: 1
- group_id: crowdsec`,
- expectedErr: "cannote create kafka reader: cannot specify both group_id and partition",
- },
- }
- subLogger := log.WithFields(log.Fields{
- "type": "kafka",
- })
- for _, test := range tests {
- k := KafkaSource{}
- err := k.Configure([]byte(test.config), subLogger)
- cstest.AssertErrorContains(t, err, test.expectedErr)
- }
- }
- func writeToKafka(w *kafka.Writer, logs []string) {
- for idx, log := range logs {
- err := w.WriteMessages(context.Background(), kafka.Message{
- Key: []byte(strconv.Itoa(idx)),
- // create an arbitrary message payload for the value
- Value: []byte(log),
- })
- if err != nil {
- panic("could not write message " + err.Error())
- }
- }
- }
- func createTopic(topic string, broker string) {
- conn, err := kafka.Dial("tcp", broker)
- if err != nil {
- panic(err)
- }
- defer conn.Close()
- controller, err := conn.Controller()
- if err != nil {
- panic(err)
- }
- var controllerConn *kafka.Conn
- controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
- if err != nil {
- panic(err)
- }
- defer controllerConn.Close()
- topicConfigs := []kafka.TopicConfig{
- {
- Topic: topic,
- NumPartitions: 1,
- ReplicationFactor: 1,
- },
- }
- err = controllerConn.CreateTopics(topicConfigs...)
- if err != nil {
- panic(err)
- }
- }
- func TestStreamingAcquisition(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("Skipping test on windows")
- }
- tests := []struct {
- name string
- logs []string
- expectedLines int
- expectedErr string
- }{
- {
- name: "valid msgs",
- logs: []string{
- "message 1",
- "message 2",
- "message 3",
- },
- expectedLines: 3,
- },
- }
- subLogger := log.WithFields(log.Fields{
- "type": "kafka",
- })
- createTopic("crowdsecplaintext", "localhost:9092")
- w := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{"localhost:9092"},
- Topic: "crowdsecplaintext",
- })
- if w == nil {
- log.Fatalf("Unable to setup a kafka producer")
- }
- for _, ts := range tests {
- ts := ts
- t.Run(ts.name, func(t *testing.T) {
- k := KafkaSource{}
- err := k.Configure([]byte(`
- source: kafka
- brokers:
- - localhost:9092
- topic: crowdsecplaintext`), subLogger)
- if err != nil {
- t.Fatalf("could not configure kafka source : %s", err)
- }
- tomb := tomb.Tomb{}
- out := make(chan types.Event)
- err = k.StreamingAcquisition(out, &tomb)
- cstest.AssertErrorContains(t, err, ts.expectedErr)
- actualLines := 0
- go writeToKafka(w, ts.logs)
- READLOOP:
- for {
- select {
- case <-out:
- actualLines++
- case <-time.After(2 * time.Second):
- break READLOOP
- }
- }
- require.Equal(t, ts.expectedLines, actualLines)
- tomb.Kill(nil)
- tomb.Wait()
- })
- }
- }
- func TestStreamingAcquisitionWithSSL(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("Skipping test on windows")
- }
- tests := []struct {
- name string
- logs []string
- expectedLines int
- expectedErr string
- }{
- {
- name: "valid msgs",
- logs: []string{
- "message 1",
- "message 2",
- },
- expectedLines: 2,
- },
- }
- subLogger := log.WithFields(log.Fields{
- "type": "kafka",
- })
- createTopic("crowdsecssl", "localhost:9092")
- w2 := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{"localhost:9092"},
- Topic: "crowdsecssl",
- })
- if w2 == nil {
- log.Fatalf("Unable to setup a kafka producer")
- }
- for _, ts := range tests {
- ts := ts
- t.Run(ts.name, func(t *testing.T) {
- k := KafkaSource{}
- err := k.Configure([]byte(`
- source: kafka
- brokers:
- - localhost:9093
- topic: crowdsecssl
- tls:
- insecure_skip_verify: true
- client_cert: ./testdata/kafkaClient.certificate.pem
- client_key: ./testdata/kafkaClient.key
- ca_cert: ./testdata/snakeoil-ca-1.crt
- `), subLogger)
- if err != nil {
- t.Fatalf("could not configure kafka source : %s", err)
- }
- tomb := tomb.Tomb{}
- out := make(chan types.Event)
- err = k.StreamingAcquisition(out, &tomb)
- cstest.AssertErrorContains(t, err, ts.expectedErr)
- actualLines := 0
- go writeToKafka(w2, ts.logs)
- READLOOP:
- for {
- select {
- case <-out:
- actualLines++
- case <-time.After(2 * time.Second):
- break READLOOP
- }
- }
- require.Equal(t, ts.expectedLines, actualLines)
- tomb.Kill(nil)
- tomb.Wait()
- })
- }
- }
|