diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index f825a924c..e089902ca 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -79,12 +79,16 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error { k.Config.Mode = configuration.TAIL_MODE } + k.logger.Debugf("successfully unmarshaled kafka configuration : %+v", k.Config) + return err } func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { k.logger = logger + k.logger.Debugf("start configuring %s source", dataSourceName) + err := k.UnmarshalConfig(yamlConfig) if err != nil { return err @@ -95,7 +99,7 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err) } - k.Reader, err = k.Config.NewReader(dialer) + k.Reader, err = k.Config.NewReader(dialer, k.logger) if err != nil { return fmt.Errorf("cannote create %s reader: %w", dataSourceName, err) } @@ -104,6 +108,8 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { return fmt.Errorf("cannot create %s reader", dataSourceName) } + k.logger.Debugf("successfully configured %s source", dataSourceName) + return nil } @@ -143,6 +149,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error { // Start processing from latest Offset k.Reader.SetOffsetAt(context.Background(), time.Now()) for { + k.logger.Tracef("reading message from topic '%s'", k.Config.Topic) m, err := k.Reader.ReadMessage(context.Background()) if err != nil { if err == io.EOF { @@ -160,6 +167,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error { Process: true, Module: k.GetName(), } + k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l) linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() var evt types.Event @@ -173,6 +181,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error { } func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error { + k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config) t.Go(func() error { return k.ReadMessage(out) }) @@ -190,7 +199,7 @@ func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error { } func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { - k.logger.Infof("start reader on topic '%s'", k.Config.Topic) + k.logger.Infof("start reader on brokers '%+v' with topic '%s'", k.Config.Brokers, k.Config.Topic) t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/kafka/live") @@ -254,11 +263,13 @@ func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) { return dialer, nil } -func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) { +func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) { rConf := kafka.ReaderConfig{ - Brokers: kc.Brokers, - Topic: kc.Topic, - Dialer: dialer, + Brokers: kc.Brokers, + Topic: kc.Topic, + Dialer: dialer, + Logger: kafka.LoggerFunc(logger.Debugf), + ErrorLogger: kafka.LoggerFunc(logger.Errorf), } if kc.GroupID != "" { rConf.GroupID = kc.GroupID