- package leakybucket
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math"
- "os"
- "path/filepath"
- "strings"
- "time"
- "github.com/crowdsecurity/crowdsec/pkg/cwversion"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- "github.com/davecgh/go-spew/spew"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- "github.com/antonmedv/expr"
- "github.com/antonmedv/expr/vm"
- "github.com/goombaio/namegenerator"
- yaml "gopkg.in/yaml.v2"
- "github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
- )
- // BucketFactory holds all the fields of a bucket configuration : it is a
- // generic description from which runtime buckets are instantiated.
- type BucketFactory struct {
- FormatVersion string `yaml:"format"`
- Author string `yaml:"author"`
- Description string `yaml:"description"`
- References []string `yaml:"references"`
- Type string `yaml:"type"` //Type can be: leaky, counter or trigger. It determines the main bucket characteristics
- Name string `yaml:"name"` //Name of the bucket, used later in logs and user messages. Should be unique
- Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
- LeakSpeed string `yaml:"leakspeed"` //LeakSpeed is a duration (e.g. "10s") : the interval at which one event leaks out of the bucket
- Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
- Filter string `yaml:"filter"` //Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
- GroupBy string `yaml:"groupby,omitempty"` //GroupBy is an expr that determines the partition key of the bucket. A common example is the source_ip
- Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour unique items (based on the distinct expr result)
- Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
- Labels map[string]string `yaml:"labels"` //Labels is a K:V list aiming at providing context to the overflow
- Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent the same bucket partition from overflowing more often than once per $duration
- logger *log.Entry `yaml:"-"` //logger is the bucket-specific logger (used by Debug as well)
- Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will force the bucket to be re-injected into the processing chain
- CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of the in-memory cache of the bucket
- Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
- OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter, if present, is a filter that must return true for the overflow to go through
- BucketName string `yaml:"-"`
- Filename string `yaml:"-"`
- RunTimeFilter *vm.Program `json:"-"`
- RunTimeGroupBy *vm.Program `json:"-"`
- Data []*types.DataSource `yaml:"data,omitempty"`
- leakspeed time.Duration //internal representation of `Leakspeed`
- duration time.Duration //internal representation of `Duration`
- ret chan types.Event //the bucket-specific output chan for overflows
- processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
- output bool //??
- }
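- // Illustrative YAML scenario that this struct decodes ; a hedged sketch,
- // the names and values below are hypothetical, not a shipped scenario :
- //
- //   type: leaky
- //   name: example/ssh-bf
- //   description: "detect ssh bruteforce"
- //   filter: "evt.Meta.log_type == 'ssh_failed-auth'"
- //   groupby: evt.Meta.source_ip
- //   leakspeed: 10s
- //   capacity: 5
- //   blackhole: 1m
- //   labels:
- //     service: ssh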
- func ValidateFactory(b *BucketFactory) error {
- if b.Name == "" {
- return fmt.Errorf("bucket must have name")
- }
- if b.Description == "" {
- return fmt.Errorf("description is mandatory")
- }
- if b.Type == "leaky" {
- if b.Capacity <= 0 { //capacity must be a positive int
- return fmt.Errorf("bad capacity for leaky '%d'", b.Capacity)
- }
- if b.LeakSpeed == "" {
- return fmt.Errorf("leakspeed can't be empty for leaky")
- }
- if b.leakspeed == 0 {
- return fmt.Errorf("bad leakspeed for leaky '%s'", b.LeakSpeed)
- }
- } else if b.Type == "counter" {
- if b.Duration == "" {
- return fmt.Errorf("duration ca't be empty for counter")
- }
- if b.duration == 0 {
- return fmt.Errorf("bad duration for counter bucket '%d'", b.duration)
- }
- if b.Capacity != -1 {
- return fmt.Errorf("counter bucket must have -1 capacity")
- }
- } else if b.Type == "trigger" {
- if b.Capacity != 0 {
- return fmt.Errorf("trigger bucket must have 0 capacity")
- }
- } else {
- return fmt.Errorf("unknown bucket type '%s'", b.Type)
- }
- return nil
- }
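- // Hedged sketch of validating a hand-built counter factory from within the
- // package (values hypothetical). Note that ValidateFactory checks the parsed
- // b.duration, which LoadBucket fills from the yaml `duration` field :
- //
- //   f := BucketFactory{Name: "example/count", Description: "count events", Type: "counter", Duration: "5m", Capacity: -1}
- //   f.duration, _ = time.ParseDuration(f.Duration)
- //   if err := ValidateFactory(&f); err != nil {
- //       log.Errorf("invalid factory : %s", err)
- //   }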
- /* Init processes YAML files from a directory and loads them as BucketFactory objects */
- func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
- return LoadBucketDir(cfg["patterns"], cfg["data"])
- }
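- // Example call (the map keys are the ones read above ; paths hypothetical) :
- //
- //   holders, outChan, err := Init(map[string]string{
- //       "patterns": "/etc/crowdsec/scenarios/",
- //       "data":     "/var/lib/crowdsec/data/",
- //   })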
- func LoadBuckets(files []string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
- var (
- ret = []BucketFactory{}
- response chan types.Event
- )
- seed := namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
- err := exprhelpers.Init()
- if err != nil {
- return nil, nil, err
- }
- response = make(chan types.Event, 1)
- for _, f := range files {
- log.Debugf("Loading %s", f)
- if !strings.HasSuffix(f, ".yaml") {
- log.Debugf("Skipping %s : not a yaml file", f)
- continue
- }
- //process the yaml
- bucketConfigurationFile, err := os.Open(f)
- if err != nil {
- log.Errorf("Can't access leaky configuration file %s", f)
- return nil, nil, err
- }
- dec := yaml.NewDecoder(bucketConfigurationFile)
- dec.SetStrict(true)
- for {
- g := BucketFactory{}
- err = dec.Decode(&g)
- if err != nil {
- if err == io.EOF {
- log.Tracef("End of yaml file")
- break
- } else {
- log.Errorf("Bad yaml in %s : %v", f, err)
- return nil, nil, fmt.Errorf("bad yaml in %s : %v", f, err)
- }
- }
- //check empty
- if g.Name == "" {
- log.Errorf("Won't load nameless bucket")
- return nil, nil, fmt.Errorf("nameless bucket")
- }
- //check compat
- if g.FormatVersion == "" {
- log.Debugf("no version in %s : %s, assuming '1.0'", g.Name, f)
- g.FormatVersion = "1.0"
- }
- ok, err := cwversion.Statisfies(g.FormatVersion, cwversion.Constraint_scenario)
- if err != nil {
- log.Fatalf("Failed to check version : %s", err)
- }
- if !ok {
- log.Errorf("can't load %s : %s doesn't satisfy scenario format %s, skip", g.Name, g.FormatVersion, cwversion.Constraint_scenario)
- continue
- }
- g.Filename = filepath.Clean(f)
- g.BucketName = seed.Generate()
- g.ret = response
- err = LoadBucket(&g, dataFolder)
- if err != nil {
- log.Errorf("Failed to load bucket : %v", err)
- return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
- }
- ret = append(ret, g)
- }
- }
- log.Warningf("Loaded %d scenarios", len(ret))
- return ret, response, nil
- }
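- // Note : the decode loop above handles multi-document YAML, so a single file
- // may carry several scenarios separated by '---'. Hypothetical sketch :
- //
- //   type: trigger
- //   name: example/first
- //   description: "first scenario"
- //   filter: "evt.Meta.log_type == 'a'"
- //   ---
- //   type: trigger
- //   name: example/second
- //   description: "second scenario"
- //   filter: "evt.Meta.log_type == 'b'"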
- func LoadBucketDir(dir string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
- var (
- filenames []string
- )
- files, err := ioutil.ReadDir(dir)
- if err != nil {
- return nil, nil, err
- }
- for _, f := range files {
- filenames = append(filenames, filepath.Join(dir, f.Name()))
- }
- return LoadBuckets(filenames, dataFolder)
- }
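- // Hypothetical usage ; note the directory is read flat, not recursively :
- //
- //   holders, outChan, err := LoadBucketDir("/etc/crowdsec/scenarios/", "/var/lib/crowdsec/data/")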
- /* LoadBucket compiles the expressions of a single BucketFactory, sets up its logger and instantiates its processors */
- func LoadBucket(g *BucketFactory, dataFolder string) error {
- var err error
- if g.Debug {
- var clog = log.New()
- if err := types.ConfigureLogger(clog); err != nil {
- log.Fatalf("While creating bucket-specific logger : %s", err)
- }
- clog.SetLevel(log.DebugLevel)
- g.logger = clog.WithFields(log.Fields{
- "cfg": g.BucketName,
- "name": g.Name,
- "file": g.Filename,
- })
- } else {
- /* else bind it to the default one (might find something more elegant here)*/
- g.logger = log.WithFields(log.Fields{
- "cfg": g.BucketName,
- "name": g.Name,
- "file": g.Filename,
- })
- }
- if g.LeakSpeed != "" {
- if g.leakspeed, err = time.ParseDuration(g.LeakSpeed); err != nil {
- return fmt.Errorf("bad leakspeed '%s' in %s : %v", g.LeakSpeed, g.Filename, err)
- }
- } else {
- g.leakspeed = time.Duration(0)
- }
- if g.Duration != "" {
- if g.duration, err = time.ParseDuration(g.Duration); err != nil {
- return fmt.Errorf("invalid Duration '%s' in %s : %v", g.Duration, g.Filename, err)
- }
- }
- if g.Filter == "" {
- g.logger.Warningf("Bucket without filter, abort.")
- return fmt.Errorf("bucket without filter directive")
- }
- g.RunTimeFilter, err = expr.Compile(g.Filter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
- if err != nil {
- return fmt.Errorf("invalid filter '%s' in %s : %v", g.Filter, g.Filename, err)
- }
- if g.GroupBy != "" {
- g.RunTimeGroupBy, err = expr.Compile(g.GroupBy, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
- if err != nil {
- return fmt.Errorf("invalid groupby '%s' in %s : %v", g.GroupBy, g.Filename, err)
- }
- }
- g.logger.Infof("Adding %s bucket", g.Type)
- //set up the processor chain corresponding to the bucket type
- g.processors = []Processor{}
- switch g.Type {
- case "leaky":
- g.processors = append(g.processors, &DumbProcessor{})
- case "trigger":
- g.processors = append(g.processors, &Trigger{})
- case "counter":
- g.processors = append(g.processors, &DumbProcessor{})
- default:
- return fmt.Errorf("invalid type '%s' in %s : %v", g.Type, g.Filename, err)
- }
- if g.Distinct != "" {
- g.logger.Debugf("Adding a non duplicate filter on %s.", g.Name)
- g.processors = append(g.processors, &Uniq{})
- }
- if g.OverflowFilter != "" {
- g.logger.Debugf("Adding an overflow filter")
- filovflw, err := NewOverflowFilter(g)
- if err != nil {
- g.logger.Errorf("Error creating overflow_filter : %s", err)
- return fmt.Errorf("error creating overflow_filter : %s", err)
- }
- g.processors = append(g.processors, filovflw)
- }
- if g.Blackhole != "" {
- g.logger.Debugf("Adding blackhole.")
- blackhole, err := NewBlackhole(g)
- if err != nil {
- g.logger.Errorf("Error creating blackhole : %s", err)
- return fmt.Errorf("error creating blackhole : %s", err)
- }
- g.processors = append(g.processors, blackhole)
- }
- if len(g.Data) > 0 {
- for _, data := range g.Data {
- err = exprhelpers.FileInit(dataFolder, data.DestPath, data.Type)
- if err != nil {
- g.logger.Errorf("unable to init data for file '%s': %s", data.Filename, err.Error())
- }
- }
- }
- g.output = false
- if err := ValidateFactory(g); err != nil {
- return fmt.Errorf("invalid bucket from %s : %v", g.Filename, err)
- }
- return nil
- }
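- // Minimal sketch of loading a single factory by hand (values hypothetical) ;
- // a 'trigger' needs no capacity or leakspeed, only name, description and filter :
- //
- //   g := BucketFactory{Name: "example/trigger", Description: "fire on first event", Type: "trigger", Filter: "evt.Meta.log_type == 'ssh_failed-auth'"}
- //   if err := LoadBucket(&g, "/var/lib/crowdsec/data/"); err != nil {
- //       log.Fatalf("failed to load bucket : %s", err)
- //   }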
- func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error {
- var state map[string]Leaky
- body, err := ioutil.ReadFile(file)
- if err != nil {
- return fmt.Errorf("can't state file %s : %s", file, err)
- }
- if err := json.Unmarshal(body, &state); err != nil {
- return fmt.Errorf("can't unmarshal state file %s : %s", file, err)
- }
- for k, v := range state {
- var tbucket *Leaky
- log.Debugf("Reloading bucket %s", k)
- val, ok := buckets.Bucket_map.Load(k)
- if ok {
- log.Fatalf("key %s already exists : %+v", k, val)
- }
- //find back our holder
- found := false
- for _, h := range holders {
- if h.Name == v.Name {
- log.Debugf("found factory %s/%s -> %s", h.Author, h.Name, h.Description)
- //check in which mode the bucket was
- if v.Mode == TIMEMACHINE {
- tbucket = NewTimeMachine(h)
- } else if v.Mode == LIVE {
- tbucket = NewLeaky(h)
- } else {
- log.Errorf("Unknown bucket type : %d", v.Mode)
- }
- /*Trying to restore queue state*/
- tbucket.Queue = v.Queue
- /*Trying to set the limiter to the saved values*/
- tbucket.Limiter.Load(v.SerializedState)
- tbucket.In = make(chan types.Event)
- tbucket.Mapkey = k
- tbucket.Signal = make(chan bool, 1)
- tbucket.KillSwitch = make(chan bool, 1)
- tbucket.First_ts = v.First_ts
- tbucket.Last_ts = v.Last_ts
- tbucket.Ovflw_ts = v.Ovflw_ts
- tbucket.Total_count = v.Total_count
- buckets.Bucket_map.Store(k, tbucket)
- go LeakRoutine(tbucket)
- <-tbucket.Signal
- found = true
- break
- }
- }
- if !found {
- log.Fatalf("Unable to find holder for bucket %s : %s", k, spew.Sdump(v))
- }
- }
- log.Infof("Restored %d buckets from dump", len(state))
- return nil
- }
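- // Sketch of restoring state at startup, assuming the file was produced by
- // DumpBucketsStateAt below (path hypothetical) :
- //
- //   if err := LoadBucketsState("/var/lib/crowdsec/state.json", buckets, holders); err != nil {
- //       log.Fatalf("failed to restore buckets : %s", err)
- //   }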
- var serialized map[string]Leaky
- /*The leaky routines' lifecycle is based on "real" time.
- But when we are running in time-machine mode, the reference time comes from the logs, not "real" time.
- Thus we need to garbage collect them to avoid skyrocketing memory usage.*/
- func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
- total := 0
- discard := 0
- toflush := []string{}
- buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
- key := rkey.(string)
- val := rvalue.(*Leaky)
- total += 1
- if !val.Ovflw_ts.IsZero() {
- discard += 1
- val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
- toflush = append(toflush, key)
- val.KillSwitch <- true
- return true
- }
- /*FIXME : GetTokensCountAt sometimes has rounding issues when we try to
- match it with the bucket capacity, even if the bucket is long overdue for underflow. Round both to 2 decimals*/
- tokat := val.Limiter.GetTokensCountAt(deadline)
- tokcapa := float64(val.Capacity)
- tokat = math.Round(tokat*100) / 100
- tokcapa = math.Round(tokcapa*100) / 100
- if tokat >= tokcapa {
- BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
- val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
- toflush = append(toflush, key)
- val.KillSwitch <- true
- return true
- }
- val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
- return true
- })
- log.Infof("Cleaned %d buckets", len(toflush))
- for _, flushkey := range toflush {
- buckets.Bucket_map.Delete(flushkey)
- }
- return nil
- }
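- // In time-machine mode the deadline should come from the reference time of the
- // logs, not the wall clock (hedged sketch ; lastEventTime is hypothetical) :
- //
- //   if err := GarbageCollectBuckets(lastEventTime, buckets); err != nil {
- //       log.Errorf("failed to garbage collect buckets : %s", err)
- //   }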
- func DumpBucketsStateAt(file string, deadline time.Time, buckets *Buckets) error {
- serialized = make(map[string]Leaky)
- log.Printf("Dumping buckets state at %s", deadline)
- total := 0
- discard := 0
- buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
- key := rkey.(string)
- val := rvalue.(*Leaky)
- total += 1
- if !val.Ovflw_ts.IsZero() {
- discard += 1
- val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
- return true
- }
- /*FIXME : GetTokensCountAt sometimes has rounding issues when we try to
- match it with the bucket capacity, even if the bucket is long overdue for underflow. Round both to 2 decimals*/
- tokat := val.Limiter.GetTokensCountAt(deadline)
- tokcapa := float64(val.Capacity)
- tokat = math.Round(tokat*100) / 100
- tokcapa = math.Round(tokcapa*100) / 100
- if tokat >= tokcapa {
- BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
- val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
- discard += 1
- return true
- }
- val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
- if _, ok := serialized[key]; ok {
- log.Errorf("entry %s already exists", key)
- return false
- }
- log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
- val.SerializedState = val.Limiter.Dump()
- serialized[key] = *val
- return true
- })
- bbuckets, err := json.MarshalIndent(serialized, "", " ")
- if err != nil {
- log.Fatalf("Failed to unmarshal buckets : %s", err)
- }
- err = ioutil.WriteFile(file, bbuckets, 0644)
- if err != nil {
- log.Fatalf("Failed to write buckets state %s", err)
- }
- log.Warningf("Serialized %d live buckets state, %d total with %d expired to %s", len(serialized), total, discard, file)
- return nil
- }
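- // Companion sketch to LoadBucketsState above (path hypothetical) :
- //
- //   if err := DumpBucketsStateAt("/var/lib/crowdsec/state.json", time.Now().UTC(), buckets); err != nil {
- //       log.Fatalf("failed to dump buckets : %s", err)
- //   }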
- func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
- var (
- ok, condition, sent bool
- err error
- )
- for idx, holder := range holders {
- if holder.RunTimeFilter != nil {
- log.Debugf("event against holder %d/%d", idx, len(holders))
- output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
- if err != nil {
- holder.logger.Errorf("failed parsing : %v", err)
- return false, fmt.Errorf("leaky failed : %s", err)
- }
- //the filter must return a bool ; type-check it to be safe
- if condition, ok = output.(bool); !ok {
- holder.logger.Errorf("unexpected non-bool return : %T", output)
- log.Fatalf("Filter issue")
- }
- if !condition {
- holder.logger.Debugf("eval(FALSE) %s", holder.Filter)
- continue
- }
- holder.logger.Debugf("eval(TRUE) %s", holder.Filter)
- }
- sent = false
- var groupby string
- if holder.RunTimeGroupBy != nil {
- tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
- if err != nil {
- log.Errorf("failed groupby : %v", err)
- return false, errors.New("leaky failed :/")
- }
- if groupby, ok = tmpGroupBy.(string); !ok {
- log.Errorf("groupby did not return a string : %T", tmpGroupBy)
- return false, errors.New("groupby wrong type")
- }
- }
- buckey := GetKey(holder, groupby)
- sigclosed := 0
- keymiss := 0
- failed_sent := 0
- attempts := 0
- start := time.Now()
- for !sent {
- attempts += 1
- /* Warn the user if we have spent more than 100ms trying to pour an event ; it likely means something is stuck */
- if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) {
- log.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start),
- buckey, sigclosed, keymiss, failed_sent, attempts)
- }
- biface, ok := buckets.Bucket_map.Load(buckey)
- /* the bucket doesn't exist, create it !*/
- if !ok {
- log.Debugf("Creating bucket %s", buckey)
- keymiss += 1
- var fresh_bucket *Leaky
- switch parsed.ExpectMode {
- case TIMEMACHINE:
- fresh_bucket = NewTimeMachine(holder)
- holder.logger.Debugf("Creating TimeMachine bucket")
- case LIVE:
- fresh_bucket = NewLeaky(holder)
- holder.logger.Debugf("Creating Live bucket")
- default:
- log.Fatalf("input event has no expected mode, malformed : %+v", parsed)
- }
- fresh_bucket.In = make(chan types.Event)
- fresh_bucket.Mapkey = buckey
- fresh_bucket.Signal = make(chan bool, 1)
- fresh_bucket.KillSwitch = make(chan bool, 1)
- buckets.Bucket_map.Store(buckey, fresh_bucket)
- go LeakRoutine(fresh_bucket)
- log.Debugf("Created new bucket %s", buckey)
- //wait for signal to be opened
- <-fresh_bucket.Signal
- continue
- }
- bucket := biface.(*Leaky)
- /* check if leak routine is up */
- select {
- case _, ok := <-bucket.Signal:
- if !ok {
- //it's closed, delete it
- bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey)
- buckets.Bucket_map.Delete(buckey)
- sigclosed += 1
- continue
- }
- log.Debugf("Signal exists, try to pour :)")
- default:
- /*nothing to read, but not closed, try to pour */
- log.Debugf("Signal exists but empty, try to pour :)")
- }
- /*let's see if this time-bucket should have expired */
- if bucket.Mode == TIMEMACHINE && !bucket.First_ts.IsZero() {
- var d time.Time
- err = d.UnmarshalText([]byte(parsed.MarshaledTime))
- if err != nil {
- log.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
- }
- if d.After(bucket.Last_ts.Add(bucket.Duration)) {
- bucket.logger.Debugf("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, bucket.Last_ts.Add(bucket.Duration))
- buckets.Bucket_map.Delete(buckey)
- continue
- }
- }
- /*if we're here, let's try to pour */
- select {
- case bucket.In <- parsed:
- log.Debugf("Successfully sent !")
- //sent was successful !
- sent = true
- continue
- default:
- failed_sent += 1
- log.Debugf("Failed to send, try again")
- continue
- }
- }
- log.Debugf("bucket '%s' is poured", holder.Name)
- }
- return sent, nil
- }
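- // Typical event loop feeding the holders (hedged sketch ; inputChan is hypothetical) :
- //
- //   for evt := range inputChan {
- //       if _, err := PourItemToHolders(evt, holders, buckets); err != nil {
- //           log.Errorf("bucketify failed : %s", err)
- //       }
- //   }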