Compare commits
9 commits
master
...
blackhole-
Author | SHA1 | Date | |
---|---|---|---|
|
56f457d4e3 | ||
|
5ae97d582c | ||
|
662764929d | ||
|
7a00123ed0 | ||
|
cb1a86fdc8 | ||
|
51fdc38789 | ||
|
9f676844d9 | ||
|
10c97e46af | ||
|
9f72bbe725 |
21 changed files with 126 additions and 60 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
|
||||
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/parser"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
|
@ -47,6 +48,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
|
|||
inputLineChan := make(chan types.Event)
|
||||
inputEventChan := make(chan types.Event)
|
||||
|
||||
leaky.BlackholeTracking = &sync.Map{}
|
||||
|
||||
//start go-routines for parsing, buckets pour and outputs.
|
||||
parserWg := &sync.WaitGroup{}
|
||||
parsersTomb.Go(func() error {
|
||||
|
@ -67,6 +70,16 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
|
|||
parserWg.Wait()
|
||||
|
||||
bucketWg := &sync.WaitGroup{}
|
||||
//Only start the blackhole GC routine if buckets GC is not enabled, ie we are replaying a file
|
||||
//This is because the GC routine expects events to happen in real time, which is not the case during a replay.
|
||||
if !cConfig.Crowdsec.BucketsGCEnabled {
|
||||
bucketsTomb.Go(func() error {
|
||||
bucketWg.Add(1)
|
||||
leakybucket.BlackholeGC(&bucketsTomb)
|
||||
bucketWg.Done()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
bucketsTomb.Go(func() error {
|
||||
bucketWg.Add(1)
|
||||
/*restore previous state as well if present*/
|
||||
|
|
|
@ -149,6 +149,8 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
|
|||
flags.Labels = labels
|
||||
flags.Labels["type"] = flags.SingleFileType
|
||||
|
||||
cConfig.Crowdsec.BucketsGCEnabled = true
|
||||
|
||||
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
|
||||
|
|
|
@ -114,7 +114,6 @@ func (c *Controller) sendAlertToPluginChannel(alert *models.Alert, profileID uin
|
|||
|
||||
// CreateAlert : write received alerts in body to the database
|
||||
func (c *Controller) CreateAlert(gctx *gin.Context) {
|
||||
|
||||
var input models.AddAlertsRequest
|
||||
|
||||
claims := jwt.ExtractClaims(gctx)
|
||||
|
|
|
@ -22,6 +22,7 @@ type DatabaseCfg struct {
|
|||
Flush *FlushDBCfg `yaml:"flush"`
|
||||
LogLevel *log.Level `yaml:"log_level"`
|
||||
MaxOpenConns *int `yaml:"max_open_conns,omitempty"`
|
||||
UseWal *bool `yaml:"use_wal,omitempty"`
|
||||
}
|
||||
|
||||
type AuthGCCfg struct {
|
||||
|
|
|
@ -888,7 +888,12 @@ func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error)
|
|||
return 0, errors.Wrap(DeleteFail, "alert query failed")
|
||||
}
|
||||
|
||||
for _, alertItem := range alertsToDelete {
|
||||
c.Log.Debugf("Deleting %d alerts", len(alertsToDelete))
|
||||
|
||||
for p, alertItem := range alertsToDelete {
|
||||
if p%100 == 0 {
|
||||
c.Log.Debugf("Deleting alert %d", p)
|
||||
}
|
||||
err = c.DeleteAlertGraph(alertItem)
|
||||
if err != nil {
|
||||
c.Log.Warningf("DeleteAlertWithFilter : %s", err)
|
||||
|
|
|
@ -76,7 +76,16 @@ func NewClient(config *csconfig.DatabaseCfg) (*Client, error) {
|
|||
return &Client{}, fmt.Errorf("unable to set perms on %s: %v", config.DbPath, err)
|
||||
}
|
||||
}
|
||||
drv, err := getEntDriver("sqlite3", dialect.SQLite, fmt.Sprintf("file:%s?_busy_timeout=100000&_fk=1", config.DbPath), config)
|
||||
if config.UseWal == nil {
|
||||
entLogger.Warn("you are using sqlite without WAL, this can have an impact of performance. If you do not store the database in a network share, set db_config.use_wal to true. Set to false to disable this warning")
|
||||
}
|
||||
var sqliteConnectionStringParameters string
|
||||
if config.UseWal != nil && *config.UseWal {
|
||||
sqliteConnectionStringParameters = "_busy_timeout=100000&_fk=1&_journal_mode=WAL"
|
||||
} else {
|
||||
sqliteConnectionStringParameters = "_busy_timeout=100000&_fk=1"
|
||||
}
|
||||
drv, err := getEntDriver("sqlite3", dialect.SQLite, fmt.Sprintf("file:%s?%s", config.DbPath, sqliteConnectionStringParameters), config)
|
||||
if err != nil {
|
||||
return &Client{}, errors.Wrapf(err, "failed opening connection to sqlite: %v", config.DbPath)
|
||||
}
|
||||
|
|
|
@ -5,17 +5,17 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gopkg.in/tomb.v2"
|
||||
)
|
||||
|
||||
type HiddenKey struct {
|
||||
key string
|
||||
expiration time.Time
|
||||
type Blackhole struct {
|
||||
duration time.Duration
|
||||
DumbProcessor
|
||||
}
|
||||
|
||||
type Blackhole struct {
|
||||
duration time.Duration
|
||||
hiddenKeys []HiddenKey
|
||||
DumbProcessor
|
||||
type BlackholeExpiration struct {
|
||||
blExpiration time.Time
|
||||
}
|
||||
|
||||
func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
|
||||
|
@ -26,43 +26,59 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
|
|||
}
|
||||
return &Blackhole{
|
||||
duration: duration,
|
||||
hiddenKeys: []HiddenKey{},
|
||||
DumbProcessor: DumbProcessor{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func CleanupBlackhole(lastEvent time.Time) {
|
||||
BlackholeTracking.Range(func(key, value interface{}) bool {
|
||||
cleanupDate := value.(BlackholeExpiration).blExpiration
|
||||
if cleanupDate.Before(lastEvent) {
|
||||
log.Debugf("Expiring blackhole for %s", key)
|
||||
BlackholeTracking.Delete(key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func BlackholeGC(bucketsTomb *tomb.Tomb) error {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-bucketsTomb.Dying():
|
||||
ticker.Stop()
|
||||
BlackholeTracking.Range(func(key, value interface{}) bool {
|
||||
BlackholeTracking.Delete(key)
|
||||
return true
|
||||
})
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
CleanupBlackhole(time.Now().UTC())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) {
|
||||
return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
|
||||
var blackholed bool = false
|
||||
var tmp []HiddenKey
|
||||
// search if we are blackholed and refresh the slice
|
||||
for _, element := range bl.hiddenKeys {
|
||||
|
||||
if element.key == leaky.Mapkey {
|
||||
if element.expiration.After(leaky.Ovflw_ts) {
|
||||
leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
|
||||
blackholed = true
|
||||
}
|
||||
}
|
||||
|
||||
if element.expiration.After(leaky.Ovflw_ts) {
|
||||
tmp = append(tmp, element)
|
||||
if expiration, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
|
||||
x := expiration.(BlackholeExpiration)
|
||||
if x.blExpiration.After(leaky.Ovflw_ts) {
|
||||
leaky.logger.Debugf("Blackhole already triggered for %s (remaining : %s", leaky.Mapkey, x.blExpiration.Sub(time.Now().UTC()))
|
||||
return types.RuntimeAlert{
|
||||
Mapkey: leaky.Mapkey,
|
||||
}, nil
|
||||
} else {
|
||||
leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
|
||||
|
||||
leaky.logger.Debugf("Blackhole expired for %s", leaky.Mapkey)
|
||||
BlackholeTracking.Delete(leaky.Mapkey)
|
||||
}
|
||||
}
|
||||
bl.hiddenKeys = tmp
|
||||
|
||||
if blackholed {
|
||||
leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
|
||||
return types.RuntimeAlert{
|
||||
Mapkey: leaky.Mapkey,
|
||||
}, nil
|
||||
}
|
||||
bl.hiddenKeys = append(bl.hiddenKeys, HiddenKey{leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration)})
|
||||
leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
|
||||
BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{
|
||||
blExpiration: leaky.Ovflw_ts.Add(bl.duration),
|
||||
})
|
||||
|
||||
leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration))
|
||||
return alert, queue
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -220,7 +220,6 @@ func LeakRoutine(leaky *Leaky) error {
|
|||
}
|
||||
}
|
||||
|
||||
leaky.logger.Debugf("Leaky routine starting, lifetime : %s", leaky.Duration)
|
||||
for {
|
||||
select {
|
||||
/*receiving an event*/
|
||||
|
|
|
@ -38,6 +38,8 @@ func TestBucket(t *testing.T) {
|
|||
log.Fatalf("exprhelpers init failed: %s", err)
|
||||
}
|
||||
|
||||
BlackholeTracking = &sync.Map{}
|
||||
|
||||
if envSetting != "" {
|
||||
if err := testOneBucket(t, envSetting, tomb); err != nil {
|
||||
t.Fatalf("Test '%s' failed : %s", envSetting, err)
|
||||
|
@ -64,8 +66,8 @@ func TestBucket(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
//during tests, we're likely to have only one scenario, and thus only one holder.
|
||||
//we want to avoid the death of the tomb because all existing buckets have been destroyed.
|
||||
// during tests, we're likely to have only one scenario, and thus only one holder.
|
||||
// we want to avoid the death of the tomb because all existing buckets have been destroyed.
|
||||
func watchTomb(tomb *tomb.Tomb) {
|
||||
for {
|
||||
if tomb.Alive() == false {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -23,9 +24,13 @@ var serialized map[string]Leaky
|
|||
var BucketPourCache map[string][]types.Event
|
||||
var BucketPourTrack bool
|
||||
|
||||
/*The leaky routines lifecycle are based on "real" time.
|
||||
var BlackholeTracking *sync.Map
|
||||
|
||||
/*
|
||||
The leaky routines lifecycle are based on "real" time.
|
||||
But when we are running in time-machine mode, the reference time is in logs and not "real" time.
|
||||
Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
|
||||
Thus we need to garbage collect them to avoid a skyrocketing memory usage.
|
||||
*/
|
||||
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
|
||||
buckets.wgPour.Wait()
|
||||
buckets.wgDumpState.Add(1)
|
||||
|
@ -272,7 +277,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
|
|||
//once the created goroutine is ready to process event, we can return it
|
||||
<-fresh_bucket.Signal
|
||||
} else {
|
||||
holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey)
|
||||
holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey)
|
||||
biface = actual
|
||||
}
|
||||
holder.logger.Debugf("Created new bucket %s", partitionKey)
|
||||
|
@ -340,7 +345,25 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|||
}
|
||||
}
|
||||
buckey := GetKey(holders[idx], groupby)
|
||||
|
||||
if x, ok := BlackholeTracking.Load(buckey); ok {
|
||||
holders[idx].logger.Debugf("Checking if blackhole has expired for %s", buckey)
|
||||
blackholeExp := x.(BlackholeExpiration)
|
||||
t := time.Now().UTC()
|
||||
if parsed.ExpectMode == TIMEMACHINE {
|
||||
//This is not optimal at all, date enrichment should also set parsed.Time to avoid parsing the date twice
|
||||
err := t.UnmarshalText([]byte(parsed.MarshaledTime))
|
||||
if err != nil {
|
||||
holders[idx].logger.Errorf("failed parsing time : %v", err)
|
||||
}
|
||||
holders[idx].logger.Debugf("Found TIMEMACHINE bucket, using %s as time, comparing against %s ", t, blackholeExp.blExpiration)
|
||||
}
|
||||
if blackholeExp.blExpiration.After(t) {
|
||||
holders[idx].logger.Debugf("Event is blackholed: %s (remaining: %s)", buckey, blackholeExp.blExpiration.Sub(t))
|
||||
continue
|
||||
}
|
||||
holders[idx].logger.Debugf("Event is no longer blackholed: %s", buckey)
|
||||
BlackholeTracking.Delete(buckey)
|
||||
}
|
||||
//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
|
||||
bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# ssh bruteforce
|
||||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
name: test/simple-leaky-blackhole
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "10s"
|
||||
|
|
|
@ -91,16 +91,12 @@
|
|||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"scenario": "test/simple-leaky-blackhole",
|
||||
"events_count": 2
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Alert": {
|
||||
}
|
||||
},
|
||||
{
|
||||
"Alert": {
|
||||
"sources": {
|
||||
|
@ -112,7 +108,7 @@
|
|||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"scenario": "test/simple-leaky-blackhole",
|
||||
"events_count": 2
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
name: test/simple-leaky-ovfl
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "10s"
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"scenario": "test/simple-leaky-ovfl",
|
||||
"events_count": 2
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# ssh bruteforce
|
||||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
name: test/simple-leaky-underflow
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "0.5s"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# ssh bruteforce
|
||||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
name: test/simple-leaky-state
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "10s"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"cdf58e6ae48e79ac3ae0f006e1a2e627eccd8b63": {
|
||||
"Name": "test/simple-leaky",
|
||||
"4fccb3db1e4c2e1c94bdb747336c8749bbc470ef": {
|
||||
"Name": "test/simple-leaky-state",
|
||||
"Mode": 1,
|
||||
"SerializedState": {
|
||||
"Limit": 0.1,
|
||||
|
@ -74,7 +74,7 @@
|
|||
},
|
||||
"Capacity": 3,
|
||||
"CacheSize": 0,
|
||||
"Mapkey": "cdf58e6ae48e79ac3ae0f006e1a2e627eccd8b63",
|
||||
"Mapkey": "4fccb3db1e4c2e1c94bdb747336c8749bbc470ef",
|
||||
"Reprocess": false,
|
||||
"Uuid": "dark-bush",
|
||||
"First_ts": "2020-01-01T10:00:04Z",
|
||||
|
|
|
@ -52,7 +52,7 @@
|
|||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"scenario": "test/simple-leaky-state",
|
||||
"events_count": 4
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# ssh bruteforce
|
||||
type: leaky
|
||||
debug: true
|
||||
name: test/simple-leaky
|
||||
name: test/simple-leaky-uniq
|
||||
description: "Simple leaky"
|
||||
filter: "evt.Line.Labels.type =='testlog'"
|
||||
leakspeed: "10s"
|
||||
|
|
|
@ -52,7 +52,7 @@
|
|||
}
|
||||
},
|
||||
"Alert" : {
|
||||
"scenario": "test/simple-leaky",
|
||||
"scenario": "test/simple-leaky-uniq",
|
||||
"events_count": 2
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,7 @@ config_generate() {
|
|||
.config_paths.plugin_dir=strenv(PLUGIN_DIR) |
|
||||
.crowdsec_service.acquisition_path=strenv(CONFIG_DIR)+"/acquis.yaml" |
|
||||
.db_config.db_path=strenv(DATA_DIR)+"/crowdsec.db" |
|
||||
.db_config.use_wal=true |
|
||||
.api.client.credentials_path=strenv(CONFIG_DIR)+"/local_api_credentials.yaml" |
|
||||
.api.server.profiles_path=strenv(CONFIG_DIR)+"/profiles.yaml" |
|
||||
.api.server.console_path=strenv(CONFIG_DIR)+"/console.yaml" |
|
||||
|
|
Loading…
Reference in a new issue