package kafkaacquisition

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/segmentio/kafka-go"
	log "github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"
	"gopkg.in/yaml.v2"

	"github.com/crowdsecurity/go-cs-lib/pkg/trace"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)

var (
	dataSourceName = "kafka"
)

var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kafkasource_hits_total",
		Help: "Total lines that were read from topic",
	},
	[]string{"topic"})

type KafkaConfiguration struct {
	Brokers []string   `yaml:"brokers"`
	Topic   string     `yaml:"topic"`
	GroupID string     `yaml:"group_id"`
	Timeout string     `yaml:"timeout"`
	TLS     *TLSConfig `yaml:"tls"`

	configuration.DataSourceCommonCfg `yaml:",inline"`
}

type TLSConfig struct {
	InsecureSkipVerify bool   `yaml:"insecure_skip_verify"`
	ClientCert         string `yaml:"client_cert"`
	ClientKey          string `yaml:"client_key"`
	CaCert             string `yaml:"ca_cert"`
}

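// KafkaSource implements the crowdsec data source interface on top of a
// kafka-go Reader: Configure parses the YAML configuration and builds the
// reader, and StreamingAcquisition streams messages into the pipeline.
//
// A minimal usage sketch, assuming a parsed YAML blob and a caller-owned tomb
// (variable names are illustrative only):
//
//	k := &KafkaSource{}
//	if err := k.Configure(yamlBytes, log.WithField("type", dataSourceName)); err != nil {
//		// handle configuration error
//	}
//	out := make(chan types.Event)
//	t := &tomb.Tomb{}
//	if err := k.StreamingAcquisition(out, t); err != nil {
//		// handle startup error
//	}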
type KafkaSource struct {
	Config KafkaConfiguration
	logger *log.Entry
	Reader *kafka.Reader
}

func (k *KafkaSource) GetUuid() string {
	return k.Config.UniqueId
}

func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
	k.Config = KafkaConfiguration{}

	err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
	if err != nil {
		return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
	}

	if len(k.Config.Brokers) == 0 {
		return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
	}

	if k.Config.Topic == "" {
		return fmt.Errorf("cannot create a %s reader with an empty topic", dataSourceName)
	}

	if k.Config.Mode == "" {
		k.Config.Mode = configuration.TAIL_MODE
	}

	return nil
}

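// Configure parses the YAML configuration, then builds the kafka dialer and
// the reader; it returns an error if the configuration is invalid or the
// reader cannot be created.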
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
	k.logger = logger

	err := k.UnmarshalConfig(yamlConfig)
	if err != nil {
		return err
	}

	dialer, err := k.Config.NewDialer()
	if err != nil {
		return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
	}

	k.Reader, err = k.Config.NewReader(dialer)
	if err != nil {
		return fmt.Errorf("cannot create %s reader: %w", dataSourceName, err)
	}

	if k.Reader == nil {
		return fmt.Errorf("cannot create %s reader", dataSourceName)
	}

	return nil
}

func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
	return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
}

func (k *KafkaSource) GetMode() string {
	return k.Config.Mode
}

func (k *KafkaSource) GetName() string {
	return dataSourceName
}

func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
	return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}

func (k *KafkaSource) CanRun() error {
	return nil
}

func (k *KafkaSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) Dump() interface{} {
	return k
}

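// ReadMessage consumes messages from the configured topic in a loop, turning
// each kafka message into a types.Event (live or time-machine, depending on
// use_time_machine) pushed on the out channel. io.EOF ends the loop; other
// read errors are logged and the loop keeps going.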
func (k *KafkaSource) ReadMessage(out chan types.Event) error {
	// Start processing from the latest offset
	k.Reader.SetOffsetAt(context.Background(), time.Now())

	for {
		m, err := k.Reader.ReadMessage(context.Background())
		if err != nil {
			if err == io.EOF {
				return nil
			}
			k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err))
			// skip the empty message returned alongside the error
			continue
		}

		l := types.Line{
			Raw:     string(m.Value),
			Labels:  k.Config.Labels,
			Time:    m.Time.UTC(),
			Src:     k.Config.Topic,
			Process: true,
			Module:  k.GetName(),
		}
		linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()

		var evt types.Event
		if !k.Config.UseTimeMachine {
			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
		} else {
			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
		}
		out <- evt
	}
}

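// RunReader starts ReadMessage under the tomb and blocks until the tomb is
// dying, at which point it closes the underlying kafka reader.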
func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
	t.Go(func() error {
		return k.ReadMessage(out)
	})

	<-t.Dying()

	k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic)
	if err := k.Reader.Close(); err != nil {
		return fmt.Errorf("while closing %s reader on topic '%s': %w", dataSourceName, k.Config.Topic, err)
	}

	return nil
}

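// StreamingAcquisition launches RunReader in its own goroutine under the
// caller's tomb and returns immediately.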
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	k.logger.Infof("start reader on topic '%s'", k.Config.Topic)

	t.Go(func() error {
		defer trace.CatchPanic("crowdsec/acquis/kafka/live")
		return k.RunReader(out, t)
	})

	return nil
}

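// NewTLSConfig builds a tls.Config from the datasource TLS settings: it loads
// the client certificate/key pair, appends the configured CA certificate to
// the system pool (or to a fresh pool when the system pool is unavailable),
// and honours insecure_skip_verify.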
func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) {
	tlsConfig := tls.Config{
		InsecureSkipVerify: kc.TLS.InsecureSkipVerify,
	}

	cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey)
	if err != nil {
		return &tlsConfig, err
	}
	tlsConfig.Certificates = []tls.Certificate{cert}

	caCert, err := os.ReadFile(kc.TLS.CaCert)
	if err != nil {
		return &tlsConfig, err
	}

	caCertPool, err := x509.SystemCertPool()
	if err != nil {
		return &tlsConfig, fmt.Errorf("unable to load system CA certificates: %w", err)
	}
	if caCertPool == nil {
		caCertPool = x509.NewCertPool()
	}
	caCertPool.AppendCertsFromPEM(caCert)
	tlsConfig.RootCAs = caCertPool

	// Note: tls.Config.BuildNameToCertificate is deprecated since Go 1.14;
	// certificate selection is handled automatically by crypto/tls.

	return &tlsConfig, nil
}

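// NewDialer builds a kafka.Dialer with the configured timeout (in seconds,
// defaulting to 10) and, when a tls section is present, the TLS configuration
// returned by NewTLSConfig.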
func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
	timeoutDuration := 10 * time.Second
	if kc.Timeout != "" {
		intTimeout, err := strconv.Atoi(kc.Timeout)
		if err != nil {
			return nil, err
		}
		timeoutDuration = time.Duration(intTimeout) * time.Second
	}

	dialer := &kafka.Dialer{
		Timeout:   timeoutDuration,
		DualStack: true,
	}

	if kc.TLS != nil {
		tlsConfig, err := kc.NewTLSConfig()
		if err != nil {
			return dialer, err
		}
		dialer.TLS = tlsConfig
	}

	return dialer, nil
}

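// NewReader validates the reader configuration and returns a kafka.Reader for
// the configured brokers and topic; when group_id is set, the reader joins
// that consumer group.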
func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) {
	rConf := kafka.ReaderConfig{
		Brokers: kc.Brokers,
		Topic:   kc.Topic,
		Dialer:  dialer,
	}
	if kc.GroupID != "" {
		rConf.GroupID = kc.GroupID
	}

	if err := rConf.Validate(); err != nil {
		return nil, fmt.Errorf("while validating reader configuration: %w", err)
	}

	return kafka.NewReader(rConf), nil
}