diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go
index c1d4b31e6..73dc16a4e 100644
--- a/cmd/crowdsec/main.go
+++ b/cmd/crowdsec/main.go
@@ -204,11 +204,11 @@ func main() {
 		for _, scenarios := range CustomScenarios {
 			bucketFiles = append(bucketFiles, scenarios.Filename)
 		}
-		holders, outputEventChan, err = leaky.LoadBuckets(bucketFiles)
+		holders, outputEventChan, err = leaky.LoadBuckets(bucketFiles, cConfig.DataFolder)
 	} else {
 		log.Infof("Loading scenarios")
 
-		holders, outputEventChan, err = leaky.Init(map[string]string{"patterns": cConfig.ConfigFolder + "/scenarios/"})
+		holders, outputEventChan, err = leaky.Init(map[string]string{"patterns": cConfig.ConfigFolder + "/scenarios/", "data": cConfig.DataFolder})
 	}
 	if err != nil {
 		log.Fatalf("Scenario loading failed : %v", err)
diff --git a/pkg/cwhub/hubMgmt.go b/pkg/cwhub/hubMgmt.go
index e6de2e97c..daa9ea7b4 100644
--- a/pkg/cwhub/hubMgmt.go
+++ b/pkg/cwhub/hubMgmt.go
@@ -749,7 +749,6 @@ func DownloadItem(target Item, tdir string, overwrite bool, dataFolder string) (
 	target.Tainted = false
 	target.UpToDate = true
 
-	log.Infof("Installing : %+v \n", string(body))
 	dec := yaml.NewDecoder(bytes.NewReader(body))
 	for {
 		data := &types.DataSet{}
diff --git a/pkg/exprhelpers/exprlib.go b/pkg/exprhelpers/exprlib.go
index 8dfc2c509..8da7120d7 100644
--- a/pkg/exprhelpers/exprlib.go
+++ b/pkg/exprhelpers/exprlib.go
@@ -1,12 +1,19 @@
 package exprhelpers
 
 import (
+	"bufio"
+	"os"
+	"path"
+	"regexp"
 	"strconv"
 	"strings"
 
 	log "github.com/sirupsen/logrus"
 )
 
+var dataFile map[string][]string
+var dataFileRegex map[string][]*regexp.Regexp
+
 func Atof(x string) float64 {
 	log.Debugf("debug atof %s", x)
 	ret, err := strconv.ParseFloat(x, 64)
@@ -26,9 +33,68 @@ func EndsWith(s string, suff string) bool {
 
 func GetExprEnv(ctx map[string]interface{}) map[string]interface{} {
 
-	var ExprLib = map[string]interface{}{"Atof": Atof, "JsonExtract": JsonExtract, "JsonExtractLib": JsonExtractLib}
+	var ExprLib = map[string]interface{}{"Atof": Atof, "JsonExtract": JsonExtract, "JsonExtractLib": JsonExtractLib, "File": File, "RegexpInFile": RegexpInFile}
 	for k, v := range ctx {
 		ExprLib[k] = v
 	}
 	return ExprLib
 }
+
+func Init() error {
+	log.Infof("Expr helper initiated")
+	dataFile = make(map[string][]string)
+	dataFileRegex = make(map[string][]*regexp.Regexp)
+	return nil
+}
+
+func FileInit(fileFolder string, filename string) error {
+	filepath := path.Join(fileFolder, filename)
+	file, err := os.Open(filepath)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer file.Close()
+
+	if _, ok := dataFile[filename]; !ok {
+		dataFile[filename] = []string{}
+	}
+	fileType := "string"
+	scanner := bufio.NewScanner(file)
+	scanner.Scan()
+	if scanner.Text() == "#type: regex" { // if file contains, it should have this header
+		fileType = "regex"
+	}
+	for scanner.Scan() {
+		if fileType == "regex" {
+			dataFileRegex[filename] = append(dataFileRegex[filename], regexp.MustCompile(scanner.Text()))
+		} else {
+			dataFile[filename] = append(dataFile[filename], scanner.Text())
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		log.Fatal(err)
+	}
+	return nil
+}
+
+func File(filename string) []string {
+	if _, ok := dataFile[filename]; ok {
+		return dataFile[filename]
+	}
+	log.Errorf("file '%s' not found for expr library", filename)
+	return []string{}
+}
+
+func RegexpInFile(data string, filename string) bool {
+	if _, ok := dataFileRegex[filename]; ok {
+		for _, re := range dataFileRegex[filename] {
+			if re.Match([]byte(data)) {
+				return true
+			}
+		}
+	} else {
+		log.Errorf("file '%s' not found for expr library", filename)
+	}
+	return false
+}
diff --git a/pkg/leakybucket/manager.go b/pkg/leakybucket/manager.go
index 377f1282a..cd8b1ea75 100644
--- a/pkg/leakybucket/manager.go
+++ b/pkg/leakybucket/manager.go
@@ -31,35 +31,36 @@ import (
 // BucketFactory struct holds all fields for any bucket configuration. This is to have a
 // generic struct for buckets. This can be seen as a bucket factory.
 type BucketFactory struct {
-	FormatVersion  string            `yaml:"format"`
-	Author         string            `yaml:"author"`
-	Description    string            `yaml:"description"`
-	References     []string          `yaml:"references"`
-	Type           string            `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
-	Name           string            `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
-	Capacity       int               `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
-	LeakSpeed      string            `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
-	Duration       string            `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
-	Filter         string            `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
-	GroupBy        string            `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
-	Distinct       string            `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
-	Debug          bool              `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
-	Labels         map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
-	Blackhole      string            `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
-	logger         *log.Entry        `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
-	Reprocess      bool              `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
-	CacheSize      int               `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
-	Profiling      bool              `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
-	OverflowFilter string            `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
-	BucketName     string            `yaml:"-"`
-	Filename       string            `yaml:"-"`
-	RunTimeFilter  *vm.Program       `json:"-"`
-	RunTimeGroupBy *vm.Program       `json:"-"`
-	leakspeed      time.Duration     //internal representation of `Leakspeed`
-	duration       time.Duration     //internal representation of `Duration`
-	ret            chan types.Event  //the bucket-specific output chan for overflows
-	processors     []Processor       //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
-	output         bool              //??
+	FormatVersion  string              `yaml:"format"`
+	Author         string              `yaml:"author"`
+	Description    string              `yaml:"description"`
+	References     []string            `yaml:"references"`
+	Type           string              `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
+	Name           string              `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
+	Capacity       int                 `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
+	LeakSpeed      string              `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
+	Duration       string              `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
+	Filter         string              `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
+	GroupBy        string              `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
+	Distinct       string              `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
+	Debug          bool                `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
+	Labels         map[string]string   `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
+	Blackhole      string              `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
+	logger         *log.Entry          `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
+	Reprocess      bool                `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
+	CacheSize      int                 `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
+	Profiling      bool                `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
+	OverflowFilter string              `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
+	BucketName     string              `yaml:"-"`
+	Filename       string              `yaml:"-"`
+	RunTimeFilter  *vm.Program         `json:"-"`
+	RunTimeGroupBy *vm.Program         `json:"-"`
+	Data           []*types.DataSource `yaml:"data,omitempty"`
+	leakspeed      time.Duration       //internal representation of `Leakspeed`
+	duration       time.Duration       //internal representation of `Duration`
+	ret            chan types.Event    //the bucket-specific output chan for overflows
+	processors     []Processor         //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
+	output         bool                //??
 }
 
 func ValidateFactory(b *BucketFactory) error {
@@ -101,16 +102,18 @@
 
 /* Init recursively process yaml files from a directory and loads them as BucketFactory */
 func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
-	return LoadBucketDir(cfg["patterns"])
+	return LoadBucketDir(cfg["patterns"], cfg["data"])
 }
 
-func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
+func LoadBuckets(files []string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
 	var (
 		ret      []BucketFactory = []BucketFactory{}
 		response chan types.Event
 	)
 
 	var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
+	exprhelpers.Init()
+
 	response = make(chan types.Event, 1)
 	for _, f := range files {
 		log.Debugf("Loading %s", f)
@@ -160,7 +163,7 @@ func LoadBuckets(files []string) ([]BucketFactory, chan types.Event, error) {
 		g.Filename = filepath.Clean(f)
 		g.BucketName = seed.Generate()
 		g.ret = response
-		err = LoadBucket(&g)
+		err = LoadBucket(&g, dataFolder)
 		if err != nil {
 			log.Errorf("Failed to load bucket : %v", err)
 			return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
@@ -172,7 +175,7 @@
 	return ret, response, nil
 }
 
-func LoadBucketDir(dir string) ([]BucketFactory, chan types.Event, error) {
+func LoadBucketDir(dir string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
 	var (
 		filenames []string
 	)
@@ -183,11 +186,11 @@
 	for _, f := range files {
 		filenames = append(filenames, dir+f.Name())
 	}
-	return LoadBuckets(filenames)
+	return LoadBuckets(filenames, dataFolder)
 }
 
 /* Init recursively process yaml files from a directory and loads them as BucketFactory */
-func LoadBucket(g *BucketFactory) error {
+func LoadBucket(g *BucketFactory, dataFolder string) error {
 	var err error
 	if g.Debug {
 		var clog = logrus.New()
@@ -275,6 +278,12 @@
 		g.processors = append(g.processors, blackhole)
 	}
 
+	if len(g.Data) > 0 {
+		for _, data := range g.Data {
+			err = exprhelpers.FileInit(dataFolder, data.DestPath)
+		}
+	}
+
 	g.output = false
 	if err := ValidateFactory(g); err != nil {
 		return fmt.Errorf("invalid bucket from %s : %v", g.Filename, err)
diff --git a/pkg/types/dataset.go b/pkg/types/dataset.go
index de21a17ab..dd3797466 100644
--- a/pkg/types/dataset.go
+++ b/pkg/types/dataset.go
@@ -10,13 +10,13 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-type dataSource struct {
+type DataSource struct {
 	SourceURL string `yaml:"source_url"`
 	DestPath  string `yaml:"dest_file"`
 }
 
 type DataSet struct {
-	Data []*dataSource `yaml:"data",omitempty`
+	Data []*DataSource `yaml:"data",omitempty`
 }
 
 func downloadFile(url string, destPath string) error {
@@ -59,7 +59,7 @@
 	return nil
}
 
-func GetData(data []*dataSource, dataDir string) error {
+func GetData(data []*DataSource, dataDir string) error {
 	for _, dataS := range data {
 		destPath := path.Join(dataDir, dataS.DestPath)
 		log.Infof("downloading data '%s' in '%s'", dataS.SourceURL, destPath)