123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package csplugin
- import (
- "sync"
- "time"
- "github.com/crowdsecurity/crowdsec/pkg/models"
- log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
- )
- /*
- PluginWatcher is here to allow grouping and threshold features for notification plugins :
- by frequency : it will signal the plugin to deliver notifications at this frequence (watchPluginTicker)
- by threshold : it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts)
- */
- // TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :)
- type alertCounterByPluginName struct {
- sync.Mutex
- data map[string]int
- }
- func newAlertCounterByPluginName() alertCounterByPluginName {
- return alertCounterByPluginName{
- data: make(map[string]int),
- }
- }
- func (acp *alertCounterByPluginName) Init() {
- acp.data = make(map[string]int)
- }
- func (acp *alertCounterByPluginName) Get(key string) (int, bool) {
- acp.Lock()
- val, ok := acp.data[key]
- acp.Unlock()
- return val, ok
- }
- func (acp *alertCounterByPluginName) Set(key string, val int) {
- acp.Lock()
- acp.data[key] = val
- acp.Unlock()
- }
- type PluginWatcher struct {
- PluginConfigByName map[string]PluginConfig
- AlertCountByPluginName alertCounterByPluginName
- PluginEvents chan string
- Inserts chan string
- tomb *tomb.Tomb
- }
- var DefaultEmptyTicker = time.Second * 1
- func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) {
- pw.PluginConfigByName = configs
- pw.PluginEvents = make(chan string)
- pw.AlertCountByPluginName = newAlertCounterByPluginName()
- pw.Inserts = make(chan string)
- for name := range alertsByPluginName {
- pw.AlertCountByPluginName.Set(name, 0)
- }
- }
- func (pw *PluginWatcher) Start(tomb *tomb.Tomb) {
- pw.tomb = tomb
- for name := range pw.PluginConfigByName {
- pname := name
- pw.tomb.Go(func() error {
- pw.watchPluginTicker(pname)
- return nil
- })
- }
- pw.tomb.Go(func() error {
- pw.watchPluginAlertCounts()
- return nil
- })
- }
- func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
- var watchTime time.Duration
- var watchCount int = -1
- // Threshold can be set : by time, by count, or both
- // if only time is set, honor it
- // if only count is set, put timer to 1 second and just check size
- // if both are set, set timer to 1 second, but check size && time
- interval := pw.PluginConfigByName[pluginName].GroupWait
- threshold := pw.PluginConfigByName[pluginName].GroupThreshold
- //only size is set
- if threshold > 0 && interval == 0 {
- watchCount = threshold
- watchTime = DefaultEmptyTicker
- } else if interval != 0 && threshold == 0 {
- //only time is set
- watchTime = interval
- } else if interval != 0 && threshold != 0 {
- //both are set
- watchTime = DefaultEmptyTicker
- watchCount = threshold
- } else {
- //none are set, we sent every event we receive
- watchTime = DefaultEmptyTicker
- watchCount = 1
- }
- ticker := time.NewTicker(watchTime)
- var lastSend time.Time = time.Now()
- for {
- select {
- case <-ticker.C:
- send := false
- //if count threshold was set, honor no matter what
- if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount {
- log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc)
- send = true
- pw.AlertCountByPluginName.Set(pluginName, 0)
- }
- //if time threshold only was set
- if watchTime > 0 && watchTime == interval {
- log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
- send = true
- }
- //if we hit timer because it was set low to honor count, check if we should trigger
- if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 {
- if lastSend.Add(interval).Before(time.Now()) {
- log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
- send = true
- lastSend = time.Now()
- }
- }
- if send {
- log.Tracef("sending alerts to %s", pluginName)
- pw.PluginEvents <- pluginName
- }
- case <-pw.tomb.Dying():
- ticker.Stop()
- return
- }
- }
- }
- func (pw *PluginWatcher) watchPluginAlertCounts() {
- for {
- select {
- case pluginName := <-pw.Inserts:
- //we only "count" pending alerts, and watchPluginTicker is actually going to send it
- if _, ok := pw.PluginConfigByName[pluginName]; ok {
- curr, _ := pw.AlertCountByPluginName.Get(pluginName)
- pw.AlertCountByPluginName.Set(pluginName, curr+1)
- }
- case <-pw.tomb.Dying():
- return
- }
- }
- }
|