  1. package kafkaacquisition
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "time"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/segmentio/kafka-go"
  13. log "github.com/sirupsen/logrus"
  14. "gopkg.in/tomb.v2"
  15. "gopkg.in/yaml.v2"
  16. "github.com/crowdsecurity/go-cs-lib/pkg/trace"
  17. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  18. "github.com/crowdsecurity/crowdsec/pkg/types"
  19. )
// dataSourceName identifies this acquisition module in log lines and
// error messages.
var (
	dataSourceName = "kafka"
)

// linesRead counts the lines read from kafka, labeled per topic, and is
// exposed as the cs_kafkasource_hits_total prometheus counter.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kafkasource_hits_total",
		Help: "Total lines that were read from topic",
	},
	[]string{"topic"})
// KafkaConfiguration is the YAML configuration of the kafka datasource:
// which brokers to bootstrap from, which topic to consume, an optional
// consumer group id, an optional dial timeout and optional TLS settings,
// plus the fields common to all datasources.
type KafkaConfiguration struct {
	Brokers []string   `yaml:"brokers"`  // broker addresses to connect to (at least one required)
	Topic   string     `yaml:"topic"`    // topic to consume messages from (required)
	GroupID string     `yaml:"group_id"` // optional consumer group id
	Timeout string     `yaml:"timeout"`  // dial timeout in seconds (integer as string); defaults to 10
	TLS     *TLSConfig `yaml:"tls"`      // optional TLS settings; nil means plaintext
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// TLSConfig holds the TLS material used when dialing the brokers.
// The certificate fields are file paths to PEM-encoded material.
type TLSConfig struct {
	InsecureSkipVerify bool   `yaml:"insecure_skip_verify"` // skip server certificate verification
	ClientCert         string `yaml:"client_cert"`          // path to the client certificate (PEM)
	ClientKey          string `yaml:"client_key"`           // path to the client private key (PEM)
	CaCert             string `yaml:"ca_cert"`              // path to a CA certificate to trust (PEM)
}
// KafkaSource is a streaming datasource that reads events from a kafka
// topic and feeds them to the acquisition pipeline.
type KafkaSource struct {
	Config KafkaConfiguration // parsed datasource configuration
	logger *log.Entry         // per-datasource logger
	Reader *kafka.Reader      // underlying kafka consumer, built in Configure
}
  48. func (k *KafkaSource) GetUuid() string {
  49. return k.Config.UniqueId
  50. }
  51. func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
  52. k.Config = KafkaConfiguration{}
  53. err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
  54. if err != nil {
  55. return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
  56. }
  57. if len(k.Config.Brokers) == 0 {
  58. return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
  59. }
  60. if k.Config.Topic == "" {
  61. return fmt.Errorf("cannot create a %s reader with am empty topic", dataSourceName)
  62. }
  63. if k.Config.Mode == "" {
  64. k.Config.Mode = configuration.TAIL_MODE
  65. }
  66. return err
  67. }
  68. func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
  69. k.logger = logger
  70. err := k.UnmarshalConfig(yamlConfig)
  71. if err != nil {
  72. return err
  73. }
  74. dialer, err := k.Config.NewDialer()
  75. if err != nil {
  76. return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
  77. }
  78. k.Reader, err = k.Config.NewReader(dialer)
  79. if err != nil {
  80. return fmt.Errorf("cannote create %s reader: %w", dataSourceName, err)
  81. }
  82. if k.Reader == nil {
  83. return fmt.Errorf("cannot create %s reader", dataSourceName)
  84. }
  85. return nil
  86. }
  87. func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
  88. return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
  89. }
  90. func (k *KafkaSource) GetMode() string {
  91. return k.Config.Mode
  92. }
  93. func (k *KafkaSource) GetName() string {
  94. return dataSourceName
  95. }
  96. func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  97. return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
  98. }
  99. func (k *KafkaSource) CanRun() error {
  100. return nil
  101. }
  102. func (k *KafkaSource) GetMetrics() []prometheus.Collector {
  103. return []prometheus.Collector{linesRead}
  104. }
  105. func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector {
  106. return []prometheus.Collector{linesRead}
  107. }
  108. func (k *KafkaSource) Dump() interface{} {
  109. return k
  110. }
  111. func (k *KafkaSource) ReadMessage(out chan types.Event) error {
  112. // Start processing from latest Offset
  113. k.Reader.SetOffsetAt(context.Background(), time.Now())
  114. for {
  115. m, err := k.Reader.ReadMessage(context.Background())
  116. if err != nil {
  117. if err == io.EOF {
  118. return nil
  119. }
  120. k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err))
  121. }
  122. l := types.Line{
  123. Raw: string(m.Value),
  124. Labels: k.Config.Labels,
  125. Time: m.Time.UTC(),
  126. Src: k.Config.Topic,
  127. Process: true,
  128. Module: k.GetName(),
  129. }
  130. linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
  131. var evt types.Event
  132. if !k.Config.UseTimeMachine {
  133. evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
  134. } else {
  135. evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
  136. }
  137. out <- evt
  138. }
  139. }
  140. func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
  141. t.Go(func() error {
  142. return k.ReadMessage(out)
  143. })
  144. //nolint //fp
  145. for {
  146. select {
  147. case <-t.Dying():
  148. k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic)
  149. if err := k.Reader.Close(); err != nil {
  150. return fmt.Errorf("while closing %s reader on topic '%s': %w", dataSourceName, k.Config.Topic, err)
  151. }
  152. return nil
  153. }
  154. }
  155. }
  156. func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  157. k.logger.Infof("start reader on topic '%s'", k.Config.Topic)
  158. t.Go(func() error {
  159. defer trace.CatchPanic("crowdsec/acquis/kafka/live")
  160. return k.RunReader(out, t)
  161. })
  162. return nil
  163. }
  164. func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) {
  165. tlsConfig := tls.Config{
  166. InsecureSkipVerify: kc.TLS.InsecureSkipVerify,
  167. }
  168. cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey)
  169. if err != nil {
  170. return &tlsConfig, err
  171. }
  172. tlsConfig.Certificates = []tls.Certificate{cert}
  173. caCert, err := os.ReadFile(kc.TLS.CaCert)
  174. if err != nil {
  175. return &tlsConfig, err
  176. }
  177. caCertPool, err := x509.SystemCertPool()
  178. if err != nil {
  179. return &tlsConfig, fmt.Errorf("unable to load system CA certificates: %w", err)
  180. }
  181. if caCertPool == nil {
  182. caCertPool = x509.NewCertPool()
  183. }
  184. caCertPool.AppendCertsFromPEM(caCert)
  185. tlsConfig.RootCAs = caCertPool
  186. tlsConfig.BuildNameToCertificate()
  187. return &tlsConfig, err
  188. }
  189. func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
  190. dialer := &kafka.Dialer{}
  191. var timeoutDuration time.Duration
  192. timeoutDuration = time.Duration(10) * time.Second
  193. if kc.Timeout != "" {
  194. intTimeout, err := strconv.Atoi(kc.Timeout)
  195. if err != nil {
  196. return dialer, err
  197. }
  198. timeoutDuration = time.Duration(intTimeout) * time.Second
  199. }
  200. dialer = &kafka.Dialer{
  201. Timeout: timeoutDuration,
  202. DualStack: true,
  203. }
  204. if kc.TLS != nil {
  205. tlsConfig, err := kc.NewTLSConfig()
  206. if err != nil {
  207. return dialer, err
  208. }
  209. dialer.TLS = tlsConfig
  210. }
  211. return dialer, nil
  212. }
  213. func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) {
  214. rConf := kafka.ReaderConfig{
  215. Brokers: kc.Brokers,
  216. Topic: kc.Topic,
  217. Dialer: dialer,
  218. }
  219. if kc.GroupID != "" {
  220. rConf.GroupID = kc.GroupID
  221. }
  222. if err := rConf.Validate(); err != nil {
  223. return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err)
  224. }
  225. return kafka.NewReader(rConf), nil
  226. }