watcher.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package csplugin
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/crowdsecurity/crowdsec/pkg/models"
  6. log "github.com/sirupsen/logrus"
  7. "gopkg.in/tomb.v2"
  8. )
  /*
  PluginWatcher is here to allow grouping and threshold features for notification plugins:
  by frequency: it will signal the plugin to deliver notifications at this frequency (watchPluginTicker)
  by threshold: it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts)
  */
  // alertCounterByPluginName is a mutex-guarded map from a plugin name to an
  // alert count. The embedded sync.Mutex protects data; Get and Set take it
  // themselves, so callers must not hold it when calling them.
  //
  // TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :)
  type alertCounterByPluginName struct {
  	sync.Mutex
  	data map[string]int
  }

  // newAlertCounterByPluginName returns a counter with an empty, allocated map.
  func newAlertCounterByPluginName() alertCounterByPluginName {
  	return alertCounterByPluginName{
  		data: make(map[string]int),
  	}
  }

  // Init replaces the underlying map with a fresh empty one, dropping every
  // stored count. It does not take the lock.
  func (acp *alertCounterByPluginName) Init() {
  	acp.data = make(map[string]int)
  }

  // Get returns the count stored for key and whether the key is present.
  // A missing key (or a counter whose map was never allocated) yields (0, false).
  func (acp *alertCounterByPluginName) Get(key string) (int, bool) {
  	acp.Lock()
  	defer acp.Unlock()

  	val, ok := acp.data[key]
  	return val, ok
  }

  // Set stores val as the count for key.
  // The map is allocated on first use so that a zero-value counter is usable
  // instead of panicking on a nil-map write with the mutex still held.
  func (acp *alertCounterByPluginName) Set(key string, val int) {
  	acp.Lock()
  	defer acp.Unlock()

  	if acp.data == nil {
  		acp.data = make(map[string]int)
  	}
  	acp.data[key] = val
  }
  // PluginWatcher decides when each notification plugin should be signalled,
  // based on the plugin's GroupWait and GroupThreshold settings.
  type PluginWatcher struct {
  	// PluginConfigByName holds the configuration of each plugin, keyed by plugin name.
  	PluginConfigByName map[string]PluginConfig
  	// AlertCountByPluginName tracks the number of counted alerts per plugin;
  	// it is incremented by watchPluginAlertCounts and reset by watchPluginTicker.
  	AlertCountByPluginName alertCounterByPluginName
  	// PluginEvents receives a plugin name each time watchPluginTicker decides
  	// that plugin should send.
  	PluginEvents chan string
  	// Inserts carries plugin names; each value read from it increments that
  	// plugin's alert count.
  	Inserts chan string
  	// tomb is set by Start; the watcher goroutines return once it is dying.
  	tomb *tomb.Tomb
  }
  // DefaultEmptyTicker is the tick period watchPluginTicker uses whenever it is
  // not ticking at the plugin's own GroupWait interval.
  var DefaultEmptyTicker = 1 * time.Second
  46. func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) {
  47. pw.PluginConfigByName = configs
  48. pw.PluginEvents = make(chan string)
  49. pw.AlertCountByPluginName = newAlertCounterByPluginName()
  50. pw.Inserts = make(chan string)
  51. for name := range alertsByPluginName {
  52. pw.AlertCountByPluginName.Set(name, 0)
  53. }
  54. }
  55. func (pw *PluginWatcher) Start(tomb *tomb.Tomb) {
  56. pw.tomb = tomb
  57. for name := range pw.PluginConfigByName {
  58. pname := name
  59. pw.tomb.Go(func() error {
  60. pw.watchPluginTicker(pname)
  61. return nil
  62. })
  63. }
  64. pw.tomb.Go(func() error {
  65. pw.watchPluginAlertCounts()
  66. return nil
  67. })
  68. }
  69. func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
  70. var watchTime time.Duration
  71. var watchCount int = -1
  72. // Threshold can be set : by time, by count, or both
  73. // if only time is set, honor it
  74. // if only count is set, put timer to 1 second and just check size
  75. // if both are set, set timer to 1 second, but check size && time
  76. interval := pw.PluginConfigByName[pluginName].GroupWait
  77. threshold := pw.PluginConfigByName[pluginName].GroupThreshold
  78. //only size is set
  79. if threshold > 0 && interval == 0 {
  80. watchCount = threshold
  81. watchTime = DefaultEmptyTicker
  82. } else if interval != 0 && threshold == 0 {
  83. //only time is set
  84. watchTime = interval
  85. } else if interval != 0 && threshold != 0 {
  86. //both are set
  87. watchTime = DefaultEmptyTicker
  88. watchCount = threshold
  89. } else {
  90. //none are set, we sent every event we receive
  91. watchTime = DefaultEmptyTicker
  92. watchCount = 1
  93. }
  94. ticker := time.NewTicker(watchTime)
  95. var lastSend time.Time = time.Now()
  96. for {
  97. select {
  98. case <-ticker.C:
  99. send := false
  100. //if count threshold was set, honor no matter what
  101. if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount {
  102. log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc)
  103. send = true
  104. pw.AlertCountByPluginName.Set(pluginName, 0)
  105. }
  106. //if time threshold only was set
  107. if watchTime > 0 && watchTime == interval {
  108. log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
  109. send = true
  110. }
  111. //if we hit timer because it was set low to honor count, check if we should trigger
  112. if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 {
  113. if lastSend.Add(interval).Before(time.Now()) {
  114. log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
  115. send = true
  116. lastSend = time.Now()
  117. }
  118. }
  119. if send {
  120. log.Tracef("sending alerts to %s", pluginName)
  121. pw.PluginEvents <- pluginName
  122. }
  123. case <-pw.tomb.Dying():
  124. ticker.Stop()
  125. return
  126. }
  127. }
  128. }
  129. func (pw *PluginWatcher) watchPluginAlertCounts() {
  130. for {
  131. select {
  132. case pluginName := <-pw.Inserts:
  133. //we only "count" pending alerts, and watchPluginTicker is actually going to send it
  134. if _, ok := pw.PluginConfigByName[pluginName]; ok {
  135. curr, _ := pw.AlertCountByPluginName.Get(pluginName)
  136. pw.AlertCountByPluginName.Set(pluginName, curr+1)
  137. }
  138. case <-pw.tomb.Dying():
  139. return
  140. }
  141. }
  142. }