|
@@ -195,8 +195,8 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
|
|
|
func LeakRoutine(leaky *Leaky) error {
|
|
|
|
|
|
var (
|
|
|
- durationTicker <-chan time.Time = make(<-chan time.Time)
|
|
|
- underflowTicker *time.Ticker
|
|
|
+ durationTicker <-chan time.Time = make(<-chan time.Time)
|
|
|
+ firstEvent bool = true
|
|
|
)
|
|
|
|
|
|
defer types.CatchPanic(fmt.Sprintf("crowdsec/LeakRoutine/%s", leaky.Name))
|
|
@@ -240,12 +240,19 @@ func LeakRoutine(leaky *Leaky) error {
|
|
|
|
|
|
leaky.Pour(leaky, *msg) // glue for now
|
|
|
//Clear cache on behalf of pour
|
|
|
- if underflowTicker != nil {
|
|
|
- underflowTicker.Stop()
|
|
|
+
|
|
|
+ // if durationTicker isn't initialized, then we're pouring our first event
|
|
|
+ if firstEvent {
|
|
|
+ durationTicker = time.NewTicker(leaky.Duration).C
|
|
|
+ }
|
|
|
+
|
|
|
+ // reinitialize the durationTicker when it's not a counter bucket
|
|
|
+ if !leaky.timedOverflow {
|
|
|
+ ticker := time.NewTicker(leaky.Duration)
|
|
|
+ durationTicker = ticker.C
|
|
|
+ defer ticker.Stop()
|
|
|
}
|
|
|
- underflowTicker = time.NewTicker(leaky.Duration)
|
|
|
- durationTicker = underflowTicker.C
|
|
|
- defer underflowTicker.Stop()
|
|
|
+ firstEvent = false
|
|
|
/*we overflowed*/
|
|
|
case ofw := <-leaky.Out:
|
|
|
leaky.overflow(ofw)
|