bucket.go

package leakybucket

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	//"log"

	"github.com/crowdsecurity/crowdsec/pkg/time/rate"
	"github.com/crowdsecurity/crowdsec/pkg/types"
	"github.com/goombaio/namegenerator"
	"gopkg.in/tomb.v2"

	//rate "time/rate"

	"github.com/davecgh/go-spew/spew"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	//"golang.org/x/time/rate"
)

const (
	//LIVE processes events in real time, TIMEMACHINE replays past events with their original timestamps
	LIVE = iota
	TIMEMACHINE
)

//Leaky represents one instance of a bucket
type Leaky struct {
	Name string
	Mode int //LIVE or TIMEMACHINE
	//the limiter is what holds the proper "leaky" aspect: it determines when/if we can pour objects
	Limiter         rate.RateLimiter `json:"-"`
	SerializedState rate.Lstate
	//Queue holds the cache of objects in the bucket; it is used to know how many objects are buffered
	Queue *Queue
	//Leaky buckets receive events through a chan
	In chan types.Event `json:"-"`
	//Leaky buckets push their overflows through a chan
	Out chan *Queue `json:"-"`
	// shared by all buckets (the idea is to kill this afterwards)
	AllOut chan types.Event `json:"-"`
	//max capacity (for burst)
	Capacity int
	//CacheSize is the maximum number of elements kept in memory (as opposed to Capacity)
	CacheSize int
	//the unique identifier of the bucket (a hash)
	Mapkey string
	// chan for signaling
	Signal       chan bool `json:"-"`
	Suicide      chan bool `json:"-"`
	Reprocess    bool
	Simulated    bool
	Uuid         string
	First_ts     time.Time
	Last_ts      time.Time
	Ovflw_ts     time.Time
	Total_count  int
	Leakspeed    time.Duration
	BucketConfig *BucketFactory
	Duration     time.Duration
	Pour         func(*Leaky, types.Event) `json:"-"`
	//Profiling, when set to true, enables profiling of the bucket
	Profiling       bool
	timedOverflow   bool
	logger          *log.Entry
	scopeType       types.ScopeType
	hash            string
	scenarioVersion string
	tomb            *tomb.Tomb
	wgPour          *sync.WaitGroup
	wgDumpState     *sync.WaitGroup
	mutex           *sync.Mutex //used only in TIMEMACHINE mode, to allow garbage collection without races
}

var BucketsPour = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_poured_total",
		Help: "Total events poured in bucket.",
	},
	[]string{"source", "type", "name"},
)

var BucketsOverflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_overflowed_total",
		Help: "Total buckets overflowed.",
	},
	[]string{"name"},
)

var BucketsCanceled = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_canceled_total",
		Help: "Total buckets canceled.",
	},
	[]string{"name"},
)

var BucketsUnderflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_underflowed_total",
		Help: "Total buckets underflowed.",
	},
	[]string{"name"},
)

var BucketsInstanciation = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_created_total",
		Help: "Total buckets instantiated.",
	},
	[]string{"name"},
)

var BucketsCurrentCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "cs_buckets",
		Help: "Number of buckets that currently exist.",
	},
	[]string{"name"},
)
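
// NOTE: these collectors only show up in /metrics output once registered;
// crowdsec wires that registration up elsewhere. A minimal registration
// sketch (assuming the default Prometheus registry is wanted):
//
//	prometheus.MustRegister(BucketsPour, BucketsOverflow, BucketsCanceled,
//		BucketsUnderflow, BucketsInstanciation, BucketsCurrentCount)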

var LeakyRoutineCount int64

// NewLeaky creates a new leaky bucket from a BucketFactory.
// Events created by the bucket (overflow, bucket empty) are sent to a chan defined by the BucketFactory.
// The leaky bucket implementation is based on a rate limiter (see https://godoc.org/golang.org/x/time/rate).
// There's a trick to have an event sent when the bucket gets empty, to allow its destruction.
func NewLeaky(bucketFactory BucketFactory) *Leaky {
	bucketFactory.logger.Tracef("Instantiating live bucket %s", bucketFactory.Name)
	return FromFactory(bucketFactory)
}

func FromFactory(bucketFactory BucketFactory) *Leaky {
	var limiter rate.RateLimiter
	//the underlying golang rate limiter is mainly intended for HTTP rate limiting
	Qsize := bucketFactory.Capacity
	if bucketFactory.CacheSize > 0 {
		//cache is smaller than actual capacity
		if bucketFactory.CacheSize <= bucketFactory.Capacity {
			Qsize = bucketFactory.CacheSize
			//bucket might be a counter (infinite size), allow cache limitation
		} else if bucketFactory.Capacity == -1 {
			Qsize = bucketFactory.CacheSize
		}
	}
	if bucketFactory.Capacity == -1 {
		//in this case we let all events through;
		//maybe in the future we could avoid using a limiter at all
		limiter = &rate.AlwaysFull{}
	} else {
		limiter = rate.NewLimiter(rate.Every(bucketFactory.leakspeed), bucketFactory.Capacity)
	}
	BucketsInstanciation.With(prometheus.Labels{"name": bucketFactory.Name}).Inc()
	//create the leaky bucket per se
	l := &Leaky{
		Name:            bucketFactory.Name,
		Limiter:         limiter,
		Uuid:            namegenerator.NewNameGenerator(time.Now().UTC().UnixNano()).Generate(),
		Queue:           NewQueue(Qsize),
		CacheSize:       bucketFactory.CacheSize,
		Out:             make(chan *Queue, 1),
		Suicide:         make(chan bool, 1),
		AllOut:          bucketFactory.ret,
		Capacity:        bucketFactory.Capacity,
		Leakspeed:       bucketFactory.leakspeed,
		BucketConfig:    &bucketFactory,
		Pour:            Pour,
		Reprocess:       bucketFactory.Reprocess,
		Profiling:       bucketFactory.Profiling,
		Mode:            LIVE,
		scopeType:       bucketFactory.ScopeType,
		scenarioVersion: bucketFactory.ScenarioVersion,
		hash:            bucketFactory.hash,
		Simulated:       bucketFactory.Simulated,
		tomb:            bucketFactory.tomb,
		wgPour:          bucketFactory.wgPour,
		wgDumpState:     bucketFactory.wgDumpState,
		mutex:           &sync.Mutex{},
	}
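	// The bucket's natural lifetime is the time a full bucket needs to leak
	// down to empty, i.e. (Capacity+1) leak intervals. With illustrative
	// values, capacity=5 and leakspeed=10s give a Duration of (5+1)*10s = 60s.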
	if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) {
		l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed
	}
	if l.BucketConfig.duration != time.Duration(0) {
		l.Duration = l.BucketConfig.duration
		l.timedOverflow = true
	}
	return l
}
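
// A minimal driving sketch. Note that In and Signal are not allocated by
// FromFactory: in crowdsec the bucket manager is expected to create them
// before starting the routine. Assumptions for illustration only: a populated
// BucketFactory named factory and an event someEvent to pour.
//
//	leaky := NewLeaky(factory)
//	leaky.In = make(chan types.Event)
//	leaky.Signal = make(chan bool, 1)
//	factory.tomb.Go(func() error { return LeakRoutine(leaky) })
//	<-leaky.Signal        // LeakRoutine signals once it is ready
//	leaky.In <- someEvent // pour; overflows come back on factory.ret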

/* for now mimic a leak routine */
//LeakRoutine is the life of a bucket. It dies when the bucket underflows or overflows.
func LeakRoutine(leaky *Leaky) error {
	var (
		durationTicker <-chan time.Time = make(<-chan time.Time)
	)
	defer types.CatchPanic(fmt.Sprintf("crowdsec/LeakRoutine/%s", leaky.Name))
	BucketsCurrentCount.With(prometheus.Labels{"name": leaky.Name}).Inc()
	defer BucketsCurrentCount.With(prometheus.Labels{"name": leaky.Name}).Dec()
	/*todo : we create a logger at runtime while we want leakroutine to be up asap, might not be a good idea*/
	leaky.logger = leaky.BucketConfig.logger.WithFields(log.Fields{"capacity": leaky.Capacity, "partition": leaky.Mapkey, "bucket_id": leaky.Uuid})
	leaky.Signal <- true
	atomic.AddInt64(&LeakyRoutineCount, 1)
	defer atomic.AddInt64(&LeakyRoutineCount, -1)
	for _, f := range leaky.BucketConfig.processors {
		err := f.OnBucketInit(leaky.BucketConfig)
		if err != nil {
			leaky.logger.Errorf("Problem at bucket initialization, bailing out %T : %v", f, err)
			close(leaky.Signal)
			return fmt.Errorf("problem at bucket initialization, bailing out %T : %v", f, err)
		}
	}
	leaky.logger.Debugf("Leaky routine starting, lifetime : %s", leaky.Duration)
	for {
		select {
		/*receiving an event*/
		case msg := <-leaky.In:
			/*the msg var use is confusing and is redeclared in a different type :/*/
			for _, processor := range leaky.BucketConfig.processors {
				msg := processor.OnBucketPour(leaky.BucketConfig)(msg, leaky)
				// if msg is nil, a processor discarded the event: stop processing it
				if msg == nil {
					goto End
				}
			}
			if leaky.logger.Level >= log.TraceLevel {
				leaky.logger.Tracef("Pour event: %s", spew.Sdump(msg))
			}
			BucketsPour.With(prometheus.Labels{"name": leaky.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc()
			leaky.Pour(leaky, msg) // glue for now
			//reset the underflow/duration ticker on behalf of pour
			tmp := time.NewTicker(leaky.Duration)
			durationTicker = tmp.C
			defer tmp.Stop()
		/*we overflowed*/
		case ofw := <-leaky.Out:
			leaky.overflow(ofw)
			return nil
		/*suiciiiide*/
		case <-leaky.Suicide:
			close(leaky.Signal)
			BucketsCanceled.With(prometheus.Labels{"name": leaky.Name}).Inc()
			leaky.logger.Debugf("Suicide triggered")
			leaky.AllOut <- types.Event{Type: types.OVFLW, Overflow: types.RuntimeAlert{Mapkey: leaky.Mapkey}}
			leaky.logger.Tracef("Returning from leaky routine.")
			return nil
		/*we underflow or reach the bucket deadline (timers)*/
		case <-durationTicker:
			var (
				alert types.RuntimeAlert
				err   error
			)
			leaky.Ovflw_ts = time.Now().UTC()
			close(leaky.Signal)
			ofw := leaky.Queue
			alert = types.RuntimeAlert{Mapkey: leaky.Mapkey}
			if leaky.timedOverflow {
				BucketsOverflow.With(prometheus.Labels{"name": leaky.Name}).Inc()
				alert, err = NewAlert(leaky, ofw)
				if err != nil {
					log.Errorf("%s", err)
				}
				for _, f := range leaky.BucketConfig.processors {
					alert, ofw = f.OnBucketOverflow(leaky.BucketConfig)(leaky, alert, ofw)
					if ofw == nil {
						leaky.logger.Debugf("Overflow has been discarded (%T)", f)
						break
					}
				}
				leaky.logger.Infof("Timed Overflow")
			} else {
				leaky.logger.Debugf("bucket underflow, destroy")
				BucketsUnderflow.With(prometheus.Labels{"name": leaky.Name}).Inc()
			}
			if leaky.logger.Level >= log.TraceLevel {
				/*don't Sdump if it's not going to be printed, it's expensive*/
				leaky.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: alert}))
			}
			leaky.AllOut <- types.Event{Overflow: alert, Type: types.OVFLW}
			leaky.logger.Tracef("Returning from leaky routine.")
			return nil
		case <-leaky.tomb.Dying():
			leaky.logger.Debugf("Bucket externally killed, return")
			for len(leaky.Out) > 0 {
				ofw := <-leaky.Out
				leaky.overflow(ofw)
			}
			leaky.AllOut <- types.Event{Type: types.OVFLW, Overflow: types.RuntimeAlert{Mapkey: leaky.Mapkey}}
			return nil
		}
	End:
	}
}
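
// Whatever the exit path (overflow, suicide, underflow or external kill),
// LeakRoutine emits a final types.OVFLW event on AllOut before returning;
// underflow/suicide events carry only the Mapkey so the consumer can
// garbage-collect the bucket. A consuming sketch (assuming factory.ret is
// the shared AllOut channel and that an empty overflow is recognized by its
// nil Alert, as elsewhere in crowdsec):
//
//	for ev := range factory.ret {
//		if ev.Overflow.Alert == nil {
//			continue // underflow: nothing to report, just clean up
//		}
//		// handle a real overflow here
//	}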

func Pour(leaky *Leaky, msg types.Event) {
	leaky.wgDumpState.Wait()
	leaky.wgPour.Add(1)
	defer leaky.wgPour.Done()
	leaky.Total_count += 1
	if leaky.First_ts.IsZero() {
		leaky.First_ts = time.Now().UTC()
	}
	leaky.Last_ts = time.Now().UTC()
	if leaky.Limiter.Allow() {
		leaky.Queue.Add(msg)
	} else {
		leaky.Ovflw_ts = time.Now().UTC()
		leaky.logger.Debugf("Last event to be poured, bucket overflow.")
		leaky.Queue.Add(msg)
		leaky.Out <- leaky.Queue
	}
}
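
// The Limiter is what makes the bucket leak: it behaves as a token bucket
// with burst=Capacity, refilled at one token per leakspeed, so Allow()
// starts returning false once events arrive faster than they leak. An
// illustrative sketch against the upstream golang.org/x/time/rate API that
// this package's limiter mirrors (values are made up):
//
//	lim := rate.NewLimiter(rate.Every(10*time.Second), 5)
//	for i := 0; i < 6; i++ {
//		fmt.Println(lim.Allow()) // five times "true", then "false"
//	}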

func (leaky *Leaky) overflow(ofw *Queue) {
	close(leaky.Signal)
	alert, err := NewAlert(leaky, ofw)
	if err != nil {
		log.Errorf("%s", err)
	}
	leaky.logger.Tracef("Overflow hooks time : %v", leaky.BucketConfig.processors)
	for _, f := range leaky.BucketConfig.processors {
		alert, ofw = f.OnBucketOverflow(leaky.BucketConfig)(leaky, alert, ofw)
		if ofw == nil {
			leaky.logger.Debugf("Overflow has been discarded (%T)", f)
			break
		}
	}
	if leaky.logger.Level >= log.TraceLevel {
		leaky.logger.Tracef("Overflow event: %s", spew.Sdump(types.RuntimeAlert(alert)))
	}
	mt, _ := leaky.Ovflw_ts.MarshalText()
	leaky.logger.Tracef("overflow time : %s", mt)
	BucketsOverflow.With(prometheus.Labels{"name": leaky.Name}).Inc()
	leaky.AllOut <- types.Event{Overflow: alert, Type: types.OVFLW, MarshaledTime: string(mt)}
}