瀏覽代碼

Add conditional bucket (#1962)

blotus 2 年之前
父節點
當前提交
a84e4b6b15

+ 25 - 11
pkg/leakybucket/bucket.go

@@ -61,16 +61,17 @@ type Leaky struct {
 	Duration     time.Duration
 	Pour         func(*Leaky, types.Event) `json:"-"`
 	//Profiling when set to true enables profiling of bucket
-	Profiling       bool
-	timedOverflow   bool
-	logger          *log.Entry
-	scopeType       types.ScopeType
-	hash            string
-	scenarioVersion string
-	tomb            *tomb.Tomb
-	wgPour          *sync.WaitGroup
-	wgDumpState     *sync.WaitGroup
-	mutex           *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races
+	Profiling           bool
+	timedOverflow       bool
+	conditionalOverflow bool
+	logger              *log.Entry
+	scopeType           types.ScopeType
+	hash                string
+	scenarioVersion     string
+	tomb                *tomb.Tomb
+	wgPour              *sync.WaitGroup
+	wgDumpState         *sync.WaitGroup
+	mutex               *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races
 }
 
 var BucketsPour = prometheus.NewCounterVec(
@@ -188,6 +189,10 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
 		l.timedOverflow = true
 	}
 
+	if l.BucketConfig.Type == "conditional" {
+		l.conditionalOverflow = true
+		l.Duration = l.BucketConfig.leakspeed
+	}
 	return l
 }
 
@@ -247,6 +252,14 @@ func LeakRoutine(leaky *Leaky) error {
 			BucketsPour.With(prometheus.Labels{"name": leaky.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc()
 
 			leaky.Pour(leaky, *msg) // glue for now
+
+			for _, processor := range processors {
+				msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky)
+				if msg == nil {
+					goto End
+				}
+			}
+
 			//Clear cache on behalf of pour
 
 			// if durationTicker isn't initialized, then we're pouring our first event
@@ -337,7 +350,8 @@ func Pour(leaky *Leaky, msg types.Event) {
 		leaky.First_ts = time.Now().UTC()
 	}
 	leaky.Last_ts = time.Now().UTC()
-	if leaky.Limiter.Allow() {
+
+	if leaky.Limiter.Allow() || leaky.conditionalOverflow {
 		leaky.Queue.Add(msg)
 	} else {
 		leaky.Ovflw_ts = time.Now().UTC()

+ 2 - 2
pkg/leakybucket/buckets_test.go

@@ -64,8 +64,8 @@ func TestBucket(t *testing.T) {
 	}
 }
 
-//during tests, we're likely to have only one scenario, and thus only one holder.
-//we want to avoid the death of the tomb because all existing buckets have been destroyed.
+// during tests, we're likely to have only one scenario, and thus only one holder.
+// we want to avoid the death of the tomb because all existing buckets have been destroyed.
 func watchTomb(tomb *tomb.Tomb) {
 	for {
 		if tomb.Alive() == false {

+ 61 - 0
pkg/leakybucket/conditional.go

@@ -0,0 +1,61 @@
+package leakybucket
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/antonmedv/expr"
+	"github.com/antonmedv/expr/vm"
+	"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+)
+
+type ConditionalOverflow struct {
+	ConditionalFilter        string
+	ConditionalFilterRuntime *vm.Program
+	DumbProcessor
+}
+
+func NewConditionalOverflow(g *BucketFactory) (*ConditionalOverflow, error) {
+	var err error
+
+	c := ConditionalOverflow{}
+	c.ConditionalFilter = g.ConditionalOverflow
+	c.ConditionalFilterRuntime, err = expr.Compile(c.ConditionalFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{
+		"queue": &Queue{}, "leaky": &Leaky{}})))
+	if err != nil {
+		g.logger.Errorf("Unable to compile condition expression for conditional bucket : %s", err)
+		return nil, fmt.Errorf("unable to compile condition expression for conditional bucket : %v", err)
+	}
+	return &c, nil
+}
+
+func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event {
+	return func(msg types.Event, l *Leaky) *types.Event {
+		var condition, ok bool
+		if c.ConditionalFilterRuntime != nil {
+			l.logger.Debugf("Running condition expression : %s", c.ConditionalFilter)
+			ret, err := expr.Run(c.ConditionalFilterRuntime, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &msg, "queue": l.Queue, "leaky": l}))
+			if err != nil {
+				l.logger.Errorf("unable to run conditional filter : %s", err)
+				return &msg
+			}
+
+			l.logger.Debugf("Conditional bucket expression returned : %v", ret)
+
+			if condition, ok = ret.(bool); !ok {
+				l.logger.Warningf("overflow condition, unexpected non-bool return : %T", ret)
+				return &msg
+			}
+
+			if condition {
+				l.logger.Debugf("Conditional bucket overflow")
+				l.Ovflw_ts = time.Now().UTC()
+				l.Out <- l.Queue
+				return nil
+			}
+		}
+
+		return &msg
+	}
+}

+ 68 - 42
pkg/leakybucket/manager_load.go

@@ -34,49 +34,50 @@ import (
 // BucketFactory struct holds all fields for any bucket configuration. This is to have a
 // generic struct for buckets. This can be seen as a bucket factory.
 type BucketFactory struct {
-	FormatVersion   string                    `yaml:"format"`
-	Author          string                    `yaml:"author"`
-	Description     string                    `yaml:"description"`
-	References      []string                  `yaml:"references"`
-	Type            string                    `yaml:"type"`                //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
-	Name            string                    `yaml:"name"`                //Name of the bucket, used later in log and user-messages. Should be unique
-	Capacity        int                       `yaml:"capacity"`            //Capacity is applicable to leaky buckets and determines the "burst" capacity
-	LeakSpeed       string                    `yaml:"leakspeed"`           //Leakspeed is a float representing how many events per second leak out of the bucket
-	Duration        string                    `yaml:"duration"`            //Duration allows 'counter' buckets to have a fixed life-time
-	Filter          string                    `yaml:"filter"`              //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
-	GroupBy         string                    `yaml:"groupby,omitempty"`   //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
-	Distinct        string                    `yaml:"distinct"`            //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
-	Debug           bool                      `yaml:"debug"`               //Debug, when set to true, will enable debugging for _this_ scenario specifically
-	Labels          map[string]string         `yaml:"labels"`              //Labels is K:V list aiming at providing context the overflow
-	Blackhole       string                    `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
-	logger          *log.Entry                `yaml:"-"`                   //logger is bucket-specific logger (used by Debug as well)
-	Reprocess       bool                      `yaml:"reprocess"`           //Reprocess, if true, will for the bucket to be re-injected into processing chain
-	CacheSize       int                       `yaml:"cache_size"`          //CacheSize, if > 0, limits the size of in-memory cache of the bucket
-	Profiling       bool                      `yaml:"profiling"`           //Profiling, if true, will make the bucket record pours/overflows/etc.
-	OverflowFilter  string                    `yaml:"overflow_filter"`     //OverflowFilter if present, is a filter that must return true for the overflow to go through
-	ScopeType       types.ScopeType           `yaml:"scope,omitempty"`     //to enforce a different remediation than blocking an IP. Will default this to IP
-	BucketName      string                    `yaml:"-"`
-	Filename        string                    `yaml:"-"`
-	RunTimeFilter   *vm.Program               `json:"-"`
-	ExprDebugger    *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
-	RunTimeGroupBy  *vm.Program               `json:"-"`
-	Data            []*types.DataSource       `yaml:"data,omitempty"`
-	DataDir         string                    `yaml:"-"`
-	CancelOnFilter  string                    `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket
-	leakspeed       time.Duration             //internal representation of `Leakspeed`
-	duration        time.Duration             //internal representation of `Duration`
-	ret             chan types.Event          //the bucket-specific output chan for overflows
-	processors      []Processor               //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
-	output          bool                      //??
-	ScenarioVersion string                    `yaml:"version,omitempty"`
-	hash            string                    `yaml:"-"`
-	Simulated       bool                      `yaml:"simulated"` //Set to true if the scenario instantiating the bucket was in the exclusion list
-	tomb            *tomb.Tomb                `yaml:"-"`
-	wgPour          *sync.WaitGroup           `yaml:"-"`
-	wgDumpState     *sync.WaitGroup           `yaml:"-"`
+	FormatVersion       string                    `yaml:"format"`
+	Author              string                    `yaml:"author"`
+	Description         string                    `yaml:"description"`
+	References          []string                  `yaml:"references"`
+	Type                string                    `yaml:"type"`                //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
+	Name                string                    `yaml:"name"`                //Name of the bucket, used later in log and user-messages. Should be unique
+	Capacity            int                       `yaml:"capacity"`            //Capacity is applicable to leaky buckets and determines the "burst" capacity
+	LeakSpeed           string                    `yaml:"leakspeed"`           //Leakspeed is a float representing how many events per second leak out of the bucket
+	Duration            string                    `yaml:"duration"`            //Duration allows 'counter' buckets to have a fixed life-time
+	Filter              string                    `yaml:"filter"`              //Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
+	GroupBy             string                    `yaml:"groupby,omitempty"`   //groupby is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
+	Distinct            string                    `yaml:"distinct"`            //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
+	Debug               bool                      `yaml:"debug"`               //Debug, when set to true, will enable debugging for _this_ scenario specifically
+	Labels              map[string]string         `yaml:"labels"`              //Labels is K:V list aiming at providing context the overflow
+	Blackhole           string                    `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
+	logger              *log.Entry                `yaml:"-"`                   //logger is bucket-specific logger (used by Debug as well)
+	Reprocess           bool                      `yaml:"reprocess"`           //Reprocess, if true, will force the bucket to be re-injected into processing chain
+	CacheSize           int                       `yaml:"cache_size"`          //CacheSize, if > 0, limits the size of in-memory cache of the bucket
+	Profiling           bool                      `yaml:"profiling"`           //Profiling, if true, will make the bucket record pours/overflows/etc.
+	OverflowFilter      string                    `yaml:"overflow_filter"`     //OverflowFilter if present, is a filter that must return true for the overflow to go through
+	ConditionalOverflow string                    `yaml:"condition"`           //condition if present, is an expression that must return true for the bucket to overflow
+	ScopeType           types.ScopeType           `yaml:"scope,omitempty"`     //to enforce a different remediation than blocking an IP. Will default this to IP
+	BucketName          string                    `yaml:"-"`
+	Filename            string                    `yaml:"-"`
+	RunTimeFilter       *vm.Program               `json:"-"`
+	ExprDebugger        *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
+	RunTimeGroupBy      *vm.Program               `json:"-"`
+	Data                []*types.DataSource       `yaml:"data,omitempty"`
+	DataDir             string                    `yaml:"-"`
+	CancelOnFilter      string                    `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket
+	leakspeed           time.Duration             //internal representation of `Leakspeed`
+	duration            time.Duration             //internal representation of `Duration`
+	ret                 chan types.Event          //the bucket-specific output chan for overflows
+	processors          []Processor               //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
+	output              bool                      //??
+	ScenarioVersion     string                    `yaml:"version,omitempty"`
+	hash                string                    `yaml:"-"`
+	Simulated           bool                      `yaml:"simulated"` //Set to true if the scenario instantiating the bucket was in the exclusion list
+	tomb                *tomb.Tomb                `yaml:"-"`
+	wgPour              *sync.WaitGroup           `yaml:"-"`
+	wgDumpState         *sync.WaitGroup           `yaml:"-"`
 }
 
-//we use one NameGenerator for all the future buckets
+// we use one NameGenerator for all the future buckets
 var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
 
 func ValidateFactory(bucketFactory *BucketFactory) error {
@@ -98,7 +99,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error {
 		}
 	} else if bucketFactory.Type == "counter" {
 		if bucketFactory.Duration == "" {
-			return fmt.Errorf("duration ca't be empty for counter")
+			return fmt.Errorf("duration can't be empty for counter")
 		}
 		if bucketFactory.duration == 0 {
 			return fmt.Errorf("bad duration for counter bucket '%d'", bucketFactory.duration)
@@ -110,6 +111,19 @@ func ValidateFactory(bucketFactory *BucketFactory) error {
 		if bucketFactory.Capacity != 0 {
 			return fmt.Errorf("trigger bucket must have 0 capacity")
 		}
+	} else if bucketFactory.Type == "conditional" {
+		if bucketFactory.ConditionalOverflow == "" {
+			return fmt.Errorf("conditional bucket must have a condition")
+		}
+		if bucketFactory.Capacity != -1 {
+			bucketFactory.logger.Warnf("Using a value different than -1 as capacity for conditional bucket, this may lead to unexpected overflows")
+		}
+		if bucketFactory.LeakSpeed == "" {
+			return fmt.Errorf("leakspeed can't be empty for conditional bucket")
+		}
+		if bucketFactory.leakspeed == 0 {
+			return fmt.Errorf("bad leakspeed for conditional bucket '%s'", bucketFactory.LeakSpeed)
+		}
 	} else {
 		return fmt.Errorf("unknown bucket type '%s'", bucketFactory.Type)
 	}
@@ -304,6 +318,8 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
 		bucketFactory.processors = append(bucketFactory.processors, &Trigger{})
 	case "counter":
 		bucketFactory.processors = append(bucketFactory.processors, &DumbProcessor{})
+	case "conditional":
+		bucketFactory.processors = append(bucketFactory.processors, &DumbProcessor{})
 	default:
 		return fmt.Errorf("invalid type '%s' in %s : %v", bucketFactory.Type, bucketFactory.Filename, err)
 	}
@@ -338,6 +354,16 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
 		bucketFactory.processors = append(bucketFactory.processors, blackhole)
 	}
 
+	if bucketFactory.ConditionalOverflow != "" {
+		bucketFactory.logger.Tracef("Adding conditional overflow.")
+		condovflw, err := NewConditionalOverflow(bucketFactory)
+		if err != nil {
+			bucketFactory.logger.Errorf("Error creating conditional overflow : %s", err)
+			return fmt.Errorf("error creating conditional overflow : %s", err)
+		}
+		bucketFactory.processors = append(bucketFactory.processors, condovflw)
+	}
+
 	if len(bucketFactory.Data) > 0 {
 		for _, data := range bucketFactory.Data {
 			if data.DestPath == "" {

+ 7 - 0
pkg/leakybucket/processor.go

@@ -6,6 +6,8 @@ type Processor interface {
 	OnBucketInit(Bucket *BucketFactory) error
 	OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
 	OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)
+
+	AfterBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
 }
 
 type DumbProcessor struct {
@@ -25,5 +27,10 @@ func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.Ru
 	return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
 		return alert, queue
 	}
+}
 
+func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event {
+	return func(msg types.Event, leaky *Leaky) *types.Event {
+		return &msg
+	}
 }

+ 6 - 0
pkg/leakybucket/reset_filter.go

@@ -64,6 +64,12 @@ func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Le
 	}
 }
 
+func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event {
+	return func(msg types.Event, leaky *Leaky) *types.Event {
+		return &msg
+	}
+}
+
 func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error {
 	var err error
 	var compiledExpr struct {

+ 11 - 0
pkg/leakybucket/tests/conditional-bucket/bucket.yaml

@@ -0,0 +1,11 @@
+type: conditional
+name: test/conditional
+#debug: true
+description: "conditional bucket"
+filter: "evt.Meta.log_type == 'http_access-log'"
+groupby: evt.Meta.source_ip
+condition: any(queue.Queue, {.Meta.http_path == "/"}) and any(queue.Queue, {.Meta.http_path == "/foo"})
+leakspeed: 1s
+capacity: -1
+labels:
+  type: overflow_1

+ 1 - 0
pkg/leakybucket/tests/conditional-bucket/scenarios.yaml

@@ -0,0 +1 @@
+ - filename: {{.TestDirectory}}/bucket.yaml

+ 50 - 0
pkg/leakybucket/tests/conditional-bucket/test.json

@@ -0,0 +1,50 @@
+{
+	"lines": [
+	   {
+		  "Line": {
+			 "Labels": {
+				"type": "nginx"
+			 },
+			 "Raw": "don't care"
+		  },
+		  "MarshaledTime": "2020-01-01T10:00:00.000Z",
+		  "Meta": {
+			 "source_ip": "2a00:1450:4007:816::200e",
+			 "log_type": "http_access-log",
+			 "http_path": "/"
+		  }
+	   },
+	   {
+		"Line": {
+		   "Labels": {
+			  "type": "nginx"
+		   },
+		   "Raw": "don't care"
+		},
+		"MarshaledTime": "2020-01-01T10:00:00.000Z",
+		"Meta": {
+		   "source_ip": "2a00:1450:4007:816::200e",
+		   "log_type": "http_access-log",
+		   "http_path": "/foo"
+		}
+	 }
+	],
+	"results": [
+	  {
+		"Type" : 1,
+		"Alert": {
+		  "sources" : {
+			"2a00:1450:4007:816::200e": {
+			  "ip": "2a00:1450:4007:816::200e",
+			  "scope": "Ip",
+			  "value": "2a00:1450:4007:816::200e"
+			}
+		  },
+		  "Alert" : {
+			"scenario": "test/conditional",
+			"events_count": 2
+		  }
+		}
+	  }
+	]
+  }

+ 6 - 0
pkg/leakybucket/uniq.go

@@ -53,6 +53,12 @@ func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types
 	}
 }
 
+func (u *Uniq) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event {
+	return func(msg types.Event, leaky *Leaky) *types.Event {
+		return &msg
+	}
+}
+
 func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error {
 	var err error
 	var compiledExpr *vm.Program