package leakybucket

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math"
	"os"
	"time"

	"github.com/antonmedv/expr"
	"github.com/mohae/deepcopy"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"

	"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)
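// serialized is scratch storage used by DumpBucketsStateAt to collect the
// state of live buckets before writing it to disk.
// BucketPourCache maps a bucket name to a copy of every event poured into it;
// it is only populated when BucketPourTrack is set, as a debugging aid.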
var serialized map[string]Leaky
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
/*The lifecycle of the leaky routines is based on "real" time.
But when running in time-machine mode, the reference time comes from the logs, not from "real" time.
Thus we need to garbage collect them to avoid skyrocketing memory usage.*/
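// GarbageCollectBuckets kills and removes the buckets that, as of deadline,
// have either already overflowed or fully replenished their tokens (underflowed).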
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
	buckets.wgPour.Wait()
	buckets.wgDumpState.Add(1)
	defer buckets.wgDumpState.Done()

	total := 0
	discard := 0
	toflush := []string{}
	buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
		key := rkey.(string)
		val := rvalue.(*Leaky)
		total += 1
		//bucket already overflowed, we can kill it
		if !val.Ovflw_ts.IsZero() {
			discard += 1
			val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
			toflush = append(toflush, key)
			val.tomb.Kill(nil)
			return true
		}
		/*FIXME : GetTokensCountAt sometimes has rounding issues when compared to
		the bucket capacity, even if the bucket underflowed long ago.
		Round both to 2 decimals before comparing.*/
		tokat := val.Limiter.GetTokensCountAt(deadline)
		tokcapa := float64(val.Capacity)
		tokat = math.Round(tokat*100) / 100
		tokcapa = math.Round(tokcapa*100) / 100
		//bucket actually underflowed based on log time, but not in real time
		if tokat >= tokcapa {
			BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
			val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
			toflush = append(toflush, key)
			val.tomb.Kill(nil)
			return true
		}
		val.logger.Tracef("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
		if _, ok := serialized[key]; ok {
			log.Errorf("entry %s already exists", key)
			return false
		}
		log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
		return true
	})
	log.Infof("Cleaned %d buckets", len(toflush))
	for _, flushkey := range toflush {
		buckets.Bucket_map.Delete(flushkey)
	}
	return nil
}
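// DumpBucketsStateAt serializes the state of every live (non-overflowed,
// non-expired) bucket as of deadline to a JSON temp file and returns its path.
// Note that outputdir is only checked for emptiness : the file itself is
// created in os.TempDir().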
func DumpBucketsStateAt(deadline time.Time, outputdir string, buckets *Buckets) (string, error) {
	//synchronize with PourItemToHolders
	buckets.wgPour.Wait()
	buckets.wgDumpState.Add(1)
	defer buckets.wgDumpState.Done()

	if outputdir == "" {
		return "", fmt.Errorf("empty output dir for dump bucket state")
	}
	tmpFd, err := ioutil.TempFile(os.TempDir(), "crowdsec-buckets-dump-")
	if err != nil {
		return "", fmt.Errorf("failed to create temp file : %s", err)
	}
	defer tmpFd.Close()
	tmpFileName := tmpFd.Name()
	serialized = make(map[string]Leaky)
	log.Printf("Dumping buckets state at %s", deadline)
	total := 0
	discard := 0
	buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
		key := rkey.(string)
		val := rvalue.(*Leaky)
		total += 1
		if !val.Ovflw_ts.IsZero() {
			discard += 1
			val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
			return true
		}
		/*FIXME : GetTokensCountAt sometimes has rounding issues when compared to
		the bucket capacity, even if the bucket underflowed long ago.
		Round both to 2 decimals before comparing.*/
		tokat := val.Limiter.GetTokensCountAt(deadline)
		tokcapa := float64(val.Capacity)
		tokat = math.Round(tokat*100) / 100
		tokcapa = math.Round(tokcapa*100) / 100
		if tokat >= tokcapa {
			BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
			val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
			discard += 1
			return true
		}
		val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
		if _, ok := serialized[key]; ok {
			log.Errorf("entry %s already exists", key)
			return false
		}
		log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
		val.SerializedState = val.Limiter.Dump()
		serialized[key] = *val
		return true
	})
	bbuckets, err := json.MarshalIndent(serialized, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to marshal buckets : %s", err)
	}
	size, err := tmpFd.Write(bbuckets)
	if err != nil {
		return "", fmt.Errorf("failed to write temp file : %s", err)
	}
	log.Infof("Serialized %d live buckets (+%d expired) in %d bytes to %s", len(serialized), discard, size, tmpFileName)
	serialized = nil
	return tmpFileName, nil
}
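// ShutdownAllBuckets kills the leak routine of every bucket in the map.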
func ShutdownAllBuckets(buckets *Buckets) error {
	buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
		key := rkey.(string)
		val := rvalue.(*Leaky)
		val.tomb.Kill(nil)
		log.Infof("killed %s", key)
		return true
	})
	return nil
}
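// PourItemToBucket sends the parsed event to the bucket, retrying until it is
// accepted. If the bucket's leak routine is dead, or if a time-machine bucket
// has expired relative to the event's timestamp, the bucket is deleted and
// respawned through LoadOrStoreBucketFromHolder before retrying.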
func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) {
	var sent bool
	var buckey = bucket.Mapkey
	var err error

	sigclosed := 0
	failed_sent := 0
	attempts := 0
	start := time.Now().UTC()
	for !sent {
		attempts += 1
		/* Warn the user if we have spent more than 100ms trying to pour the event : it hints at a deadlock or heavy contention */
		if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now().UTC()) {
			holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)", time.Since(start),
				buckey, sigclosed, failed_sent, attempts)
		}
		/* check if the leak routine is still up */
		select {
		case _, ok := <-bucket.Signal:
			if !ok {
				//the bucket is dead : clean it up and spawn a fresh one
				bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey)
				buckets.Bucket_map.Delete(buckey)
				sigclosed += 1
				bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
				if err != nil {
					return false, err
				}
				continue
			}
			holder.logger.Tracef("Signal exists, try to pour :)")
		default:
			/*nothing to read, but the channel is not closed : try to pour */
			holder.logger.Tracef("Signal exists but empty, try to pour :)")
		}
		/*let's see if this time-machine bucket should have expired */
		if bucket.Mode == TIMEMACHINE {
			bucket.mutex.Lock()
			firstTs := bucket.First_ts
			lastTs := bucket.Last_ts
			bucket.mutex.Unlock()

			if !firstTs.IsZero() {
				var d time.Time
				err = d.UnmarshalText([]byte(parsed.MarshaledTime))
				if err != nil {
					holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
				}
				if d.After(lastTs.Add(bucket.Duration)) {
					bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration))
					buckets.Bucket_map.Delete(buckey)
					//not sure about this : should we create a new one ?
					sigclosed += 1
					bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
					if err != nil {
						return false, err
					}
					continue
				}
			}
		}
		/*the bucket seems to be up & running : pour the event */
		select {
		case bucket.In <- parsed:
			holder.logger.Tracef("Successfully sent !")
			if BucketPourTrack {
				if _, ok := BucketPourCache[bucket.Name]; !ok {
					BucketPourCache[bucket.Name] = make([]types.Event, 0)
				}
				evt := deepcopy.Copy(parsed)
				BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event))
			}
			sent = true
			continue
		default:
			failed_sent += 1
			holder.logger.Tracef("Failed to send, try again")
			continue
		}
	}
	holder.logger.Debugf("bucket '%s' is poured", holder.Name)
	return sent, nil
}
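// LoadOrStoreBucketFromHolder returns the bucket for partitionKey, creating it
// from the holder (and starting its leak routine) if it does not exist yet.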
func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error) {
	biface, ok := buckets.Bucket_map.Load(partitionKey)
	/* the bucket doesn't exist, create it !*/
	if !ok {
		var fresh_bucket *Leaky

		switch expectMode {
		case TIMEMACHINE:
			fresh_bucket = NewTimeMachine(holder)
			holder.logger.Debugf("Creating TimeMachine bucket")
		case LIVE:
			fresh_bucket = NewLeaky(holder)
			holder.logger.Debugf("Creating Live bucket")
		default:
			return nil, fmt.Errorf("input event has an unexpected mode : %+v", expectMode)
		}
		fresh_bucket.In = make(chan types.Event)
		fresh_bucket.Mapkey = partitionKey
		fresh_bucket.Signal = make(chan bool, 1)
		actual, loaded := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket)
		if !loaded {
			holder.tomb.Go(func() error {
				return LeakRoutine(fresh_bucket)
			})
			biface = fresh_bucket
			//wait for the created goroutine to be ready to process events before returning the bucket
			<-fresh_bucket.Signal
			holder.logger.Debugf("Created new bucket %s", partitionKey)
		} else {
			holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey)
			biface = actual
		}
	}
	return biface.(*Leaky), nil
}
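// PourItemToHolders is the entry point of the pouring logic : for every holder
// (scenario) whose filter matches the event, it computes the groupby partition
// key, loads or creates the matching bucket and pours the event into it.
// It returns true if the event was poured into at least one bucket.
//
// A minimal (hypothetical) call site, assuming holders and buckets were
// initialized elsewhere :
//
//	poured, err := PourItemToHolders(evt, holders, buckets)
//	if err != nil {
//		log.Errorf("pouring failed : %v", err)
//	}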
func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
	var (
		ok, condition, poured bool
	)

	if BucketPourTrack {
		if BucketPourCache == nil {
			BucketPourCache = make(map[string][]types.Event)
		}
		if _, ok := BucketPourCache["OK"]; !ok {
			BucketPourCache["OK"] = make([]types.Event, 0)
		}
		evt := deepcopy.Copy(parsed)
		BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
	}
	//find the relevant holders (scenarios)
	for idx, holder := range holders {
		//evaluate the bucket's filter condition
		if holder.RunTimeFilter != nil {
			holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
			output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
			if err != nil {
				holder.logger.Errorf("failed to run filter : %v", err)
				return false, fmt.Errorf("filter expression failed : %s", err)
			}
			//the filter must evaluate to a boolean
			if condition, ok = output.(bool); !ok {
				holder.logger.Errorf("unexpected non-bool return : %T", output)
				holder.logger.Fatalf("Filter issue")
			}
			if holder.Debug {
				holder.ExprDebugger.Run(holder.logger, condition, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
			}
			if !condition {
				holder.logger.Debugf("Event leaving node : ko (filter mismatch)")
				continue
			}
		}
		//groupby determines the partition key for the specific bucket
		var groupby string
		if holder.RunTimeGroupBy != nil {
			tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
			if err != nil {
				holder.logger.Errorf("failed groupby : %v", err)
				return false, errors.Wrap(err, "groupby expression failed")
			}
			if groupby, ok = tmpGroupBy.(string); !ok {
				holder.logger.Errorf("unexpected groupby return type : %T", tmpGroupBy)
				return false, errors.New("groupby wrong type")
			}
		}
		buckey := GetKey(holder, groupby)
		//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
		bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
		if err != nil {
			return false, errors.Wrap(err, "failed to load or store bucket")
		}
		//finally, pour the event into the bucket
		ok, err := PourItemToBucket(bucket, holder, buckets, parsed)
		if err != nil {
			return false, errors.Wrap(err, "failed to pour bucket")
		}
		if ok {
			poured = true
		}
	}
	return poured, nil
}