Improve distinct/uniq behaviour (#1478)
* make uniq/distinct use a cache that is independant of the bucket's cache_size * add testing specifically for cache_size
This commit is contained in:
parent
0449ec1868
commit
fbcb2ed7fd
4 changed files with 220 additions and 10 deletions
|
@ -0,0 +1,14 @@
|
|||
# ssh bruteforce
|
||||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "20s"
|
||||
capacity: 3
|
||||
cache_size: 1
|
||||
distinct: evt.Meta.uniq_key
|
||||
groupby: evt.Meta.source_ip
|
||||
labels:
|
||||
type: overflow_1
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
- filename: {{.TestDirectory}}/bucket.yaml
|
||||
|
194
pkg/leakybucket/tests/simple-leaky-uniq-cachesize/test.json
Normal file
194
pkg/leakybucket/tests/simple-leaky-uniq-cachesize/test.json
Normal file
|
@ -0,0 +1,194 @@
|
|||
{
|
||||
"lines": [
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE1 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:00+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aaa"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:01+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aab"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:01+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aac"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:02+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aaa"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:02+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aaa"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:03+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.4",
|
||||
"uniq_key": "aab"
|
||||
}
|
||||
},
|
||||
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:03+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aaa"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:04+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aab"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:04+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aac"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:05+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aaa"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:05+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aab"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:06+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aac"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Line": {
|
||||
"Labels": {
|
||||
"type": "testlog"
|
||||
},
|
||||
"Raw": "xxheader VALUE2 trailing stuff"
|
||||
},
|
||||
"MarshaledTime": "2020-01-01T10:00:06+00:00",
|
||||
"Meta": {
|
||||
"source_ip": "1.2.3.5",
|
||||
"uniq_key": "aad"
|
||||
}
|
||||
}
|
||||
],
|
||||
"results": [
|
||||
{
|
||||
"Alert": {
|
||||
"sources": {
|
||||
"1.2.3.5": {
|
||||
"scope": "Ip",
|
||||
"value": "1.2.3.5",
|
||||
|
||||
"ip": "1.2.3.5"
|
||||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"events_count": 4
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
type Uniq struct {
|
||||
DistinctCompiled *vm.Program
|
||||
KeyCache map[string]bool
|
||||
}
|
||||
|
||||
func (u *Uniq) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event {
|
||||
|
@ -26,17 +27,15 @@ func (u *Uniq) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Lea
|
|||
return &msg
|
||||
}
|
||||
leaky.logger.Tracef("Uniq '%s' -> '%s'", bucketFactory.Distinct, element)
|
||||
for _, evt := range leaky.Queue.GetQueue() {
|
||||
if val, err := getElement(evt, u.DistinctCompiled); err == nil && val == element {
|
||||
leaky.logger.Debugf("Uniq(%s) : ko, discard event", element)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
leaky.logger.Errorf("Uniq filter exec failed : %v", err)
|
||||
}
|
||||
if _, ok := u.KeyCache[element]; !ok {
|
||||
leaky.logger.Debugf("Uniq(%s) : ok", element)
|
||||
u.KeyCache[element] = true
|
||||
return &msg
|
||||
|
||||
} else {
|
||||
leaky.logger.Debugf("Uniq(%s) : ko, discard event", element)
|
||||
return nil
|
||||
}
|
||||
leaky.logger.Debugf("Uniq(%s) : ok", element)
|
||||
return &msg
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,6 +49,7 @@ func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error {
|
|||
var err error
|
||||
|
||||
u.DistinctCompiled, err = expr.Compile(bucketFactory.Distinct, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
||||
u.KeyCache = make(map[string]bool)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue