// kinesis.go — AWS Kinesis data source for crowdsec log acquisition.
  1. package kinesisacquisition
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "strings"
  9. "time"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/arn"
  12. "github.com/aws/aws-sdk-go/aws/session"
  13. "github.com/aws/aws-sdk-go/service/kinesis"
  14. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  15. "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  16. "github.com/crowdsecurity/crowdsec/pkg/types"
  17. "github.com/pkg/errors"
  18. "github.com/prometheus/client_golang/prometheus"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/tomb.v2"
  21. "gopkg.in/yaml.v2"
  22. )
// KinesisConfiguration is the YAML configuration of the kinesis data source.
// Two mutually exclusive read modes are supported: classic GetRecords polling
// (stream_name) and enhanced fan-out via RegisterStreamConsumer/SubscribeToShard
// (stream_arn + consumer_name, with use_enhanced_fanout: true).
type KinesisConfiguration struct {
	configuration.DataSourceCommonCfg `yaml:",inline"`
	StreamName                        string  `yaml:"stream_name"`         // stream to poll with GetRecords (classic mode)
	StreamARN                         string  `yaml:"stream_arn"`          // stream ARN, required for enhanced fan-out mode
	UseEnhancedFanOut                 bool    `yaml:"use_enhanced_fanout"` //Use RegisterStreamConsumer and SubscribeToShard instead of GetRecords
	AwsProfile                        *string `yaml:"aws_profile"`         // optional AWS shared-config profile name
	AwsRegion                         string  `yaml:"aws_region"`          // optional region override
	AwsEndpoint                       string  `yaml:"aws_endpoint"`        // optional endpoint override
	ConsumerName                      string  `yaml:"consumer_name"`       // enhanced fan-out consumer name
	FromSubscription                  bool    `yaml:"from_subscription"`   // records are gzipped CloudWatch Logs subscription payloads
	MaxRetries                        int     `yaml:"max_retries"`         // consumer (de)registration poll attempts; defaults to 10 in Configure
}
// KinesisSource reads log lines from an AWS Kinesis stream, either by polling
// GetRecords per shard or through enhanced fan-out subscriptions.
type KinesisSource struct {
	Config  KinesisConfiguration
	logger  *log.Entry
	kClient *kinesis.Kinesis
	// shardReaderTomb supervises the per-shard reader goroutines; it is
	// recreated for each subscription round (a dead tomb cannot be reused).
	shardReaderTomb *tomb.Tomb
}
// CloudWatchSubscriptionRecord is the JSON envelope produced by a CloudWatch
// Logs subscription filter; it arrives gzip-compressed inside a kinesis record
// (see decodeFromSubscription).
type CloudWatchSubscriptionRecord struct {
	MessageType         string                           `json:"messageType"`
	Owner               string                           `json:"owner"`
	LogGroup            string                           `json:"logGroup"`
	LogStream           string                           `json:"logStream"`
	SubscriptionFilters []string                         `json:"subscriptionFilters"`
	LogEvents           []CloudwatchSubscriptionLogEvent `json:"logEvents"`
}
// CloudwatchSubscriptionLogEvent is a single log event inside a CloudWatch
// Logs subscription record.
type CloudwatchSubscriptionLogEvent struct {
	ID        string `json:"id"`
	Message   string `json:"message"`
	Timestamp int64  `json:"timestamp"` // epoch milliseconds as emitted by CloudWatch — TODO confirm; not used by this file
}
// linesRead counts events read per stream, labeled by stream name or ARN.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kinesis_stream_hits_total",
		Help: "Number of event read per stream.",
	},
	[]string{"stream"},
)

// linesReadShards counts events read per individual shard of a stream.
var linesReadShards = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kinesis_shards_hits_total",
		Help: "Number of event read per shards.",
	},
	[]string{"stream", "shard"},
)
  68. func (k *KinesisSource) newClient() error {
  69. var sess *session.Session
  70. if k.Config.AwsProfile != nil {
  71. sess = session.Must(session.NewSessionWithOptions(session.Options{
  72. SharedConfigState: session.SharedConfigEnable,
  73. Profile: *k.Config.AwsProfile,
  74. }))
  75. } else {
  76. sess = session.Must(session.NewSessionWithOptions(session.Options{
  77. SharedConfigState: session.SharedConfigEnable,
  78. }))
  79. }
  80. if sess == nil {
  81. return fmt.Errorf("failed to create aws session")
  82. }
  83. config := aws.NewConfig()
  84. if k.Config.AwsRegion != "" {
  85. config = config.WithRegion(k.Config.AwsRegion)
  86. }
  87. if k.Config.AwsEndpoint != "" {
  88. config = config.WithEndpoint(k.Config.AwsEndpoint)
  89. }
  90. k.kClient = kinesis.New(sess, config)
  91. if k.kClient == nil {
  92. return fmt.Errorf("failed to create kinesis client")
  93. }
  94. return nil
  95. }
  96. func (k *KinesisSource) GetMetrics() []prometheus.Collector {
  97. return []prometheus.Collector{linesRead, linesReadShards}
  98. }
  99. func (k *KinesisSource) GetAggregMetrics() []prometheus.Collector {
  100. return []prometheus.Collector{linesRead, linesReadShards}
  101. }
  102. func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
  103. config := KinesisConfiguration{}
  104. k.logger = logger
  105. err := yaml.UnmarshalStrict(yamlConfig, &config)
  106. if err != nil {
  107. return errors.Wrap(err, "Cannot parse kinesis datasource configuration")
  108. }
  109. if config.Mode == "" {
  110. config.Mode = configuration.TAIL_MODE
  111. }
  112. k.Config = config
  113. if k.Config.StreamName == "" && !k.Config.UseEnhancedFanOut {
  114. return fmt.Errorf("stream_name is mandatory when use_enhanced_fanout is false")
  115. }
  116. if k.Config.StreamARN == "" && k.Config.UseEnhancedFanOut {
  117. return fmt.Errorf("stream_arn is mandatory when use_enhanced_fanout is true")
  118. }
  119. if k.Config.ConsumerName == "" && k.Config.UseEnhancedFanOut {
  120. return fmt.Errorf("consumer_name is mandatory when use_enhanced_fanout is true")
  121. }
  122. if k.Config.StreamARN != "" && k.Config.StreamName != "" {
  123. return fmt.Errorf("stream_arn and stream_name are mutually exclusive")
  124. }
  125. if k.Config.MaxRetries <= 0 {
  126. k.Config.MaxRetries = 10
  127. }
  128. err = k.newClient()
  129. if err != nil {
  130. return errors.Wrap(err, "Cannot create kinesis client")
  131. }
  132. k.shardReaderTomb = &tomb.Tomb{}
  133. return nil
  134. }
  135. func (k *KinesisSource) ConfigureByDSN(string, map[string]string, *log.Entry) error {
  136. return fmt.Errorf("kinesis datasource does not support command-line acquisition")
  137. }
  138. func (k *KinesisSource) GetMode() string {
  139. return k.Config.Mode
  140. }
  141. func (k *KinesisSource) GetName() string {
  142. return "kinesis"
  143. }
  144. func (k *KinesisSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  145. return fmt.Errorf("kinesis datasource does not support one-shot acquisition")
  146. }
  147. func (k *KinesisSource) decodeFromSubscription(record []byte) ([]CloudwatchSubscriptionLogEvent, error) {
  148. b := bytes.NewBuffer(record)
  149. r, err := gzip.NewReader(b)
  150. if err != nil {
  151. k.logger.Error(err)
  152. return nil, err
  153. }
  154. decompressed, err := ioutil.ReadAll(r)
  155. if err != nil {
  156. k.logger.Error(err)
  157. return nil, err
  158. }
  159. var subscriptionRecord CloudWatchSubscriptionRecord
  160. err = json.Unmarshal(decompressed, &subscriptionRecord)
  161. if err != nil {
  162. k.logger.Error(err)
  163. return nil, err
  164. }
  165. return subscriptionRecord.LogEvents, nil
  166. }
  167. func (k *KinesisSource) WaitForConsumerDeregistration(consumerName string, streamARN string) error {
  168. maxTries := k.Config.MaxRetries
  169. for i := 0; i < maxTries; i++ {
  170. _, err := k.kClient.DescribeStreamConsumer(&kinesis.DescribeStreamConsumerInput{
  171. ConsumerName: aws.String(consumerName),
  172. StreamARN: aws.String(streamARN),
  173. })
  174. if err != nil {
  175. switch err.(type) {
  176. case *kinesis.ResourceNotFoundException:
  177. return nil
  178. default:
  179. k.logger.Errorf("Error while waiting for consumer deregistration: %s", err)
  180. return errors.Wrap(err, "Cannot describe stream consumer")
  181. }
  182. }
  183. time.Sleep(time.Millisecond * 200 * time.Duration(i+1))
  184. }
  185. return fmt.Errorf("consumer %s is not deregistered after %d tries", consumerName, maxTries)
  186. }
  187. func (k *KinesisSource) DeregisterConsumer() error {
  188. k.logger.Debugf("Deregistering consumer %s if it exists", k.Config.ConsumerName)
  189. _, err := k.kClient.DeregisterStreamConsumer(&kinesis.DeregisterStreamConsumerInput{
  190. ConsumerName: aws.String(k.Config.ConsumerName),
  191. StreamARN: aws.String(k.Config.StreamARN),
  192. })
  193. if err != nil {
  194. switch err.(type) {
  195. case *kinesis.ResourceNotFoundException:
  196. default:
  197. return errors.Wrap(err, "Cannot deregister stream consumer")
  198. }
  199. }
  200. err = k.WaitForConsumerDeregistration(k.Config.ConsumerName, k.Config.StreamARN)
  201. if err != nil {
  202. return errors.Wrap(err, "Cannot wait for consumer deregistration")
  203. }
  204. return nil
  205. }
  206. func (k *KinesisSource) WaitForConsumerRegistration(consumerARN string) error {
  207. maxTries := k.Config.MaxRetries
  208. for i := 0; i < maxTries; i++ {
  209. describeOutput, err := k.kClient.DescribeStreamConsumer(&kinesis.DescribeStreamConsumerInput{
  210. ConsumerARN: aws.String(consumerARN),
  211. })
  212. if err != nil {
  213. return errors.Wrap(err, "Cannot describe stream consumer")
  214. }
  215. if *describeOutput.ConsumerDescription.ConsumerStatus == "ACTIVE" {
  216. k.logger.Debugf("Consumer %s is active", consumerARN)
  217. return nil
  218. }
  219. time.Sleep(time.Millisecond * 200 * time.Duration(i+1))
  220. k.logger.Debugf("Waiting for consumer registration %d", i)
  221. }
  222. return fmt.Errorf("consumer %s is not active after %d tries", consumerARN, maxTries)
  223. }
  224. func (k *KinesisSource) RegisterConsumer() (*kinesis.RegisterStreamConsumerOutput, error) {
  225. k.logger.Debugf("Registering consumer %s", k.Config.ConsumerName)
  226. streamConsumer, err := k.kClient.RegisterStreamConsumer(&kinesis.RegisterStreamConsumerInput{
  227. ConsumerName: aws.String(k.Config.ConsumerName),
  228. StreamARN: aws.String(k.Config.StreamARN),
  229. })
  230. if err != nil {
  231. return nil, errors.Wrap(err, "Cannot register stream consumer")
  232. }
  233. err = k.WaitForConsumerRegistration(*streamConsumer.Consumer.ConsumerARN)
  234. if err != nil {
  235. return nil, errors.Wrap(err, "Timeout while waiting for consumer to be active")
  236. }
  237. return streamConsumer, nil
  238. }
  239. func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan types.Event, logger *log.Entry, shardId string) {
  240. for _, record := range records {
  241. if k.Config.StreamARN != "" {
  242. linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamARN, "shard": shardId}).Inc()
  243. linesRead.With(prometheus.Labels{"stream": k.Config.StreamARN}).Inc()
  244. } else {
  245. linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamName, "shard": shardId}).Inc()
  246. linesRead.With(prometheus.Labels{"stream": k.Config.StreamName}).Inc()
  247. }
  248. var data []CloudwatchSubscriptionLogEvent
  249. var err error
  250. if k.Config.FromSubscription {
  251. //The AWS docs says that the data is base64 encoded
  252. //but apparently GetRecords decodes it for us ?
  253. data, err = k.decodeFromSubscription(record.Data)
  254. if err != nil {
  255. logger.Errorf("Cannot decode data: %s", err)
  256. continue
  257. }
  258. } else {
  259. data = []CloudwatchSubscriptionLogEvent{{Message: string(record.Data)}}
  260. }
  261. for _, event := range data {
  262. logger.Tracef("got record %s", event.Message)
  263. l := types.Line{}
  264. l.Raw = event.Message
  265. l.Labels = k.Config.Labels
  266. l.Time = time.Now().UTC()
  267. l.Process = true
  268. l.Module = k.GetName()
  269. if k.Config.StreamARN != "" {
  270. l.Src = k.Config.StreamARN
  271. } else {
  272. l.Src = k.Config.StreamName
  273. }
  274. evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.LIVE}
  275. out <- evt
  276. }
  277. }
  278. }
// ReadFromSubscription drains a SubscribeToShard event stream (enhanced
// fan-out) for one shard, pushing decoded records to out. It returns when the
// shard reader tomb dies or when the SDK closes the event channel (the caller,
// EnhancedRead, resubscribes when all readers exit cleanly).
func (k *KinesisSource) ReadFromSubscription(reader kinesis.SubscribeToShardEventStreamReader, out chan types.Event, shardId string, streamName string) error {
	logger := k.logger.WithFields(log.Fields{"shard_id": shardId})
	//ghetto sync, kinesis allows to subscribe to a closed shard, which will make the goroutine exit immediately
	//and we won't be able to start a new one if this is the first one started by the tomb
	//TODO: look into parent shards to see if a shard is closed before starting to read it ?
	time.Sleep(time.Second)
	for {
		select {
		case <-k.shardReaderTomb.Dying():
			// Orderly shutdown requested by the parent loop.
			logger.Infof("Subscribed shard reader is dying")
			err := reader.Close()
			if err != nil {
				return errors.Wrap(err, "Cannot close kinesis subscribed shard reader")
			}
			return nil
		case event, ok := <-reader.Events():
			if !ok {
				// Channel closed by the SDK: the subscription has ended.
				logger.Infof("Event chan has been closed")
				return nil
			}
			switch event := event.(type) {
			case *kinesis.SubscribeToShardEvent:
				k.ParseAndPushRecords(event.Records, out, logger, shardId)
			case *kinesis.SubscribeToShardEventStreamUnknownEvent:
				// Unknown event type from the stream; logged and ignored.
				logger.Infof("got an unknown event, what to do ?")
			}
		}
	}
}
  308. func (k *KinesisSource) SubscribeToShards(arn arn.ARN, streamConsumer *kinesis.RegisterStreamConsumerOutput, out chan types.Event) error {
  309. shards, err := k.kClient.ListShards(&kinesis.ListShardsInput{
  310. StreamName: aws.String(arn.Resource[7:]),
  311. })
  312. if err != nil {
  313. return errors.Wrap(err, "Cannot list shards for enhanced_read")
  314. }
  315. for _, shard := range shards.Shards {
  316. shardId := *shard.ShardId
  317. r, err := k.kClient.SubscribeToShard(&kinesis.SubscribeToShardInput{
  318. ShardId: aws.String(shardId),
  319. StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)},
  320. ConsumerARN: streamConsumer.Consumer.ConsumerARN,
  321. })
  322. if err != nil {
  323. return errors.Wrap(err, "Cannot subscribe to shard")
  324. }
  325. k.shardReaderTomb.Go(func() error {
  326. return k.ReadFromSubscription(r.GetEventStream().Reader, out, shardId, arn.Resource[7:])
  327. })
  328. }
  329. return nil
  330. }
// EnhancedRead is the enhanced fan-out acquisition loop: it (re)registers the
// stream consumer, subscribes one reader goroutine per shard, and resubscribes
// whenever all readers exit without error (resharding or subscription end).
// It returns when the parent tomb dies or on the first reader error.
func (k *KinesisSource) EnhancedRead(out chan types.Event, t *tomb.Tomb) error {
	parsedARN, err := arn.Parse(k.Config.StreamARN)
	if err != nil {
		return errors.Wrap(err, "Cannot parse stream ARN")
	}
	if !strings.HasPrefix(parsedARN.Resource, "stream/") {
		return fmt.Errorf("resource part of stream ARN %s does not start with stream/", k.Config.StreamARN)
	}
	// Resource is "stream/<name>": skip the 7-byte "stream/" prefix.
	k.logger = k.logger.WithFields(log.Fields{"stream": parsedARN.Resource[7:]})
	k.logger.Info("starting kinesis acquisition with enhanced fan-out")
	// Deregister first so a stale consumer from a previous run cannot linger.
	err = k.DeregisterConsumer()
	if err != nil {
		return errors.Wrap(err, "Cannot deregister consumer")
	}
	streamConsumer, err := k.RegisterConsumer()
	if err != nil {
		return errors.Wrap(err, "Cannot register consumer")
	}
	for {
		// Fresh tomb per subscription round: a dead tomb cannot be reused.
		k.shardReaderTomb = &tomb.Tomb{}
		err = k.SubscribeToShards(parsedARN, streamConsumer, out)
		if err != nil {
			return errors.Wrap(err, "Cannot subscribe to shards")
		}
		select {
		case <-t.Dying():
			// Parent asked us to stop: kill the readers, wait, clean up.
			k.logger.Infof("Kinesis source is dying")
			k.shardReaderTomb.Kill(nil)
			_ = k.shardReaderTomb.Wait() //we don't care about the error as we kill the tomb ourselves
			err = k.DeregisterConsumer()
			if err != nil {
				return errors.Wrap(err, "Cannot deregister consumer")
			}
			return nil
		case <-k.shardReaderTomb.Dying():
			k.logger.Debugf("Kinesis subscribed shard reader is dying")
			if k.shardReaderTomb.Err() != nil {
				return k.shardReaderTomb.Err()
			}
			//All goroutines have exited without error, so a resharding event, start again
			k.logger.Debugf("All reader goroutines have exited, resharding event or periodic resubscribe")
			continue
		}
	}
}
  376. func (k *KinesisSource) ReadFromShard(out chan types.Event, shardId string) error {
  377. logger := k.logger.WithFields(log.Fields{"shard": shardId})
  378. logger.Debugf("Starting to read shard")
  379. sharIt, err := k.kClient.GetShardIterator(&kinesis.GetShardIteratorInput{ShardId: aws.String(shardId),
  380. StreamName: &k.Config.StreamName,
  381. ShardIteratorType: aws.String(kinesis.ShardIteratorTypeLatest)})
  382. if err != nil {
  383. logger.Errorf("Cannot get shard iterator: %s", err)
  384. return errors.Wrap(err, "Cannot get shard iterator")
  385. }
  386. it := sharIt.ShardIterator
  387. //AWS recommends to wait for a second between calls to GetRecords for a given shard
  388. ticker := time.NewTicker(time.Second)
  389. for {
  390. select {
  391. case <-ticker.C:
  392. records, err := k.kClient.GetRecords(&kinesis.GetRecordsInput{ShardIterator: it})
  393. it = records.NextShardIterator
  394. if err != nil {
  395. switch err.(type) {
  396. case *kinesis.ProvisionedThroughputExceededException:
  397. logger.Warn("Provisioned throughput exceeded")
  398. //TODO: implement exponential backoff
  399. continue
  400. case *kinesis.ExpiredIteratorException:
  401. logger.Warn("Expired iterator")
  402. continue
  403. default:
  404. logger.Error("Cannot get records")
  405. return errors.Wrap(err, "Cannot get records")
  406. }
  407. }
  408. k.ParseAndPushRecords(records.Records, out, logger, shardId)
  409. if it == nil {
  410. logger.Warnf("Shard has been closed")
  411. return nil
  412. }
  413. case <-k.shardReaderTomb.Dying():
  414. logger.Infof("shardReaderTomb is dying, exiting ReadFromShard")
  415. ticker.Stop()
  416. return nil
  417. }
  418. }
  419. }
// ReadFromStream is the classic (non fan-out) acquisition loop: it lists the
// shards of the configured stream, starts one GetRecords poller per shard under
// shardReaderTomb, and restarts the whole set when every poller exits cleanly
// (resharding). It returns when the parent tomb dies or on a poller error.
func (k *KinesisSource) ReadFromStream(out chan types.Event, t *tomb.Tomb) error {
	k.logger = k.logger.WithFields(log.Fields{"stream": k.Config.StreamName})
	k.logger.Info("starting kinesis acquisition from shards")
	for {
		shards, err := k.kClient.ListShards(&kinesis.ListShardsInput{
			StreamName: aws.String(k.Config.StreamName),
		})
		if err != nil {
			return errors.Wrap(err, "Cannot list shards")
		}
		// Fresh tomb per round: a dead tomb cannot be reused.
		k.shardReaderTomb = &tomb.Tomb{}
		for _, shard := range shards.Shards {
			shardId := *shard.ShardId // per-iteration copy captured by the closure below
			k.shardReaderTomb.Go(func() error {
				defer types.CatchPanic("crowdsec/acquis/kinesis/streaming/shard")
				return k.ReadFromShard(out, shardId)
			})
		}
		select {
		case <-t.Dying():
			// Parent asked us to stop: kill the pollers and wait for them.
			k.logger.Info("kinesis source is dying")
			k.shardReaderTomb.Kill(nil)
			_ = k.shardReaderTomb.Wait() //we don't care about the error as we kill the tomb ourselves
			return nil
		case <-k.shardReaderTomb.Dying():
			reason := k.shardReaderTomb.Err()
			if reason != nil {
				k.logger.Errorf("Unexpected error from shard reader : %s", reason)
				return reason
			}
			k.logger.Infof("All shards have been closed, probably a resharding event, restarting acquisition")
			continue
		}
	}
}
  455. func (k *KinesisSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  456. t.Go(func() error {
  457. defer types.CatchPanic("crowdsec/acquis/kinesis/streaming")
  458. if k.Config.UseEnhancedFanOut {
  459. return k.EnhancedRead(out, t)
  460. } else {
  461. return k.ReadFromStream(out, t)
  462. }
  463. })
  464. return nil
  465. }
  466. func (k *KinesisSource) CanRun() error {
  467. return nil
  468. }
  469. func (k *KinesisSource) Dump() interface{} {
  470. return k
  471. }