kafka.go

package kafkaacquisition

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
    "io"
    "os"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/segmentio/kafka-go"
    log "github.com/sirupsen/logrus"
    "gopkg.in/tomb.v2"
    "gopkg.in/yaml.v2"

    "github.com/crowdsecurity/go-cs-lib/trace"

    "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
    "github.com/crowdsecurity/crowdsec/pkg/types"
)

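// dataSourceName identifies this datasource in logs and error messages.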
var (
    dataSourceName = "kafka"
)

// linesRead counts the lines read per topic; exposed through GetMetrics
// and GetAggregMetrics.
var linesRead = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "cs_kafkasource_hits_total",
        Help: "Total lines that were read from topic",
    },
    []string{"topic"})

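// KafkaConfiguration is the YAML configuration of the kafka datasource.
// A minimal acquisition sketch (field names come from the struct tags
// below; the broker address, topic and labels are illustrative only):
//
//    source: kafka
//    brokers:
//      - localhost:9092
//    topic: nginx-logs
//    group_id: crowdsec
//    timeout: "10"
//    labels:
//      type: nginx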
type KafkaConfiguration struct {
    Brokers                           []string   `yaml:"brokers"`
    Topic                             string     `yaml:"topic"`
    GroupID                           string     `yaml:"group_id"`
    Partition                         int        `yaml:"partition"`
    Timeout                           string     `yaml:"timeout"`
    TLS                               *TLSConfig `yaml:"tls"`
    configuration.DataSourceCommonCfg `yaml:",inline"`
}

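// TLSConfig holds the optional TLS settings of the kafka datasource: a
// client certificate/key pair and a CA certificate used to verify the
// brokers.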
type TLSConfig struct {
    InsecureSkipVerify bool   `yaml:"insecure_skip_verify"`
    ClientCert         string `yaml:"client_cert"`
    ClientKey          string `yaml:"client_key"`
    CaCert             string `yaml:"ca_cert"`
}

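// KafkaSource reads lines from a Kafka topic and turns them into events.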
type KafkaSource struct {
    Config KafkaConfiguration
    logger *log.Entry
    Reader *kafka.Reader
}

func (k *KafkaSource) GetUuid() string {
    return k.Config.UniqueId
}

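// UnmarshalConfig parses and validates the YAML configuration: at least
// one broker and a topic are required, and the mode defaults to tail.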
func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
    k.Config = KafkaConfiguration{}

    err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
    if err != nil {
        return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
    }

    if len(k.Config.Brokers) == 0 {
        return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
    }

    if k.Config.Topic == "" {
        return fmt.Errorf("cannot create a %s reader with an empty topic", dataSourceName)
    }

    if k.Config.Mode == "" {
        k.Config.Mode = configuration.TAIL_MODE
    }

    k.logger.Debugf("successfully unmarshaled kafka configuration: %+v", k.Config)

    return nil
}

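// Configure parses the YAML configuration, then builds the dialer and the
// kafka reader that StreamingAcquisition will consume from.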
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
    k.logger = logger
    k.logger.Debugf("start configuring %s source", dataSourceName)

    err := k.UnmarshalConfig(yamlConfig)
    if err != nil {
        return err
    }

    dialer, err := k.Config.NewDialer()
    if err != nil {
        return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
    }

    k.Reader, err = k.Config.NewReader(dialer, k.logger)
    if err != nil {
        return fmt.Errorf("cannot create %s reader: %w", dataSourceName, err)
    }

    if k.Reader == nil {
        return fmt.Errorf("cannot create %s reader", dataSourceName)
    }

    k.logger.Debugf("successfully configured %s source", dataSourceName)

    return nil
}

func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
    return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
}

func (k *KafkaSource) GetMode() string {
    return k.Config.Mode
}

func (k *KafkaSource) GetName() string {
    return dataSourceName
}

func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
    return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}

func (k *KafkaSource) CanRun() error {
    return nil
}

func (k *KafkaSource) GetMetrics() []prometheus.Collector {
    return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector {
    return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) Dump() interface{} {
    return k
}

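// ReadMessage first positions the reader at the current time, then reads
// from the topic in a loop, turning each message into a types.Event sent
// on out. It returns nil on io.EOF (reader closed); other read errors are
// logged and the loop continues.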
func (k *KafkaSource) ReadMessage(out chan types.Event) error {
    // Start processing from the latest offset; failure to seek is not
    // fatal, the reader then starts from its default offset
    if err := k.Reader.SetOffsetAt(context.Background(), time.Now()); err != nil {
        k.logger.Warningf("while setting offset for %s reader: %s", dataSourceName, err)
    }

    for {
        k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)

        m, err := k.Reader.ReadMessage(context.Background())
        if err != nil {
            if errors.Is(err, io.EOF) {
                return nil
            }

            k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err))

            continue
        }

        k.logger.Tracef("got message: %s", string(m.Value))

        l := types.Line{
            Raw:     string(m.Value),
            Labels:  k.Config.Labels,
            Time:    m.Time.UTC(),
            Src:     k.Config.Topic,
            Process: true,
            Module:  k.GetName(),
        }
        k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
        linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()

        var evt types.Event
        if !k.Config.UseTimeMachine {
            evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
        } else {
            evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
        }

        out <- evt
    }
}

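// RunReader starts ReadMessage in a goroutine, then waits for the tomb to
// die and closes the reader so the reader goroutine can exit.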
func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
    k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)

    t.Go(func() error {
        return k.ReadMessage(out)
    })

    //nolint //fp
    for {
        select {
        case <-t.Dying():
            k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic)

            if err := k.Reader.Close(); err != nil {
                return fmt.Errorf("while closing %s reader on topic '%s': %w", dataSourceName, k.Config.Topic, err)
            }

            return nil
        }
    }
}

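// StreamingAcquisition starts the reader goroutine under the tomb and
// returns immediately; events are delivered asynchronously on out.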
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
    k.logger.Infof("start reader on brokers '%+v' with topic '%s'", k.Config.Brokers, k.Config.Topic)

    t.Go(func() error {
        defer trace.CatchPanic("crowdsec/acquis/kafka/live")
        return k.RunReader(out, t)
    })

    return nil
}

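// NewTLSConfig builds a *tls.Config from the datasource TLS settings,
// loading the client key pair and appending the configured CA certificate
// to the system pool (or to a fresh pool when none is available).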
func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) {
    tlsConfig := tls.Config{
        InsecureSkipVerify: kc.TLS.InsecureSkipVerify,
    }

    cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey)
    if err != nil {
        return &tlsConfig, err
    }

    tlsConfig.Certificates = []tls.Certificate{cert}

    caCert, err := os.ReadFile(kc.TLS.CaCert)
    if err != nil {
        return &tlsConfig, err
    }

    caCertPool, err := x509.SystemCertPool()
    if err != nil {
        return &tlsConfig, fmt.Errorf("unable to load system CA certificates: %w", err)
    }

    if caCertPool == nil {
        caCertPool = x509.NewCertPool()
    }

    caCertPool.AppendCertsFromPEM(caCert)
    tlsConfig.RootCAs = caCertPool

    return &tlsConfig, nil
}

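// NewDialer returns a kafka.Dialer with the configured timeout (a number
// of seconds given as a string, defaulting to 10s) and, when a tls section
// is present, the TLS configuration built by NewTLSConfig.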
func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
    timeoutDuration := 10 * time.Second

    if kc.Timeout != "" {
        intTimeout, err := strconv.Atoi(kc.Timeout)
        if err != nil {
            return nil, err
        }

        timeoutDuration = time.Duration(intTimeout) * time.Second
    }

    dialer := &kafka.Dialer{
        Timeout:   timeoutDuration,
        DualStack: true,
    }

    if kc.TLS != nil {
        tlsConfig, err := kc.NewTLSConfig()
        if err != nil {
            return dialer, err
        }

        dialer.TLS = tlsConfig
    }

    return dialer, nil
}

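// NewReader builds the kafka.Reader. group_id and partition are mutually
// exclusive; with neither set, kafka-go defaults to partition 0, hence the
// warning about reading only the first partition.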
func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) {
    rConf := kafka.ReaderConfig{
        Brokers:     kc.Brokers,
        Topic:       kc.Topic,
        Dialer:      dialer,
        Logger:      kafka.LoggerFunc(logger.Debugf),
        ErrorLogger: kafka.LoggerFunc(logger.Errorf),
    }

    if kc.GroupID != "" && kc.Partition != 0 {
        return &kafka.Reader{}, fmt.Errorf("cannot specify both group_id and partition")
    }

    if kc.GroupID != "" {
        rConf.GroupID = kc.GroupID
    } else if kc.Partition != 0 {
        rConf.Partition = kc.Partition
    } else {
        logger.Warnf("no group_id specified, crowdsec will only read from the 1st partition of the topic")
    }

    if err := rConf.Validate(); err != nil {
        return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err)
    }

    return kafka.NewReader(rConf), nil
}