|
@@ -0,0 +1,242 @@
|
|
|
+package kafkaacquisition
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "crypto/tls"
|
|
|
+ "crypto/x509"
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "io/ioutil"
|
|
|
+ "strconv"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
|
|
+ "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
|
|
|
+ "github.com/crowdsecurity/crowdsec/pkg/types"
|
|
|
+ "github.com/pkg/errors"
|
|
|
+ "github.com/prometheus/client_golang/prometheus"
|
|
|
+ "github.com/segmentio/kafka-go"
|
|
|
+ log "github.com/sirupsen/logrus"
|
|
|
+ "gopkg.in/tomb.v2"
|
|
|
+ "gopkg.in/yaml.v2"
|
|
|
+)
|
|
|
+
|
|
|
var (
	// dataSourceName identifies this acquisition module in log lines and
	// error messages.
	dataSourceName = "kafka"
)
|
|
|
+
|
|
|
// linesRead counts, per topic, the messages successfully read from kafka.
// It is registered with prometheus via GetMetrics/GetAggregMetrics.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kafkasource_hits_total",
		Help: "Total lines that were read from topic",
	},
	[]string{"topic"})
|
|
|
+
|
|
|
// KafkaConfiguration is the user-supplied YAML configuration of the kafka
// datasource.
type KafkaConfiguration struct {
	Brokers []string   `yaml:"brokers"`  // broker addresses to bootstrap from; at least one is required
	Topic   string     `yaml:"topic"`    // topic to consume; required
	GroupID string     `yaml:"group_id"` // optional consumer-group id; empty means no group
	Timeout string     `yaml:"timeout"`  // dialer timeout, an integer number of seconds in string form
	TLS     *TLSConfig `yaml:"tls"`      // optional TLS settings; nil disables TLS
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
|
|
|
+
|
|
|
// TLSConfig holds the TLS material used when dialing the brokers:
// a client certificate/key pair and a CA bundle to verify the server.
type TLSConfig struct {
	InsecureSkipVerify bool   `yaml:"insecure_skip_verify"` // skip server certificate verification
	ClientCert         string `yaml:"client_cert"`          // path to PEM client certificate
	ClientKey          string `yaml:"client_key"`           // path to PEM client private key
	CaCert             string `yaml:"ca_cert"`              // path to PEM CA bundle
}
|
|
|
+
|
|
|
// KafkaSource reads log lines from a kafka topic and turns them into
// crowdsec events.
type KafkaSource struct {
	Config KafkaConfiguration // parsed datasource configuration
	logger *log.Entry         // contextual logger supplied by Configure
	Reader *kafka.Reader      // underlying kafka-go consumer, built in Configure
}
|
|
|
+
|
|
|
+func (k *KafkaSource) Configure(Config []byte, logger *log.Entry) error {
|
|
|
+ var err error
|
|
|
+
|
|
|
+ k.Config = KafkaConfiguration{}
|
|
|
+ k.logger = logger
|
|
|
+ err = yaml.UnmarshalStrict(Config, &k.Config)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrapf(err, "cannot parse %s datasource configuration", dataSourceName)
|
|
|
+ }
|
|
|
+ if len(k.Config.Brokers) == 0 {
|
|
|
+ return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
|
|
|
+ }
|
|
|
+ if k.Config.Topic == "" {
|
|
|
+ return fmt.Errorf("cannot create a %s reader with am empty topic", dataSourceName)
|
|
|
+ }
|
|
|
+ if k.Config.Mode == "" {
|
|
|
+ k.Config.Mode = configuration.TAIL_MODE
|
|
|
+ }
|
|
|
+ dialer, err := k.Config.NewDialer()
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrapf(err, "cannot create %s dialer", dataSourceName)
|
|
|
+ }
|
|
|
+ k.Reader, err = k.Config.NewReader(dialer)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrapf(err, "cannote create %s reader", dataSourceName)
|
|
|
+ }
|
|
|
+ if k.Reader == nil {
|
|
|
+ return fmt.Errorf("cannot create %s reader", dataSourceName)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// ConfigureByDSN always fails: the kafka datasource cannot be configured
// from a command-line DSN.
func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry) error {
	return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
}
|
|
|
+
|
|
|
// GetMode returns the acquisition mode configured for this source
// (defaults to tail mode, see Configure).
func (k *KafkaSource) GetMode() string {
	return k.Config.Mode
}
|
|
|
+
|
|
|
// GetName returns the datasource name ("kafka").
func (k *KafkaSource) GetName() string {
	return dataSourceName
}
|
|
|
+
|
|
|
// OneShotAcquisition always fails: kafka is a streaming-only datasource.
func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
	return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}
|
|
|
+
|
|
|
// CanRun reports whether the datasource can run on this platform;
// kafka acquisition has no platform restrictions.
func (k *KafkaSource) CanRun() error {
	return nil
}
|
|
|
+
|
|
|
// GetMetrics returns the prometheus collectors exposed by this datasource.
func (k *KafkaSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
|
|
|
+
|
|
|
// GetAggregMetrics returns the aggregated prometheus collectors; identical
// to GetMetrics for this datasource.
func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
|
|
|
+
|
|
|
// Dump returns the datasource itself for debugging/introspection purposes.
func (k *KafkaSource) Dump() interface{} {
	return k
}
|
|
|
+
|
|
|
+func (k *KafkaSource) ReadMessage(out chan types.Event) error {
|
|
|
+ // Start processing from latest Offset
|
|
|
+ k.Reader.SetOffsetAt(context.Background(), time.Now())
|
|
|
+ for {
|
|
|
+ m, err := k.Reader.ReadMessage(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ k.logger.Errorln(errors.Wrapf(err, "while reading %s message", dataSourceName))
|
|
|
+ }
|
|
|
+ l := types.Line{
|
|
|
+ Raw: string(m.Value),
|
|
|
+ Labels: k.Config.Labels,
|
|
|
+ Time: m.Time.UTC(),
|
|
|
+ Src: k.Config.Topic,
|
|
|
+ Process: true,
|
|
|
+ Module: k.GetName(),
|
|
|
+ }
|
|
|
+ linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
|
|
|
+ var evt types.Event
|
|
|
+
|
|
|
+ if !k.Config.UseTimeMachine {
|
|
|
+ evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.LIVE}
|
|
|
+ } else {
|
|
|
+ evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.TIMEMACHINE}
|
|
|
+ }
|
|
|
+ out <- evt
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
|
|
|
+ t.Go(func() error {
|
|
|
+ return k.ReadMessage(out)
|
|
|
+ })
|
|
|
+ //nolint //fp
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-t.Dying():
|
|
|
+ k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic)
|
|
|
+ if err := k.Reader.Close(); err != nil {
|
|
|
+ return errors.Wrapf(err, "while closing %s reader on topic '%s'", dataSourceName, k.Config.Topic)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// StreamingAcquisition starts the background reader goroutine for the
// configured topic and returns immediately; lifecycle is driven by t.
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	k.logger.Infof("start reader on topic '%s'", k.Config.Topic)

	t.Go(func() error {
		defer types.CatchPanic("crowdsec/acquis/kafka/live")
		return k.RunReader(out, t)
	})

	return nil
}
|
|
|
+
|
|
|
+func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) {
|
|
|
+ tlsConfig := tls.Config{
|
|
|
+ InsecureSkipVerify: kc.TLS.InsecureSkipVerify,
|
|
|
+ }
|
|
|
+
|
|
|
+ cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey)
|
|
|
+ if err != nil {
|
|
|
+ return &tlsConfig, err
|
|
|
+ }
|
|
|
+ tlsConfig.Certificates = []tls.Certificate{cert}
|
|
|
+
|
|
|
+ caCert, err := ioutil.ReadFile(kc.TLS.CaCert)
|
|
|
+ if err != nil {
|
|
|
+ return &tlsConfig, err
|
|
|
+ }
|
|
|
+ caCertPool := x509.NewCertPool()
|
|
|
+ caCertPool.AppendCertsFromPEM(caCert)
|
|
|
+ tlsConfig.RootCAs = caCertPool
|
|
|
+
|
|
|
+ tlsConfig.BuildNameToCertificate()
|
|
|
+ return &tlsConfig, err
|
|
|
+}
|
|
|
+
|
|
|
+func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
|
|
|
+ dialer := &kafka.Dialer{}
|
|
|
+ var timeoutDuration time.Duration
|
|
|
+ timeoutDuration = time.Duration(10) * time.Second
|
|
|
+ if kc.Timeout != "" {
|
|
|
+ intTimeout, err := strconv.Atoi(kc.Timeout)
|
|
|
+ if err != nil {
|
|
|
+ return dialer, err
|
|
|
+ }
|
|
|
+ timeoutDuration = time.Duration(intTimeout) * time.Second
|
|
|
+ }
|
|
|
+ dialer = &kafka.Dialer{
|
|
|
+ Timeout: timeoutDuration,
|
|
|
+ DualStack: true,
|
|
|
+ }
|
|
|
+
|
|
|
+ if kc.TLS != nil {
|
|
|
+ tlsConfig, err := kc.NewTLSConfig()
|
|
|
+ if err != nil {
|
|
|
+ return dialer, err
|
|
|
+ }
|
|
|
+ dialer.TLS = tlsConfig
|
|
|
+ }
|
|
|
+ return dialer, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) {
|
|
|
+ rConf := kafka.ReaderConfig{
|
|
|
+ Brokers: kc.Brokers,
|
|
|
+ Topic: kc.Topic,
|
|
|
+ Dialer: dialer,
|
|
|
+ }
|
|
|
+ if kc.GroupID != "" {
|
|
|
+ rConf.GroupID = kc.GroupID
|
|
|
+ }
|
|
|
+ if err := rConf.Validate(); err != nil {
|
|
|
+ return &kafka.Reader{}, errors.Wrapf(err, "while validating reader configuration")
|
|
|
+ }
|
|
|
+ return kafka.NewReader(rConf), nil
|
|
|
+}
|