crowdsec/pkg/leakybucket/timemachine.go
registergoofy 5b7ac4a473
[Rebased] fix races (#633)
* get rid of dead code
* have LeakRoutined started in a tomb
* fix race and multiple small issues in the way we handle tombs
* yet another race fix
* another race
* get rid of leaky.KillSwitch for proper tomb use
* fix deadlock
* empty overflow before exiting
* fix an obvious typo
* proper use of waitgroup
* have a smart signalisation for allowing LeakRoutine being killed
* ugly workaround
* fix lint error
* fix compilation
* fix panic
* shorten lock
* up lock both copy
* wait for crowdsec to die
* fix coding style and lint issue
* go mod tidy

Co-authored-by: bui <thibault@crowdsec.net>
2021-02-25 11:26:46 +01:00

53 lines
1.1 KiB
Go

package leakybucket
import (
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
)
func TimeMachinePour(l *Leaky, msg types.Event) {
var (
d time.Time
err error
)
if msg.MarshaledTime == "" {
log.Warningf("Trying to time-machine event without timestamp : %s", spew.Sdump(msg))
return
}
err = d.UnmarshalText([]byte(msg.MarshaledTime))
if err != nil {
log.Warningf("Failed unmarshaling event time (%s) : %v", msg.MarshaledTime, err)
return
}
l.Total_count += 1
l.mutex.Lock()
if l.First_ts.IsZero() {
l.logger.Debugf("First event, bucket creation time : %s", d)
l.First_ts = d
}
l.Last_ts = d
l.mutex.Unlock()
if l.Limiter.AllowN(d, 1) {
l.logger.Tracef("Time-Pouring event %s (tokens:%f)", d, l.Limiter.GetTokensCount())
l.Queue.Add(msg)
} else {
l.Ovflw_ts = d
l.logger.Debugf("Bucket overflow at %s", l.Ovflw_ts)
l.Queue.Add(msg)
l.Out <- l.Queue
}
}
func NewTimeMachine(g BucketFactory) *Leaky {
l := NewLeaky(g)
g.logger.Tracef("Instanciating timeMachine bucket")
l.Pour = TimeMachinePour
l.Mode = TIMEMACHINE
return l
}