update
This commit is contained in:
parent
f96cb2a70d
commit
ab525fff6a
5 changed files with 114 additions and 42 deletions
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/olekukonko/tablewriter"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func NewConsoleCmd() *cobra.Command {
|
||||
|
@ -295,6 +296,21 @@ Disable given information push to the central API.`,
|
|||
cmdLabelAdd.MarkFlagRequired("value")
|
||||
cmdLabel.AddCommand(cmdLabelAdd)
|
||||
|
||||
cmdLabelStatus := &cobra.Command{
|
||||
Use: "status",
|
||||
Short: "List label to send with alerts",
|
||||
DisableAutoGenTag: true,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
dump, err := yaml.Marshal(csConfig.Crowdsec.LabelsToSend)
|
||||
if err != nil {
|
||||
log.Fatalf("unable to show labels status: %s", err)
|
||||
}
|
||||
fmt.Println(dump)
|
||||
|
||||
},
|
||||
}
|
||||
cmdLabel.AddCommand(cmdLabelStatus)
|
||||
|
||||
cmdConsole.AddCommand(cmdLabel)
|
||||
|
||||
return cmdConsole
|
||||
|
|
|
@ -392,7 +392,6 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|||
return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(alertItem.Meta) > 0 {
|
||||
metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
|
||||
for i, metaItem := range alertItem.Meta {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
//"log"
|
||||
"github.com/antonmedv/expr/vm"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/time/rate"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
"github.com/goombaio/namegenerator"
|
||||
|
@ -71,6 +72,7 @@ type Leaky struct {
|
|||
wgPour *sync.WaitGroup
|
||||
wgDumpState *sync.WaitGroup
|
||||
mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races
|
||||
LabelsToSend map[string][]*vm.Program
|
||||
}
|
||||
|
||||
var BucketsPour = prometheus.NewCounterVec(
|
||||
|
@ -179,6 +181,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
|
|||
wgPour: bucketFactory.wgPour,
|
||||
wgDumpState: bucketFactory.wgDumpState,
|
||||
mutex: &sync.Mutex{},
|
||||
LabelsToSend: bucketFactory.LabelsToSendCompiled,
|
||||
}
|
||||
if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) {
|
||||
l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed
|
||||
|
|
|
@ -32,46 +32,48 @@ import (
|
|||
// BucketFactory struct holds all fields for any bucket configuration. This is to have a
|
||||
// generic struct for buckets. This can be seen as a bucket factory.
|
||||
type BucketFactory struct {
|
||||
FormatVersion string `yaml:"format"`
|
||||
Author string `yaml:"author"`
|
||||
Description string `yaml:"description"`
|
||||
References []string `yaml:"references"`
|
||||
Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
|
||||
Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
|
||||
Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
|
||||
LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
|
||||
Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
|
||||
Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
|
||||
GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
|
||||
Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
|
||||
Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
|
||||
Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
|
||||
Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
|
||||
logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
|
||||
Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
|
||||
CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
||||
Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
||||
OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
||||
ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP
|
||||
BucketName string `yaml:"-"`
|
||||
Filename string `yaml:"-"`
|
||||
RunTimeFilter *vm.Program `json:"-"`
|
||||
ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
|
||||
RunTimeGroupBy *vm.Program `json:"-"`
|
||||
Data []*types.DataSource `yaml:"data,omitempty"`
|
||||
DataDir string `yaml:"-"`
|
||||
CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket
|
||||
leakspeed time.Duration //internal representation of `Leakspeed`
|
||||
duration time.Duration //internal representation of `Duration`
|
||||
ret chan types.Event //the bucket-specific output chan for overflows
|
||||
processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
|
||||
output bool //??
|
||||
ScenarioVersion string `yaml:"version,omitempty"`
|
||||
hash string `yaml:"-"`
|
||||
Simulated bool `yaml:"simulated"` //Set to true if the scenario instanciating the bucket was in the exclusion list
|
||||
tomb *tomb.Tomb `yaml:"-"`
|
||||
wgPour *sync.WaitGroup `yaml:"-"`
|
||||
wgDumpState *sync.WaitGroup `yaml:"-"`
|
||||
FormatVersion string `yaml:"format"`
|
||||
Author string `yaml:"author"`
|
||||
Description string `yaml:"description"`
|
||||
References []string `yaml:"references"`
|
||||
Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
|
||||
Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
|
||||
Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
|
||||
LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
|
||||
Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
|
||||
Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
|
||||
GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
|
||||
Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
|
||||
Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
|
||||
Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
|
||||
Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
|
||||
logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well)
|
||||
Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
|
||||
CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
|
||||
Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
|
||||
OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
|
||||
ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP
|
||||
BucketName string `yaml:"-"`
|
||||
Filename string `yaml:"-"`
|
||||
RunTimeFilter *vm.Program `json:"-"`
|
||||
ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
|
||||
RunTimeGroupBy *vm.Program `json:"-"`
|
||||
Data []*types.DataSource `yaml:"data,omitempty"`
|
||||
DataDir string `yaml:"-"`
|
||||
CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket
|
||||
leakspeed time.Duration //internal representation of `Leakspeed`
|
||||
duration time.Duration //internal representation of `Duration`
|
||||
ret chan types.Event //the bucket-specific output chan for overflows
|
||||
processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
|
||||
output bool //??
|
||||
ScenarioVersion string `yaml:"version,omitempty"`
|
||||
hash string `yaml:"-"`
|
||||
Simulated bool `yaml:"simulated"` //Set to true if the scenario instanciating the bucket was in the exclusion list
|
||||
tomb *tomb.Tomb `yaml:"-"`
|
||||
wgPour *sync.WaitGroup `yaml:"-"`
|
||||
wgDumpState *sync.WaitGroup `yaml:"-"`
|
||||
LabelsToSend map[string][]string `yaml:"-"`
|
||||
LabelsToSendCompiled map[string][]*vm.Program `yaml:"-"`
|
||||
}
|
||||
|
||||
func ValidateFactory(bucketFactory *BucketFactory) error {
|
||||
|
@ -216,6 +218,7 @@ func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.
|
|||
|
||||
bucketFactory.wgDumpState = buckets.wgDumpState
|
||||
bucketFactory.wgPour = buckets.wgPour
|
||||
bucketFactory.LabelsToSend = cscfg.LabelsToSend
|
||||
err = LoadBucket(&bucketFactory, tomb)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err)
|
||||
|
@ -348,6 +351,18 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
|
|||
return fmt.Errorf("invalid bucket from %s : %v", bucketFactory.Filename, err)
|
||||
}
|
||||
bucketFactory.tomb = tomb
|
||||
bucketFactory.LabelsToSendCompiled = make(map[string][]*vm.Program)
|
||||
for key, values := range bucketFactory.LabelsToSend {
|
||||
bucketFactory.LabelsToSendCompiled[key] = make([]*vm.Program, 0)
|
||||
for _, value := range values {
|
||||
valueCompiled, err := expr.Compile(value, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
||||
if err != nil {
|
||||
return fmt.Errorf("compilation of '%s' failed: %v", value, err)
|
||||
}
|
||||
bucketFactory.LabelsToSendCompiled[key] = append(bucketFactory.LabelsToSendCompiled[key], valueCompiled)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package leakybucket
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/antonmedv/expr"
|
||||
"github.com/antonmedv/expr/vm"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
|
||||
)
|
||||
|
||||
|
@ -232,7 +234,42 @@ func alertFormatSource(leaky *Leaky, queue *Queue) (map[string]models.Source, st
|
|||
return sources, source_type, nil
|
||||
}
|
||||
|
||||
func EventToLabel(Queue) {
|
||||
func EventToLabel(labels map[string][]*vm.Program, queue *Queue) models.Meta {
|
||||
meta := make([]*models.MetaItems0, 0)
|
||||
for _, evt := range queue.Queue {
|
||||
for key, values := range labels {
|
||||
tmpMeta := models.MetaItems0{}
|
||||
tmpMeta.Key = key
|
||||
tmpValue := make([]string, 0)
|
||||
|
||||
for _, value := range values {
|
||||
var val string
|
||||
output, err := expr.Run(value, exprhelpers.GetExprEnv(map[string]interface{}{"evt": evt}))
|
||||
if err != nil {
|
||||
log.Warningf("failed to get value of '%v': %v", value, err)
|
||||
continue
|
||||
}
|
||||
switch out := output.(type) {
|
||||
case string:
|
||||
val = out
|
||||
case int:
|
||||
val = strconv.Itoa(out)
|
||||
default:
|
||||
log.Warningf("unexpected return type for label to send : %T", output)
|
||||
continue
|
||||
}
|
||||
tmpValue = append(tmpValue, val)
|
||||
}
|
||||
valueBytes, err := json.Marshal(tmpValue)
|
||||
if err != nil {
|
||||
log.Warningf("unable to marshall label values to send: %s", err)
|
||||
}
|
||||
tmpMeta.Value = string(valueBytes)
|
||||
meta = append(meta, &tmpMeta)
|
||||
}
|
||||
}
|
||||
ret := models.Meta(meta)
|
||||
return ret
|
||||
|
||||
}
|
||||
|
||||
|
@ -296,6 +333,7 @@ func NewAlert(leaky *Leaky, queue *Queue) (types.RuntimeAlert, error) {
|
|||
*apiAlert.Message = fmt.Sprintf("%s %s performed '%s' (%d events over %s) at %s", source_scope, sourceStr, leaky.Name, leaky.Total_count, leaky.Ovflw_ts.Sub(leaky.First_ts), leaky.Last_ts)
|
||||
//Get the events from Leaky/Queue
|
||||
apiAlert.Events = EventsFromQueue(queue)
|
||||
apiAlert.Meta = EventToLabel(leaky.LabelsToSend, leaky.Queue)
|
||||
|
||||
//Loop over the Sources and generate appropriate number of ApiAlerts
|
||||
for _, srcValue := range sources {
|
||||
|
@ -321,5 +359,6 @@ func NewAlert(leaky *Leaky, queue *Queue) (types.RuntimeAlert, error) {
|
|||
if leaky.Reprocess {
|
||||
runtimeAlert.Reprocess = true
|
||||
}
|
||||
|
||||
return runtimeAlert, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue