2020-05-15 09:39:16 +00:00
package leakybucket
import (
"encoding/json"
2022-11-29 08:16:07 +00:00
"errors"
2020-05-15 09:39:16 +00:00
"fmt"
"io"
"os"
"path/filepath"
"strings"
2021-02-25 10:26:46 +00:00
"sync"
2020-05-15 09:39:16 +00:00
"time"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
2023-03-08 15:07:49 +00:00
"github.com/davecgh/go-spew/spew"
2020-05-15 09:39:16 +00:00
"github.com/goombaio/namegenerator"
2023-03-08 15:07:49 +00:00
log "github.com/sirupsen/logrus"
2021-02-25 10:26:46 +00:00
"gopkg.in/tomb.v2"
2020-05-15 09:39:16 +00:00
yaml "gopkg.in/yaml.v2"
2023-03-08 15:07:49 +00:00
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
2020-05-15 09:39:16 +00:00
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
2023-03-08 15:07:49 +00:00
"github.com/crowdsecurity/crowdsec/pkg/types"
2020-05-15 09:39:16 +00:00
)
// BucketFactory struct holds all fields for any bucket configuration. This is to have a
// generic struct for buckets. This can be seen as a bucket factory.
type BucketFactory struct {
2023-01-06 08:26:16 +00:00
FormatVersion string ` yaml:"format" `
Author string ` yaml:"author" `
Description string ` yaml:"description" `
References [ ] string ` yaml:"references" `
Type string ` yaml:"type" ` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
Name string ` yaml:"name" ` //Name of the bucket, used later in log and user-messages. Should be unique
Capacity int ` yaml:"capacity" ` //Capacity is applicable to leaky buckets and determines the "burst" capacity
LeakSpeed string ` yaml:"leakspeed" ` //Leakspeed is a float representing how many events per second leak out of the bucket
Duration string ` yaml:"duration" ` //Duration allows 'counter' buckets to have a fixed life-time
Filter string ` yaml:"filter" ` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
GroupBy string ` yaml:"groupby,omitempty" ` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
Distinct string ` yaml:"distinct" ` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
Debug bool ` yaml:"debug" ` //Debug, when set to true, will enable debugging for _this_ scenario specifically
Labels map [ string ] string ` yaml:"labels" ` //Labels is K:V list aiming at providing context the overflow
Blackhole string ` yaml:"blackhole,omitempty" ` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
logger * log . Entry ` yaml:"-" ` //logger is bucket-specific logger (used by Debug as well)
Reprocess bool ` yaml:"reprocess" ` //Reprocess, if true, will for the bucket to be re-injected into processing chain
CacheSize int ` yaml:"cache_size" ` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
Profiling bool ` yaml:"profiling" ` //Profiling, if true, will make the bucket record pours/overflows/etc.
OverflowFilter string ` yaml:"overflow_filter" ` //OverflowFilter if present, is a filter that must return true for the overflow to go through
ConditionalOverflow string ` yaml:"condition" ` //condition if present, is an expression that must return true for the bucket to overflow
2023-06-21 13:08:27 +00:00
BayesianPrior float32 ` yaml:"bayesian_prior" `
BayesianThreshold float32 ` yaml:"bayesian_threshold" `
BayesianConditions [ ] RawBayesianCondition ` yaml:"bayesian_conditions" ` //conditions for the bayesian bucket
2023-01-06 08:26:16 +00:00
ScopeType types . ScopeType ` yaml:"scope,omitempty" ` //to enforce a different remediation than blocking an IP. Will default this to IP
BucketName string ` yaml:"-" `
Filename string ` yaml:"-" `
RunTimeFilter * vm . Program ` json:"-" `
ExprDebugger * exprhelpers . ExprDebugger ` yaml:"-" json:"-" ` // used to debug expression by printing the content of each variable of the expression
RunTimeGroupBy * vm . Program ` json:"-" `
Data [ ] * types . DataSource ` yaml:"data,omitempty" `
DataDir string ` yaml:"-" `
CancelOnFilter string ` yaml:"cancel_on,omitempty" ` //a filter that, if matched, kills the bucket
leakspeed time . Duration //internal representation of `Leakspeed`
duration time . Duration //internal representation of `Duration`
ret chan types . Event //the bucket-specific output chan for overflows
processors [ ] Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
output bool //??
ScenarioVersion string ` yaml:"version,omitempty" `
hash string ` yaml:"-" `
Simulated bool ` yaml:"simulated" ` //Set to true if the scenario instantiating the bucket was in the exclusion list
tomb * tomb . Tomb ` yaml:"-" `
wgPour * sync . WaitGroup ` yaml:"-" `
wgDumpState * sync . WaitGroup ` yaml:"-" `
2020-05-15 09:39:16 +00:00
}
2023-01-06 08:26:16 +00:00
// we use one NameGenerator for all the future buckets
2022-06-15 08:02:00 +00:00
var seed namegenerator . Generator = namegenerator . NewNameGenerator ( time . Now ( ) . UTC ( ) . UnixNano ( ) )
2020-11-30 09:37:17 +00:00
func ValidateFactory ( bucketFactory * BucketFactory ) error {
if bucketFactory . Name == "" {
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "bucket must have name" )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Description == "" {
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "description is mandatory" )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Type == "leaky" {
if bucketFactory . Capacity <= 0 { //capacity must be a positive int
return fmt . Errorf ( "bad capacity for leaky '%d'" , bucketFactory . Capacity )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . LeakSpeed == "" {
2020-05-15 09:39:16 +00:00
return fmt . Errorf ( "leakspeed can't be empty for leaky" )
}
2020-11-30 09:37:17 +00:00
if bucketFactory . leakspeed == 0 {
return fmt . Errorf ( "bad leakspeed for leaky '%s'" , bucketFactory . LeakSpeed )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
} else if bucketFactory . Type == "counter" {
if bucketFactory . Duration == "" {
2023-01-06 08:26:16 +00:00
return fmt . Errorf ( "duration can't be empty for counter" )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . duration == 0 {
return fmt . Errorf ( "bad duration for counter bucket '%d'" , bucketFactory . duration )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Capacity != - 1 {
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "counter bucket must have -1 capacity" )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
} else if bucketFactory . Type == "trigger" {
if bucketFactory . Capacity != 0 {
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "trigger bucket must have 0 capacity" )
2020-05-15 09:39:16 +00:00
}
2023-01-06 08:26:16 +00:00
} else if bucketFactory . Type == "conditional" {
if bucketFactory . ConditionalOverflow == "" {
return fmt . Errorf ( "conditional bucket must have a condition" )
}
if bucketFactory . Capacity != - 1 {
bucketFactory . logger . Warnf ( "Using a value different than -1 as capacity for conditional bucket, this may lead to unexpected overflows" )
}
if bucketFactory . LeakSpeed == "" {
return fmt . Errorf ( "leakspeed can't be empty for conditional bucket" )
}
if bucketFactory . leakspeed == 0 {
return fmt . Errorf ( "bad leakspeed for conditional bucket '%s'" , bucketFactory . LeakSpeed )
}
2023-06-21 13:08:27 +00:00
} else if bucketFactory . Type == "bayesian" {
if bucketFactory . BayesianConditions == nil {
return fmt . Errorf ( "bayesian bucket must have bayesian conditions" )
}
if bucketFactory . BayesianPrior == 0 {
return fmt . Errorf ( "bayesian bucket must have a valid, non-zero prior" )
}
if bucketFactory . BayesianThreshold == 0 {
return fmt . Errorf ( "bayesian bucket must have a valid, non-zero threshold" )
}
if bucketFactory . BayesianPrior > 1 {
return fmt . Errorf ( "bayesian bucket must have a valid, non-zero prior" )
}
if bucketFactory . BayesianThreshold > 1 {
return fmt . Errorf ( "bayesian bucket must have a valid, non-zero threshold" )
}
if bucketFactory . Capacity != - 1 {
return fmt . Errorf ( "bayesian bucket must have capacity -1" )
}
2020-05-15 09:39:16 +00:00
} else {
2020-11-30 09:37:17 +00:00
return fmt . Errorf ( "unknown bucket type '%s'" , bucketFactory . Type )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
switch bucketFactory . ScopeType . Scope {
case types . Undefined :
bucketFactory . ScopeType . Scope = types . Ip
case types . Ip :
case types . Range :
2022-02-14 15:50:52 +00:00
var (
runTimeFilter * vm . Program
err error
)
if bucketFactory . ScopeType . Filter != "" {
2023-03-28 08:49:01 +00:00
if runTimeFilter , err = expr . Compile ( bucketFactory . ScopeType . Filter , exprhelpers . GetExprOptions ( map [ string ] interface { } { "evt" : & types . Event { } } ) ... ) ; err != nil {
2022-02-14 15:50:52 +00:00
return fmt . Errorf ( "Error compiling the scope filter: %s" , err )
}
bucketFactory . ScopeType . RunTimeFilter = runTimeFilter
}
2020-11-30 09:37:17 +00:00
default :
//Compile the scope filter
var (
runTimeFilter * vm . Program
err error
)
2022-02-14 15:50:52 +00:00
if bucketFactory . ScopeType . Filter != "" {
2023-03-28 08:49:01 +00:00
if runTimeFilter , err = expr . Compile ( bucketFactory . ScopeType . Filter , exprhelpers . GetExprOptions ( map [ string ] interface { } { "evt" : & types . Event { } } ) ... ) ; err != nil {
2022-02-14 15:50:52 +00:00
return fmt . Errorf ( "Error compiling the scope filter: %s" , err )
}
bucketFactory . ScopeType . RunTimeFilter = runTimeFilter
2020-11-30 09:37:17 +00:00
}
}
return nil
2020-05-15 09:39:16 +00:00
}
2021-02-25 10:26:46 +00:00
func LoadBuckets ( cscfg * csconfig . CrowdsecServiceCfg , files [ ] string , tomb * tomb . Tomb , buckets * Buckets ) ( [ ] BucketFactory , chan types . Event , error ) {
2020-05-15 09:39:16 +00:00
var (
2023-03-16 15:25:50 +00:00
ret = [ ] BucketFactory { }
2020-05-15 09:39:16 +00:00
response chan types . Event
)
response = make ( chan types . Event , 1 )
for _ , f := range files {
2020-11-30 09:37:17 +00:00
log . Debugf ( "Loading '%s'" , f )
2022-06-22 15:01:27 +00:00
if ! strings . HasSuffix ( f , ".yaml" ) && ! strings . HasSuffix ( f , ".yml" ) {
2020-05-15 09:39:16 +00:00
log . Debugf ( "Skipping %s : not a yaml file" , f )
continue
}
//process the yaml
bucketConfigurationFile , err := os . Open ( f )
if err != nil {
log . Errorf ( "Can't access leaky configuration file %s" , f )
return nil , nil , err
}
dec := yaml . NewDecoder ( bucketConfigurationFile )
dec . SetStrict ( true )
for {
2020-11-30 09:37:17 +00:00
bucketFactory := BucketFactory { }
err = dec . Decode ( & bucketFactory )
2020-05-15 09:39:16 +00:00
if err != nil {
2022-11-29 08:16:07 +00:00
if ! errors . Is ( err , io . EOF ) {
2020-05-15 09:39:16 +00:00
log . Errorf ( "Bad yaml in %s : %v" , f , err )
2020-05-19 19:31:06 +00:00
return nil , nil , fmt . Errorf ( "bad yaml in %s : %v" , f , err )
2020-05-15 09:39:16 +00:00
}
2022-02-01 21:08:06 +00:00
log . Tracef ( "End of yaml file" )
break
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
bucketFactory . DataDir = cscfg . DataDir
2020-05-15 09:39:16 +00:00
//check empty
2020-11-30 09:37:17 +00:00
if bucketFactory . Name == "" {
2020-05-15 09:39:16 +00:00
log . Errorf ( "Won't load nameless bucket" )
2020-05-19 19:31:06 +00:00
return nil , nil , fmt . Errorf ( "nameless bucket" )
2020-05-15 09:39:16 +00:00
}
//check compat
2020-11-30 09:37:17 +00:00
if bucketFactory . FormatVersion == "" {
log . Tracef ( "no version in %s : %s, assuming '1.0'" , bucketFactory . Name , f )
bucketFactory . FormatVersion = "1.0"
2020-05-15 09:39:16 +00:00
}
2023-05-23 08:52:47 +00:00
ok , err := cwversion . Satisfies ( bucketFactory . FormatVersion , cwversion . Constraint_scenario )
2020-05-15 09:39:16 +00:00
if err != nil {
log . Fatalf ( "Failed to check version : %s" , err )
}
if ! ok {
2020-11-30 09:37:17 +00:00
log . Errorf ( "can't load %s : %s doesn't satisfy scenario format %s, skip" , bucketFactory . Name , bucketFactory . FormatVersion , cwversion . Constraint_scenario )
2020-05-15 09:39:16 +00:00
continue
}
2020-11-30 09:37:17 +00:00
bucketFactory . Filename = filepath . Clean ( f )
bucketFactory . BucketName = seed . Generate ( )
bucketFactory . ret = response
hubItem , err := cwhub . GetItemByPath ( cwhub . SCENARIOS , bucketFactory . Filename )
if err != nil {
log . Errorf ( "scenario %s (%s) couldn't be find in hub (ignore if in unit tests)" , bucketFactory . Name , bucketFactory . Filename )
} else {
if cscfg . SimulationConfig != nil {
bucketFactory . Simulated = cscfg . SimulationConfig . IsSimulated ( hubItem . Name )
}
if hubItem != nil {
bucketFactory . ScenarioVersion = hubItem . LocalVersion
bucketFactory . hash = hubItem . LocalHash
} else {
log . Errorf ( "scenario %s (%s) couldn't be find in hub (ignore if in unit tests)" , bucketFactory . Name , bucketFactory . Filename )
}
}
2021-02-25 10:26:46 +00:00
bucketFactory . wgDumpState = buckets . wgDumpState
bucketFactory . wgPour = buckets . wgPour
err = LoadBucket ( & bucketFactory , tomb )
2020-05-15 09:39:16 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
log . Errorf ( "Failed to load bucket %s : %v" , bucketFactory . Name , err )
return nil , nil , fmt . Errorf ( "loading of %s failed : %v" , bucketFactory . Name , err )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
ret = append ( ret , bucketFactory )
2020-05-15 09:39:16 +00:00
}
}
2023-01-04 15:50:02 +00:00
if err := alertcontext . NewAlertContext ( cscfg . ContextToSend , cscfg . ConsoleContextValueLength ) ; err != nil {
return nil , nil , fmt . Errorf ( "unable to load alert context: %s" , err )
}
2020-05-15 09:39:16 +00:00
log . Warningf ( "Loaded %d scenarios" , len ( ret ) )
return ret , response , nil
}
/* Init recursively process yaml files from a directory and loads them as BucketFactory */
2021-02-25 10:26:46 +00:00
func LoadBucket ( bucketFactory * BucketFactory , tomb * tomb . Tomb ) error {
2020-05-15 09:39:16 +00:00
var err error
2020-11-30 09:37:17 +00:00
if bucketFactory . Debug {
2023-03-08 15:07:49 +00:00
var clog = log . New ( )
2020-05-27 12:35:26 +00:00
if err := types . ConfigureLogger ( clog ) ; err != nil {
2020-05-27 09:51:49 +00:00
log . Fatalf ( "While creating bucket-specific logger : %s" , err )
}
2020-05-15 09:39:16 +00:00
clog . SetLevel ( log . DebugLevel )
2020-11-30 09:37:17 +00:00
bucketFactory . logger = clog . WithFields ( log . Fields {
"cfg" : bucketFactory . BucketName ,
"name" : bucketFactory . Name ,
"file" : bucketFactory . Filename ,
2020-05-15 09:39:16 +00:00
} )
} else {
/* else bind it to the default one (might find something more elegant here)*/
2020-11-30 09:37:17 +00:00
bucketFactory . logger = log . WithFields ( log . Fields {
"cfg" : bucketFactory . BucketName ,
"name" : bucketFactory . Name ,
"file" : bucketFactory . Filename ,
2020-05-15 09:39:16 +00:00
} )
}
2020-11-30 09:37:17 +00:00
if bucketFactory . LeakSpeed != "" {
if bucketFactory . leakspeed , err = time . ParseDuration ( bucketFactory . LeakSpeed ) ; err != nil {
return fmt . Errorf ( "bad leakspeed '%s' in %s : %v" , bucketFactory . LeakSpeed , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
} else {
2020-11-30 09:37:17 +00:00
bucketFactory . leakspeed = time . Duration ( 0 )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Duration != "" {
if bucketFactory . duration , err = time . ParseDuration ( bucketFactory . Duration ) ; err != nil {
return fmt . Errorf ( "invalid Duration '%s' in %s : %v" , bucketFactory . Duration , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Filter == "" {
2022-06-22 07:38:23 +00:00
bucketFactory . logger . Warning ( "Bucket without filter, abort." )
2020-05-27 16:21:18 +00:00
return fmt . Errorf ( "bucket without filter directive" )
2020-05-15 09:39:16 +00:00
}
2023-03-28 08:49:01 +00:00
bucketFactory . RunTimeFilter , err = expr . Compile ( bucketFactory . Filter , exprhelpers . GetExprOptions ( map [ string ] interface { } { "evt" : & types . Event { } } ) ... )
2020-05-15 09:39:16 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
return fmt . Errorf ( "invalid filter '%s' in %s : %v" , bucketFactory . Filter , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Debug {
2023-03-28 08:49:01 +00:00
bucketFactory . ExprDebugger , err = exprhelpers . NewDebugger ( bucketFactory . Filter , exprhelpers . GetExprOptions ( map [ string ] interface { } { "evt" : & types . Event { } } ) ... )
2020-08-03 10:21:15 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
log . Errorf ( "unable to build debug filter for '%s' : %s" , bucketFactory . Filter , err )
2020-08-03 10:21:15 +00:00
}
}
2020-05-15 09:39:16 +00:00
2020-11-30 09:37:17 +00:00
if bucketFactory . GroupBy != "" {
2023-03-28 08:49:01 +00:00
bucketFactory . RunTimeGroupBy , err = expr . Compile ( bucketFactory . GroupBy , exprhelpers . GetExprOptions ( map [ string ] interface { } { "evt" : & types . Event { } } ) ... )
2020-05-15 09:39:16 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
return fmt . Errorf ( "invalid groupby '%s' in %s : %v" , bucketFactory . GroupBy , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
}
2020-11-30 09:37:17 +00:00
bucketFactory . logger . Infof ( "Adding %s bucket" , bucketFactory . Type )
2022-04-27 09:04:12 +00:00
//return the Holder corresponding to the type of bucket
2020-11-30 09:37:17 +00:00
bucketFactory . processors = [ ] Processor { }
switch bucketFactory . Type {
2020-05-15 09:39:16 +00:00
case "leaky" :
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , & DumbProcessor { } )
2020-05-15 09:39:16 +00:00
case "trigger" :
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , & Trigger { } )
2020-05-15 09:39:16 +00:00
case "counter" :
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , & DumbProcessor { } )
2023-01-06 08:26:16 +00:00
case "conditional" :
bucketFactory . processors = append ( bucketFactory . processors , & DumbProcessor { } )
2023-06-21 13:08:27 +00:00
case "bayesian" :
bucketFactory . processors = append ( bucketFactory . processors , & DumbProcessor { } )
2020-05-15 09:39:16 +00:00
default :
2020-11-30 09:37:17 +00:00
return fmt . Errorf ( "invalid type '%s' in %s : %v" , bucketFactory . Type , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Distinct != "" {
2023-03-27 14:01:42 +00:00
bucketFactory . logger . Tracef ( "Adding a non duplicate filter" )
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , & Uniq { } )
2020-05-15 09:39:16 +00:00
}
2021-12-17 08:56:02 +00:00
if bucketFactory . CancelOnFilter != "" {
2023-03-27 14:01:42 +00:00
bucketFactory . logger . Tracef ( "Adding a cancel_on filter" )
2021-12-17 08:56:02 +00:00
bucketFactory . processors = append ( bucketFactory . processors , & CancelOnFilter { } )
}
2020-11-30 09:37:17 +00:00
if bucketFactory . OverflowFilter != "" {
bucketFactory . logger . Tracef ( "Adding an overflow filter" )
filovflw , err := NewOverflowFilter ( bucketFactory )
2020-05-15 09:39:16 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
bucketFactory . logger . Errorf ( "Error creating overflow_filter : %s" , err )
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "error creating overflow_filter : %s" , err )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , filovflw )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
if bucketFactory . Blackhole != "" {
bucketFactory . logger . Tracef ( "Adding blackhole." )
blackhole , err := NewBlackhole ( bucketFactory )
2020-05-15 09:39:16 +00:00
if err != nil {
2020-11-30 09:37:17 +00:00
bucketFactory . logger . Errorf ( "Error creating blackhole : %s" , err )
2020-05-19 19:31:06 +00:00
return fmt . Errorf ( "error creating blackhole : %s" , err )
2020-05-15 09:39:16 +00:00
}
2020-11-30 09:37:17 +00:00
bucketFactory . processors = append ( bucketFactory . processors , blackhole )
2020-05-15 09:39:16 +00:00
}
2023-01-06 08:26:16 +00:00
if bucketFactory . ConditionalOverflow != "" {
2023-03-27 14:01:42 +00:00
bucketFactory . logger . Tracef ( "Adding conditional overflow" )
bucketFactory . processors = append ( bucketFactory . processors , & ConditionalOverflow { } )
2023-01-06 08:26:16 +00:00
}
2023-06-21 13:08:27 +00:00
if bucketFactory . BayesianThreshold != 0 {
bucketFactory . logger . Tracef ( "Adding bayesian processor" )
bucketFactory . processors = append ( bucketFactory . processors , & BayesianBucket { } )
}
2020-11-30 09:37:17 +00:00
if len ( bucketFactory . Data ) > 0 {
for _ , data := range bucketFactory . Data {
2020-06-01 16:32:01 +00:00
if data . DestPath == "" {
2020-11-30 09:37:17 +00:00
bucketFactory . logger . Errorf ( "no dest_file provided for '%s'" , bucketFactory . Name )
2020-06-01 16:32:01 +00:00
continue
}
2020-11-30 09:37:17 +00:00
err = exprhelpers . FileInit ( bucketFactory . DataDir , data . DestPath , data . Type )
2020-05-27 14:31:08 +00:00
if err != nil {
2022-06-22 13:53:53 +00:00
bucketFactory . logger . Errorf ( "unable to init data for file '%s': %s" , data . DestPath , err )
2020-05-27 14:31:08 +00:00
}
2023-03-08 15:07:49 +00:00
if data . Type == "regexp" { //cache only makes sense for regexp
exprhelpers . RegexpCacheInit ( data . DestPath , * data )
}
2020-05-27 14:31:08 +00:00
}
}
2020-11-30 09:37:17 +00:00
bucketFactory . output = false
if err := ValidateFactory ( bucketFactory ) ; err != nil {
return fmt . Errorf ( "invalid bucket from %s : %v" , bucketFactory . Filename , err )
2020-05-15 09:39:16 +00:00
}
2021-02-25 10:26:46 +00:00
bucketFactory . tomb = tomb
2023-01-04 15:50:02 +00:00
2020-05-15 09:39:16 +00:00
return nil
}
2020-11-30 09:37:17 +00:00
func LoadBucketsState ( file string , buckets * Buckets , bucketFactories [ ] BucketFactory ) error {
2020-05-15 09:39:16 +00:00
var state map [ string ] Leaky
2022-09-06 11:55:03 +00:00
body , err := os . ReadFile ( file )
2020-05-15 09:39:16 +00:00
if err != nil {
return fmt . Errorf ( "can't state file %s : %s" , file , err )
}
if err := json . Unmarshal ( body , & state ) ; err != nil {
return fmt . Errorf ( "can't unmarshal state file %s : %s" , file , err )
}
for k , v := range state {
var tbucket * Leaky
log . Debugf ( "Reloading bucket %s" , k )
val , ok := buckets . Bucket_map . Load ( k )
if ok {
log . Fatalf ( "key %s already exists : %+v" , k , val )
}
//find back our holder
found := false
2020-11-30 09:37:17 +00:00
for _ , h := range bucketFactories {
2020-05-15 09:39:16 +00:00
if h . Name == v . Name {
log . Debugf ( "found factory %s/%s -> %s" , h . Author , h . Name , h . Description )
//check in which mode the bucket was
2023-03-16 15:25:50 +00:00
if v . Mode == types . TIMEMACHINE {
2020-05-15 09:39:16 +00:00
tbucket = NewTimeMachine ( h )
2023-03-16 15:25:50 +00:00
} else if v . Mode == types . LIVE {
2020-05-15 09:39:16 +00:00
tbucket = NewLeaky ( h )
} else {
log . Errorf ( "Unknown bucket type : %d" , v . Mode )
}
/*Trying to restore queue state*/
tbucket . Queue = v . Queue
/*Trying to set the limiter to the saved values*/
tbucket . Limiter . Load ( v . SerializedState )
2022-06-13 12:41:05 +00:00
tbucket . In = make ( chan * types . Event )
2020-05-15 09:39:16 +00:00
tbucket . Mapkey = k
tbucket . Signal = make ( chan bool , 1 )
tbucket . First_ts = v . First_ts
tbucket . Last_ts = v . Last_ts
tbucket . Ovflw_ts = v . Ovflw_ts
tbucket . Total_count = v . Total_count
buckets . Bucket_map . Store ( k , tbucket )
2021-02-25 10:26:46 +00:00
h . tomb . Go ( func ( ) error {
return LeakRoutine ( tbucket )
} )
2020-05-20 08:49:17 +00:00
<- tbucket . Signal
2020-05-15 09:39:16 +00:00
found = true
break
}
}
2020-05-20 08:49:17 +00:00
if ! found {
2020-05-15 09:39:16 +00:00
log . Fatalf ( "Unable to find holder for bucket %s : %s" , k , spew . Sdump ( v ) )
}
}
log . Infof ( "Restored %d buckets from dump" , len ( state ) )
return nil
}