|
@@ -31,35 +31,36 @@ import (
|
|
// BucketFactory struct holds all fields for any bucket configuration. This is to have a
|
|
// BucketFactory struct holds all fields for any bucket configuration. This is to have a
|
|
// generic struct for buckets. This can be seen as a bucket factory.
|
|
// generic struct for buckets. This can be seen as a bucket factory.
|
|
type BucketFactory struct {
|
|
type BucketFactory struct {
|
|
- FormatVersion string `yaml:"format"`
|
|
|
|
- Author string `yaml:"author"`
|
|
|
|
- Description string `yaml:"description"`
|
|
|
|
- References []string `yaml:"references"`
|
|
|
|
- Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
|
|
|
|
- Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
|
|
|
|
- Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
|
|
|
|
- LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
|
|
|
|
- Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
|
|
|
|
- Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
|
|
|
|
- GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
|
|
|
|
- Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
|
|
|
|
- Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
|
|
|
|
- Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
|
|
|
|
- Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
|
|
|
|
- logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
|
|
|
|
- Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
|
|
|
|
- CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
|
|
|
- Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
|
|
|
- OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
|
|
|
- BucketName string `yaml:"-"`
|
|
|
|
- Filename string `yaml:"-"`
|
|
|
|
- RunTimeFilter *vm.Program `json:"-"`
|
|
|
|
- RunTimeGroupBy *vm.Program `json:"-"`
|
|
|
|
- leakspeed time.Duration //internal representation of `Leakspeed`
|
|
|
|
- duration time.Duration //internal representation of `Duration`
|
|
|
|
- ret chan types.Event //the bucket-specific output chan for overflows
|
|
|
|
- processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
|
|
|
|
- output bool //??
|
|
|
|
|
|
+ FormatVersion string `yaml:"format"`
|
|
|
|
+ Author string `yaml:"author"`
|
|
|
|
+ Description string `yaml:"description"`
|
|
|
|
+ References []string `yaml:"references"`
|
|
|
|
+ Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
|
|
|
|
+ Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
|
|
|
|
+ Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
|
|
|
|
+ LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
|
|
|
|
+ Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
|
|
|
|
+ Filter string `yaml:"filter"` //Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
|
|
|
|
+ GroupBy string `yaml:"groupby,omitempty"` //GroupBy is an expr that determines the partitions of the bucket. A common example is the source_ip
|
|
|
|
+ Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
|
|
|
|
+ Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
|
|
|
|
+ Labels map[string]string `yaml:"labels"` //Labels is a K:V list aiming at providing context to the overflow
|
|
|
|
+ Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
|
|
|
|
+ logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
|
|
|
|
+ Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will force the bucket to be re-injected into processing chain
|
|
|
|
+ CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
|
|
|
+ Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
|
|
|
+ OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
|
|
|
+ BucketName string `yaml:"-"`
|
|
|
|
+ Filename string `yaml:"-"`
|
|
|
|
+ RunTimeFilter *vm.Program `json:"-"`
|
|
|
|
+ RunTimeGroupBy *vm.Program `json:"-"`
|
|
|
|
+ Data []*types.DataSource `yaml:"data,omitempty"`
|
|
|
|
+ leakspeed time.Duration //internal representation of `Leakspeed`
|
|
|
|
+ duration time.Duration //internal representation of `Duration`
|
|
|
|
+ ret chan types.Event //the bucket-specific output chan for overflows
|
|
|
|
+ processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
|
|
|
|
+ output bool //??
|
|
}
|
|
}
|
|
|
|
|
|
func ValidateFactory(b *BucketFactory) error {
|
|
func ValidateFactory(b *BucketFactory) error {
|
|
@@ -101,16 +102,18 @@ func ValidateFactory(b *BucketFactory) error {
|
|
|
|
|
|
/* Init recursively processes YAML files from a directory and loads them as BucketFactory */
|
|
/* Init recursively processes YAML files from a directory and loads them as BucketFactory */
|
|
func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
|
|
func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
|
|
- return LoadBucketDir(cfg["patterns"])
|
|
|
|
|
|
+ return LoadBucketDir(cfg["patterns"], cfg["data"])
|
|
}
|
|
}
|
|
|
|
|
|
-func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
|
|
|
|
+func LoadBuckets(files []string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
|
|
var (
|
|
var (
|
|
ret []BucketFactory = []BucketFactory{}
|
|
ret []BucketFactory = []BucketFactory{}
|
|
response chan types.Event
|
|
response chan types.Event
|
|
)
|
|
)
|
|
|
|
|
|
var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
|
|
var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
|
|
|
|
+ exprhelpers.Init()
|
|
|
|
+
|
|
response = make(chan types.Event, 1)
|
|
response = make(chan types.Event, 1)
|
|
for _, f := range files {
|
|
for _, f := range files {
|
|
log.Debugf("Loading %s", f)
|
|
log.Debugf("Loading %s", f)
|
|
@@ -160,7 +163,7 @@ func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
g.Filename = filepath.Clean(f)
|
|
g.Filename = filepath.Clean(f)
|
|
g.BucketName = seed.Generate()
|
|
g.BucketName = seed.Generate()
|
|
g.ret = response
|
|
g.ret = response
|
|
- err = LoadBucket(&g)
|
|
|
|
|
|
+ err = LoadBucket(&g, dataFolder)
|
|
if err != nil {
|
|
if err != nil {
|
|
log.Errorf("Failed to load bucket : %v", err)
|
|
log.Errorf("Failed to load bucket : %v", err)
|
|
return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
|
|
return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
|
|
@@ -172,7 +175,7 @@ func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
return ret, response, nil
|
|
return ret, response, nil
|
|
}
|
|
}
|
|
|
|
|
|
-func LoadBucketDir(dir string) ([]BucketFactory, chan types.Event, error) {
|
|
|
|
|
|
+func LoadBucketDir(dir string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
|
|
var (
|
|
var (
|
|
filenames []string
|
|
filenames []string
|
|
)
|
|
)
|
|
@@ -183,11 +186,11 @@ func LoadBucketDir(dir string) ([]BucketFactory, chan types.Event, error) {
|
|
for _, f := range files {
|
|
for _, f := range files {
|
|
filenames = append(filenames, dir+f.Name())
|
|
filenames = append(filenames, dir+f.Name())
|
|
}
|
|
}
|
|
- return LoadBuckets(filenames)
|
|
|
|
|
|
+ return LoadBuckets(filenames, dataFolder)
|
|
}
|
|
}
|
|
|
|
|
|
/* LoadBucket prepares a single BucketFactory (logger, processors, data files) and validates it */
|
|
/* LoadBucket prepares a single BucketFactory (logger, processors, data files) and validates it */
|
|
-func LoadBucket(g *BucketFactory) error {
|
|
|
|
|
|
+func LoadBucket(g *BucketFactory, dataFolder string) error {
|
|
var err error
|
|
var err error
|
|
if g.Debug {
|
|
if g.Debug {
|
|
var clog = logrus.New()
|
|
var clog = logrus.New()
|
|
@@ -275,6 +278,12 @@ func LoadBucket(g *BucketFactory) error {
|
|
g.processors = append(g.processors, blackhole)
|
|
g.processors = append(g.processors, blackhole)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if len(g.Data) > 0 {
|
|
|
|
+ for _, data := range g.Data {
|
|
|
|
+ err = exprhelpers.FileInit(dataFolder, data.DestPath)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
g.output = false
|
|
g.output = false
|
|
if err := ValidateFactory(g); err != nil {
|
|
if err := ValidateFactory(g); err != nil {
|
|
return fmt.Errorf("invalid bucket from %s : %v", g.Filename, err)
|
|
return fmt.Errorf("invalid bucket from %s : %v", g.Filename, err)
|