Преглед на файлове

handle the auto-flush of old data

Thibault bui Koechlin преди 5 години
родител
ревизия
8bfce6f3e2
променени са 4 файла, в които са добавени 70 реда и са изтрити 32 реда
  1. 26 7
      pkg/cwplugin/backend.go
  2. 24 18
      pkg/outputs/ouputs.go
  3. 7 6
      pkg/sqlite/commit.go
  4. 13 1
      pkg/sqlite/sqlite.go

+ 26 - 7
pkg/cwplugin/backend.go

@@ -28,18 +28,28 @@ type BackendPlugin struct {
 	Name           string `yaml:"name"`
 	Path           string `yaml:"path"`
 	ConfigFilePath string
-	Config         map[string]string `yaml:"config"`
-	ID             string
-	funcs          Backend
+	//Config is passed to the backend plugin.
+	//It contains specific plugin config + plugin config from main yaml file
+	Config map[string]string `yaml:"config"`
+	ID     string
+	funcs  Backend
 }
 
 type BackendManager struct {
 	backendPlugins map[string]BackendPlugin
 }
 
-func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) {
+func NewBackendPlugin(outputConfig map[string]string) (*BackendManager, error) {
 	var files []string
 	var backendManager = &BackendManager{}
+	var path string
+
+	if v, ok := outputConfig["backend"]; ok {
+		path = v
+	} else {
+		return nil, fmt.Errorf("missing 'backend' (path to backend plugins)")
+	}
+	//var path = output.BackendFolder
 	err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
 		if filepath.Ext(path) == ".yaml" {
 			files = append(files, path)
@@ -88,10 +98,19 @@ func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) {
 
 		// Add the interface and Init()
 		newPlugin.funcs = bInterface
-		if isDaemon {
-			newPlugin.Config["flush"] = "true"
+		// Merge backend config from main config file
+		if v, ok := outputConfig["max_records"]; ok {
+			newPlugin.Config["max_records"] = v
+		} else {
+			log.Warningf("missing 'max_records' parameters, setting to default (1000)")
+			newPlugin.Config["max_records"] = "1000"
+		}
+
+		if v, ok := outputConfig["max_records_age"]; ok {
+			newPlugin.Config["max_records_age"] = v
 		} else {
-			newPlugin.Config["flush"] = "false"
+			log.Warningf("missing 'max_records_age' parameters, setting to default (30d)")
+			newPlugin.Config["max_records_age"] = "30d"
 		}
 
 		err = newPlugin.funcs.Init(newPlugin.Config)

+ 24 - 18
pkg/outputs/ouputs.go

@@ -10,6 +10,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/cwplugin"
 	"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
+	"github.com/pkg/errors"
 
 	"github.com/crowdsecurity/crowdsec/pkg/cwapi"
 
@@ -18,10 +19,18 @@ import (
 	"gopkg.in/yaml.v2"
 )
 
+//OutputFactory is part of the main yaml configuration file, and holds generic backend config
 type OutputFactory struct {
 	BackendFolder string `yaml:"backend"`
+	//For the db GC : how many records can we keep at most
+	MaxRecords string `yaml:"max_records"`
+	//For the db GC what is the oldest records we tolerate
+	MaxRecordsAge string `yaml:"max_records_age"`
+	//Should we automatically flush expired bans
+	Flush bool `yaml:"flush"`
 }
 
+//Output holds the runtime objects of backend
 type Output struct {
 	API      *cwapi.ApiCtx
 	bManager *cwplugin.BackendManager
@@ -95,8 +104,6 @@ func (o *Output) Shutdown() error {
 			reterr = err
 		}
 	}
-	//bManager
-	//TBD : the backend(s) should be stopped in the same way
 	return reterr
 }
 
@@ -281,19 +288,6 @@ func (o *Output) LoadAPIConfig(configFile string) error {
 	return nil
 }
 
-func (o *Output) load(config *OutputFactory, isDaemon bool) error {
-	var err error
-	if config == nil {
-		return fmt.Errorf("missing output plugin configuration")
-	}
-	log.Debugf("loading backend plugins ...")
-	o.bManager, err = cwplugin.NewBackendPlugin(config.BackendFolder, isDaemon)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
 func (o *Output) Delete(target string) (int, error) {
 	nbDel, err := o.bManager.Delete(target)
 	return nbDel, err
@@ -322,11 +316,23 @@ func (o *Output) ReadAT(timeAT time.Time) ([]map[string]string, error) {
 	return ret, nil
 }
 
-func NewOutput(config *OutputFactory, isDaemon bool) (*Output, error) {
+func NewOutput(config *OutputFactory) (*Output, error) {
 	var output Output
-	err := output.load(config, isDaemon)
+	var err error
+
+	if config == nil {
+		return nil, fmt.Errorf("missing output plugin configuration")
+	}
+	log.Debugf("loading backend plugins ...")
+	//turn the *OutputFactory into a map[string]string for less constraint
+	backendConfig := map[string]string{
+		"backend":         config.BackendFolder,
+		"max_records":     config.MaxRecords,
+		"max_records_age": config.MaxRecordsAge,
+		"flush":           strconv.FormatBool(config.Flush)}
+	output.bManager, err = cwplugin.NewBackendPlugin(backendConfig)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to load backend plugin")
 	}
 	return &output, nil
 }

+ 7 - 6
pkg/sqlite/commit.go

@@ -92,8 +92,9 @@ func (c *Context) CleanUpRecordsByCount() error {
 }
 
 func (c *Context) AutoCommit() {
-	log.Infof("starting autocommit")
+	log.Warningf("starting autocommit")
 	ticker := time.NewTicker(200 * time.Millisecond)
+	cleanUpTicker := time.NewTicker(1 * time.Minute)
 	for {
 		select {
 		case <-c.PusherTomb.Dying():
@@ -115,14 +116,14 @@ func (c *Context) AutoCommit() {
 		case <-ticker.C:
 			if atomic.LoadInt32(&c.count) != 0 &&
 				(atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
-				//log.Warningf("flush time")
 				if err := c.Flush(); err != nil {
 					log.Errorf("failed to flush : %s", err)
 				}
-				//log.Printf("starting auto-cleanup")
-				if err := c.CleanUpRecordsByCount(); err != nil {
-					log.Errorf("error in auto-cleanup : %s", err)
-				}
+
+			}
+		case <-cleanUpTicker.C:
+			if err := c.CleanUpRecordsByCount(); err != nil {
+				log.Errorf("error in auto-cleanup : %s", err)
 			}
 
 		}

+ 13 - 1
pkg/sqlite/sqlite.go

@@ -32,6 +32,18 @@ func NewSQLite(cfg map[string]string) (*Context, error) {
 	var err error
 	c := &Context{}
 
+	if v, ok := cfg["max_records"]; ok {
+		c.maxEventRetention, err = strconv.Atoi(v)
+		if err != nil {
+			log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
+		}
+	}
+	if v, ok := cfg["max_records_duration"]; ok {
+		c.maxDurationRetention, err = time.ParseDuration(v)
+		if err != nil {
+			log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
+		}
+	}
 	log.Warningf("NEW SQLITE : %+v", cfg)
 	if _, ok := cfg["db_path"]; !ok {
 		return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
@@ -71,7 +83,7 @@ func NewSQLite(cfg map[string]string) (*Context, error) {
 		return nil, fmt.Errorf("failed to begin sqlite transac : %s", err)
 	}
 	//random attempt
-	c.maxEventRetention = 100
+	//c.maxEventRetention = 100
 	c.PusherTomb.Go(func() error {
 		c.AutoCommit()
 		return nil