|
@@ -51,6 +51,7 @@ type BucketFactory struct {
|
|
CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
|
CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
|
Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
|
Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
|
OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
|
OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
|
|
|
+ Data []*dataSource `yaml:"data,omitempty"` //If data are needed for the filter to work (ie downloading file)
|
|
BucketName string `yaml:"-"`
|
|
BucketName string `yaml:"-"`
|
|
Filename string `yaml:"-"`
|
|
Filename string `yaml:"-"`
|
|
RunTimeFilter *vm.Program `json:"-"`
|
|
RunTimeFilter *vm.Program `json:"-"`
|
|
@@ -101,15 +102,14 @@ func ValidateFactory(b *BucketFactory) error {
|
|
|
|
|
|
/* Init recursively process yaml files from a directory and loads them as BucketFactory */
|
|
/* Init recursively process yaml files from a directory and loads them as BucketFactory */
|
|
func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
|
|
func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
|
|
- return LoadBucketDir(cfg["patterns"])
|
|
|
|
|
|
+ return LoadBucketDir(cfg["patterns"], cfg["data"])
|
|
}
|
|
}
|
|
|
|
|
|
-func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
|
|
|
|
+func LoadBuckets(files []string, dataDir string) ([]BucketFactory, chan types.Event, error) {
|
|
var (
|
|
var (
|
|
ret []BucketFactory = []BucketFactory{}
|
|
ret []BucketFactory = []BucketFactory{}
|
|
response chan types.Event
|
|
response chan types.Event
|
|
)
|
|
)
|
|
-
|
|
|
|
var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
|
|
var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
|
|
response = make(chan types.Event, 1)
|
|
response = make(chan types.Event, 1)
|
|
for _, f := range files {
|
|
for _, f := range files {
|
|
@@ -160,7 +160,7 @@ func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
g.Filename = filepath.Clean(f)
|
|
g.Filename = filepath.Clean(f)
|
|
g.BucketName = seed.Generate()
|
|
g.BucketName = seed.Generate()
|
|
g.ret = response
|
|
g.ret = response
|
|
- err = LoadBucket(&g)
|
|
|
|
|
|
+ err = LoadBucket(&g, dataDir)
|
|
if err != nil {
|
|
if err != nil {
|
|
log.Errorf("Failed to load bucket : %v", err)
|
|
log.Errorf("Failed to load bucket : %v", err)
|
|
return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
|
|
return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
|
|
@@ -172,7 +172,7 @@ func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
|
|
return ret, response, nil
|
|
return ret, response, nil
|
|
}
|
|
}
|
|
|
|
|
|
-func LoadBucketDir(dir string) ([]BucketFactory, chan types.Event, error) {
|
|
|
|
|
|
+func LoadBucketDir(dir string, dataDir string) ([]BucketFactory, chan types.Event, error) {
|
|
var (
|
|
var (
|
|
filenames []string
|
|
filenames []string
|
|
)
|
|
)
|
|
@@ -183,11 +183,11 @@ func LoadBucketDir(dir string) ([]BucketFactory, chan types.Event, error) {
|
|
for _, f := range files {
|
|
for _, f := range files {
|
|
filenames = append(filenames, dir+f.Name())
|
|
filenames = append(filenames, dir+f.Name())
|
|
}
|
|
}
|
|
- return LoadBuckets(filenames)
|
|
|
|
|
|
+ return LoadBuckets(filenames, dataDir)
|
|
}
|
|
}
|
|
|
|
|
|
/* Init recursively process yaml files from a directory and loads them as BucketFactory */
|
|
/* Init recursively process yaml files from a directory and loads them as BucketFactory */
|
|
-func LoadBucket(g *BucketFactory) error {
|
|
|
|
|
|
+func LoadBucket(g *BucketFactory, dataDir string) error {
|
|
var err error
|
|
var err error
|
|
if g.Debug {
|
|
if g.Debug {
|
|
var clog = logrus.New()
|
|
var clog = logrus.New()
|
|
@@ -222,7 +222,7 @@ func LoadBucket(g *BucketFactory) error {
|
|
|
|
|
|
if g.Filter == "" {
|
|
if g.Filter == "" {
|
|
g.logger.Warningf("Bucket without filter, abort.")
|
|
g.logger.Warningf("Bucket without filter, abort.")
|
|
- return fmt.Errorf("bucket without filter directive.")
|
|
|
|
|
|
+ return fmt.Errorf("bucket doesn't have filter")
|
|
}
|
|
}
|
|
g.RunTimeFilter, err = expr.Compile(g.Filter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
|
g.RunTimeFilter, err = expr.Compile(g.Filter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -236,6 +236,12 @@ func LoadBucket(g *BucketFactory) error {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if len(g.Data) > 0 {
|
|
|
|
+ if err := getData(g.Data, dataDir); err != nil {
|
|
|
|
+ return fmt.Errorf("unable to download data: %s", err)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
g.logger.Infof("Adding %s bucket", g.Type)
|
|
g.logger.Infof("Adding %s bucket", g.Type)
|
|
//return the Holder correponding to the type of bucket
|
|
//return the Holder correponding to the type of bucket
|
|
g.processors = []Processor{}
|
|
g.processors = []Processor{}
|