kafkaAcquisition: add more debug (#2609)
* kafkaAcquisition: add more debug
This commit is contained in:
parent
d7ef51e6ba
commit
947b247a40
1 changed files with 17 additions and 6 deletions
|
@ -79,12 +79,16 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
k.Config.Mode = configuration.TAIL_MODE
|
k.Config.Mode = configuration.TAIL_MODE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
k.logger.Debugf("successfully unmarshaled kafka configuration : %+v", k.Config)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
||||||
k.logger = logger
|
k.logger = logger
|
||||||
|
|
||||||
|
k.logger.Debugf("start configuring %s source", dataSourceName)
|
||||||
|
|
||||||
err := k.UnmarshalConfig(yamlConfig)
|
err := k.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -95,7 +99,7 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
||||||
return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
|
return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
k.Reader, err = k.Config.NewReader(dialer)
|
k.Reader, err = k.Config.NewReader(dialer, k.logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannote create %s reader: %w", dataSourceName, err)
|
return fmt.Errorf("cannote create %s reader: %w", dataSourceName, err)
|
||||||
}
|
}
|
||||||
|
@ -104,6 +108,8 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
||||||
return fmt.Errorf("cannot create %s reader", dataSourceName)
|
return fmt.Errorf("cannot create %s reader", dataSourceName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
k.logger.Debugf("successfully configured %s source", dataSourceName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,6 +149,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
|
||||||
// Start processing from latest Offset
|
// Start processing from latest Offset
|
||||||
k.Reader.SetOffsetAt(context.Background(), time.Now())
|
k.Reader.SetOffsetAt(context.Background(), time.Now())
|
||||||
for {
|
for {
|
||||||
|
k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)
|
||||||
m, err := k.Reader.ReadMessage(context.Background())
|
m, err := k.Reader.ReadMessage(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
@ -160,6 +167,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
|
||||||
Process: true,
|
Process: true,
|
||||||
Module: k.GetName(),
|
Module: k.GetName(),
|
||||||
}
|
}
|
||||||
|
k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
|
||||||
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
|
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
|
||||||
var evt types.Event
|
var evt types.Event
|
||||||
|
|
||||||
|
@ -173,6 +181,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
|
func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
|
||||||
|
k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
return k.ReadMessage(out)
|
return k.ReadMessage(out)
|
||||||
})
|
})
|
||||||
|
@ -190,7 +199,7 @@ func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||||
k.logger.Infof("start reader on topic '%s'", k.Config.Topic)
|
k.logger.Infof("start reader on brokers '%+v' with topic '%s'", k.Config.Brokers, k.Config.Topic)
|
||||||
|
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
defer trace.CatchPanic("crowdsec/acquis/kafka/live")
|
defer trace.CatchPanic("crowdsec/acquis/kafka/live")
|
||||||
|
@ -254,11 +263,13 @@ func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
|
||||||
return dialer, nil
|
return dialer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) {
|
func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) {
|
||||||
rConf := kafka.ReaderConfig{
|
rConf := kafka.ReaderConfig{
|
||||||
Brokers: kc.Brokers,
|
Brokers: kc.Brokers,
|
||||||
Topic: kc.Topic,
|
Topic: kc.Topic,
|
||||||
Dialer: dialer,
|
Dialer: dialer,
|
||||||
|
Logger: kafka.LoggerFunc(logger.Debugf),
|
||||||
|
ErrorLogger: kafka.LoggerFunc(logger.Errorf),
|
||||||
}
|
}
|
||||||
if kc.GroupID != "" {
|
if kc.GroupID != "" {
|
||||||
rConf.GroupID = kc.GroupID
|
rConf.GroupID = kc.GroupID
|
||||||
|
|
Loading…
Reference in a new issue