|
@@ -195,8 +195,9 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
|
|
|
func LeakRoutine(leaky *Leaky) error {
|
|
|
|
|
|
var (
|
|
|
- durationTicker <-chan time.Time = make(<-chan time.Time)
|
|
|
- firstEvent bool = true
|
|
|
+ durationTickerChan <-chan time.Time = make(<-chan time.Time)
|
|
|
+ durationTicker *time.Ticker
|
|
|
+ firstEvent bool = true
|
|
|
)
|
|
|
|
|
|
defer types.CatchPanic(fmt.Sprintf("crowdsec/LeakRoutine/%s", leaky.Name))
|
|
@@ -242,15 +243,16 @@ func LeakRoutine(leaky *Leaky) error {
|
|
|
//Clear cache on behalf of pour
|
|
|
|
|
|
// if durationTicker isn't initialized, then we're pouring our first event
|
|
|
- if firstEvent {
|
|
|
- durationTicker = time.NewTicker(leaky.Duration).C
|
|
|
- }
|
|
|
|
|
|
// reinitialize the durationTicker when it's not a counter bucket
|
|
|
- if !leaky.timedOverflow {
|
|
|
- ticker := time.NewTicker(leaky.Duration)
|
|
|
- durationTicker = ticker.C
|
|
|
- defer ticker.Stop()
|
|
|
+ if !leaky.timedOverflow || firstEvent {
|
|
|
+ if firstEvent {
|
|
|
+ durationTicker = time.NewTicker(leaky.Duration)
|
|
|
+ durationTickerChan = durationTicker.C
|
|
|
+ defer durationTicker.Stop()
|
|
|
+ } else {
|
|
|
+ durationTicker.Reset(leaky.Duration)
|
|
|
+ }
|
|
|
}
|
|
|
firstEvent = false
|
|
|
/*we overflowed*/
|
|
@@ -266,7 +268,7 @@ func LeakRoutine(leaky *Leaky) error {
|
|
|
leaky.logger.Tracef("Returning from leaky routine.")
|
|
|
return nil
|
|
|
/*we underflow or reach bucket deadline (timers)*/
|
|
|
- case <-durationTicker:
|
|
|
+ case <-durationTickerChan:
|
|
|
var (
|
|
|
alert types.RuntimeAlert
|
|
|
err error
|