Переглянути джерело

Update sqlite plugin to database plugin that supported multiples Db backends

erenJag 5 роки тому
батько
коміт
fd99d132d1

+ 1 - 1
cmd/crowdsec-cli/api.go

@@ -125,7 +125,7 @@ func pullTOP() error {
 			return fmt.Errorf("failed to convert ban to signal : %s", err)
 		}
 		if err := outputCTX.Insert(signalOcc); err != nil {
-			log.Fatalf("Unable to write pull to sqliteDB : %+s", err.Error())
+			log.Fatalf("Unable to write pull to Database : %+s", err.Error())
 		}
 	}
 	outputCTX.Flush()

+ 5 - 5
cmd/crowdsec-cli/ban.go

@@ -68,7 +68,7 @@ func simpleBanToSignal(targetIP string, reason string, expirationStr string, act
 		banApp.EndIp = types.IP2Int(parsedIP)
 	}
 
-	var banApps = make([]types.BanApplication, 1)
+	var banApps = make([]types.BanApplication, 0)
 	banApps = append(banApps, banApp)
 	signalOcc = types.SignalOccurence{
 		Scenario:                            reason,
@@ -94,7 +94,7 @@ func BanList() error {
 	}
 	ret, err := outputCTX.ReadAT(at)
 	if err != nil {
-		return fmt.Errorf("unable to get records from sqlite : %v", err)
+		return fmt.Errorf("unable to get records from Database : %v", err)
 	}
 	if config.output == "raw" {
 		fmt.Printf("source,ip,reason,bans,action,country,as,events_count,expiration\n")
@@ -167,7 +167,7 @@ func BanAdd(target string, duration string, reason string, action string) error
 	if err != nil {
 		return err
 	}
-	log.Infof("%s %s for %s (%s)", action, target, duration, reason)
+	log.Infof("Wrote ban to database.")
 	return nil
 }
 
@@ -225,7 +225,7 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
 		Run: func(cmd *cobra.Command, args []string) {
 			reason := strings.Join(args[2:], " ")
 			if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
-				log.Fatalf("failed to add ban to sqlite : %v", err)
+				log.Fatalf("failed to add ban to database : %v", err)
 			}
 		},
 	}
@@ -239,7 +239,7 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
 		Run: func(cmd *cobra.Command, args []string) {
 			reason := strings.Join(args[2:], " ")
 			if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
-				log.Fatalf("failed to add ban to sqlite : %v", err)
+				log.Fatalf("failed to add ban to database : %v", err)
 			}
 		},
 	}

+ 18 - 0
config/plugins/backend/database.yaml

@@ -0,0 +1,18 @@
+name: database
+path: /usr/local/lib/crowdsec/plugins/backend/database.so
+config:
+  ## DB type supported (mysql, sqlite)
+  type: sqlite
+  
+  ## mysql options
+  # db_host: 172.17.0.2
+  # db_username: root
+  # db_password: totolol42
+  # db_name: crowdsec
+
+  ## sqlite options
+  db_path: /var/lib/crowdsec/data/crowdsec.db
+
+  ## Other options
+  flush: true
+  # debug: true

+ 0 - 4
config/plugins/backend/sqlite.yaml

@@ -1,4 +0,0 @@
-name: sqlite
-path: /usr/local/lib/crowdsec/plugins/backend/sqlite.so
-config:
-  db_path: /var/lib/crowdsec/data/crowdsec.db

+ 2 - 2
config/profiles.yaml

@@ -7,12 +7,12 @@ remediation:
   captcha: true
   duration: 4h
 outputs:
-  - plugin: sqlite
+  - plugin: database
 ---
 profile: default_notification
 filter: "sig.Labels.remediation != 'true'"
 #remediation is empty, it means non taken
 api: false
 outputs:
-  - plugin: sqlite  # If we do not want to push, we can remove this line and the next one
+  - plugin: database  # If we do not want to push, we can remove this line and the next one
     store: false

+ 2 - 2
pkg/sqlite/commit.go → pkg/database/commit.go

@@ -1,4 +1,4 @@
-package sqlite
+package database
 
 import (
 	"fmt"
@@ -151,7 +151,7 @@ func (c *Context) autoCommit() {
 		select {
 		case <-c.PusherTomb.Dying():
 			//we need to shutdown
-			log.Infof("sqlite routine shutdown")
+			log.Infof("database routine shutdown")
 			if err := c.Flush(); err != nil {
 				log.Errorf("error while flushing records: %s", err)
 			}

+ 108 - 0
pkg/database/database.go

@@ -0,0 +1,108 @@
+package database
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+	log "github.com/sirupsen/logrus"
+
+	"github.com/jinzhu/gorm"
+	_ "github.com/jinzhu/gorm/dialects/mysql"
+	_ "github.com/jinzhu/gorm/dialects/sqlite"
+	_ "github.com/mattn/go-sqlite3"
+	"gopkg.in/tomb.v2"
+)
+
+type Context struct {
+	Db         *gorm.DB //Pointer to database
+	tx         *gorm.DB //Pointer to current transaction (flushed on a regular basis)
+	lastCommit time.Time
+	flush      bool
+	count      int32
+	lock       sync.Mutex //booboo
+	PusherTomb tomb.Tomb
+}
+
+func checkConfig(cfg map[string]string) error {
+	switch dbType, _ := cfg["type"]; dbType {
+	case "sqlite":
+		if val, ok := cfg["db_path"]; !ok && val == "" {
+			return fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
+		}
+	case "mysql":
+		if val, ok := cfg["db_host"]; !ok && val == "" {
+			return fmt.Errorf("please specify a 'db_host' to SQLite db in the configuration")
+		}
+
+		if val, ok := cfg["db_username"]; !ok && val == "" {
+			return fmt.Errorf("please specify a 'db_username' to SQLite db in the configuration")
+		}
+
+		if val, ok := cfg["db_password"]; !ok && val == "" {
+			return fmt.Errorf("please specify a 'db_password' to SQLite db in the configuration")
+		}
+
+		if val, ok := cfg["db_name"]; !ok && val == "" {
+			return fmt.Errorf("please specify a 'db_name' to SQLite db in the configuration")
+		}
+	default:
+		return fmt.Errorf("please specify a proper 'type' to the database configuration ")
+	}
+
+	return nil
+}
+
+func NewDatabase(cfg map[string]string) (*Context, error) {
+	var err error
+	c := &Context{}
+
+	if err = checkConfig(cfg); err != nil {
+		return nil, fmt.Errorf("bad database configuration : %v", err)
+	}
+
+	if cfg["type"] == "sqlite" {
+		c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
+		if err != nil {
+			return nil, fmt.Errorf("failed to open %s : %s", cfg["db_path"], err)
+		}
+	}
+
+	if cfg["type"] == "mysql" {
+		gormArg := cfg["db_username"] + ":" + cfg["db_password"] + "@(" + cfg["db_host"] + ")/" + cfg["db_name"] + "?charset=utf8&parseTime=True&loc=Local"
+		c.Db, err = gorm.Open("mysql", gormArg)
+		if err != nil {
+			return nil, fmt.Errorf("failed to open %s database : %s", cfg["db_name"], err)
+		}
+	}
+
+	if val, ok := cfg["debug"]; ok && val == "true" {
+		log.Infof("Enabling debug for %s", cfg["type"])
+		c.Db.LogMode(true)
+	}
+
+	c.flush, _ = strconv.ParseBool(cfg["flush"])
+	// Migrate the schema
+	c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
+	c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
+	c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
+	c.tx = c.Db.Begin()
+	c.lastCommit = time.Now()
+	ret := c.tx.Commit()
+
+	if ret.Error != nil {
+		return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
+
+	}
+	c.tx = c.Db.Begin()
+	if c.tx == nil {
+		return nil, fmt.Errorf("failed to begin %s transac : %s", cfg["type"], err)
+	}
+	c.PusherTomb.Go(func() error {
+		c.AutoCommit()
+		return nil
+	})
+	return c, nil
+}

+ 1 - 1
pkg/sqlite/delete.go → pkg/database/delete.go

@@ -1,4 +1,4 @@
-package sqlite
+package database
 
 import (
 	"fmt"

+ 7 - 4
pkg/sqlite/stats.go → pkg/database/stats.go

@@ -1,4 +1,4 @@
-package sqlite
+package database
 
 import (
 	"fmt"
@@ -18,7 +18,8 @@ func (c *Context) GetStats(since time.Duration) ([]map[string]string, error) {
 	country_stats := make(map[string]string)
 
 	/*get records that are younger than 'since' */
-	records := c.Db.Order("updated_at desc").Where(`strftime("%s", created_at) >= strftime("%s", ?)`, time.Now().Add(-since)).Find(&sos)
+	//records := c.Db.Order("updated_at desc").Where(`strftime("%s", created_at) >= strftime("%s", ?)`, time.Now().Add(-since)).Find(&sos)
+	records := c.Db.Order("updated_at desc").Where("created_at >= ?", time.Now().Add(-since)).Find(&sos)
 	if records.Error != nil {
 		return nil, records.Error
 	}
@@ -78,7 +79,8 @@ func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
 	rets := make([]map[string]string, 0)
 	/*get non-expired records*/
 	//c.Db.LogMode(true)
-	records := c.Db.Order("updated_at desc").Where(`strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?)`, at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
+	//records := c.Db.Order("updated_at desc").Where(`strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?)`, at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
+	records := c.Db.Order("updated_at desc").Where("until >= ? AND created_at < ?", at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
 	if records.Error != nil {
 		return nil, records.Error
 	}
@@ -87,7 +89,8 @@ func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
 		/*
 		 fetch count of bans for this specific ip_text
 		*/
-		ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?) AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
+		//ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?) AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
+		ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND until >= ? AND created_at < ? AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
 		if ret.Error != nil {
 			return nil, fmt.Errorf("failed to fetch records count for %s : %v", ba.IpText, ret.Error)
 		}

+ 2 - 2
pkg/sqlite/write.go → pkg/database/write.go

@@ -1,4 +1,4 @@
-package sqlite
+package database
 
 import (
 	"fmt"
@@ -31,7 +31,7 @@ func (c *Context) WriteSignal(sig types.SignalOccurence) error {
 	//sig.Scenario = sig.Scenario
 	if ret.Error != nil {
 		log.Errorf("FAILED : %+v \n", ret.Error)
-		return fmt.Errorf("failed to write signal occurence : %v", ret.Error)
+		return fmt.Errorf("failed to write signal occurrence : %v", ret.Error)
 	}
 	return nil
 }

+ 1 - 1
pkg/outputs/ouputs.go

@@ -125,7 +125,7 @@ func (o *Output) FlushAll() {
 	}
 	if o.bManager != nil {
 		if err := o.bManager.Flush(); err != nil {
-			log.Errorf("Failing Sqlite flush : %s", err)
+			log.Errorf("Failing database flush : %s", err)
 		}
 	}
 }

+ 0 - 88
pkg/sqlite/sqlite.go

@@ -1,88 +0,0 @@
-package sqlite
-
-import (
-	"fmt"
-	"strconv"
-	"sync"
-	"time"
-
-	"github.com/crowdsecurity/crowdsec/pkg/types"
-	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
-
-	"github.com/jinzhu/gorm"
-	_ "github.com/jinzhu/gorm/dialects/sqlite"
-	_ "github.com/mattn/go-sqlite3"
-	"gopkg.in/tomb.v2"
-)
-
-type Context struct {
-	Db         *gorm.DB //Pointer to sqlite db
-	tx         *gorm.DB //Pointer to current transaction (flushed on a regular basis)
-	lastCommit time.Time
-	flush      bool
-	count      int32
-	lock       sync.Mutex //booboo
-	PusherTomb tomb.Tomb
-	//to manage auto cleanup : max number of records *or* oldest
-	maxEventRetention    int
-	maxDurationRetention time.Duration
-}
-
-func NewSQLite(cfg map[string]string) (*Context, error) {
-	var err error
-	c := &Context{}
-
-	if v, ok := cfg["max_records"]; ok {
-		c.maxEventRetention, err = strconv.Atoi(v)
-		if err != nil {
-			log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
-		}
-	}
-	if v, ok := cfg["max_records_age"]; ok {
-		c.maxDurationRetention, err = time.ParseDuration(v)
-		if err != nil {
-			log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
-		}
-	}
-	if _, ok := cfg["db_path"]; !ok {
-		return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
-	}
-
-	if cfg["db_path"] == "" {
-		return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
-	}
-	log.Debugf("Starting SQLite backend, path:%s", cfg["db_path"])
-
-	c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
-	if err != nil {
-		return nil, fmt.Errorf("failed to open %s : %s", cfg["db_path"], err)
-	}
-
-	if val, ok := cfg["debug"]; ok && val == "true" {
-		log.Infof("Enabling debug for sqlite")
-		c.Db.LogMode(true)
-	}
-
-	c.flush, err = strconv.ParseBool(cfg["flush"])
-	if err != nil {
-		return nil, errors.Wrap(err, "Unable to parse 'flush' flag")
-	}
-	// Migrate the schema
-	c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
-	c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
-	c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
-	c.tx = c.Db.Begin()
-	c.lastCommit = time.Now()
-	ret := c.tx.Commit()
-
-	if ret.Error != nil {
-		return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
-
-	}
-	c.tx = c.Db.Begin()
-	if c.tx == nil {
-		return nil, fmt.Errorf("failed to begin sqlite transac : %s", err)
-	}
-	return c, nil
-}

+ 5 - 5
plugins/backend/sqlite.go → plugins/backend/database.go

@@ -4,14 +4,14 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/crowdsecurity/crowdsec/pkg/sqlite"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	log "github.com/sirupsen/logrus"
 )
 
-//nolint:unused // pluginDB is the interface for sqlite output plugin
+//nolint:unused // pluginDB is the interface for database output plugin
 type pluginDB struct {
-	CTX *sqlite.Context
+	CTX *database.Context
 }
 
 func (p *pluginDB) Shutdown() error {
@@ -29,8 +29,8 @@ func (p *pluginDB) StartAutoCommit() error {
 
 func (p *pluginDB) Init(config map[string]string) error {
 	var err error
-	log.Debugf("sqlite config : %+v \n", config)
-	p.CTX, err = sqlite.NewSQLite(config)
+	log.Debugf("database config : %+v \n", config)
+	p.CTX, err = database.NewDatabase(config)
 
 	if err != nil {
 		return err

+ 7 - 6
scripts/test_env.sh

@@ -39,14 +39,15 @@ PARSER_S02="$PARSER_DIR/s02-enrich"
 SCENARIOS_DIR="$CONFIG_DIR/scenarios"
 POSTOVERFLOWS_DIR="$CONFIG_DIR/postoverflows"
 PLUGIN_BACKEND_DIR="$CONFIG_DIR/plugins/backend/"
-SQLITE_PLUGIN_FILE="$PLUGIN_BACKEND_DIR/sqlite.yaml"
+DB_PLUGIN_FILE="$PLUGIN_BACKEND_DIR/database.yaml"
 
 gen_sqlite_config() {
-	echo "name: sqlite" >> "$SQLITE_PLUGIN_FILE"
-	echo "path: ./plugins/backend/sqlite.so" >> "$SQLITE_PLUGIN_FILE"
-	echo "config:" >> "$SQLITE_PLUGIN_FILE"
-	echo "  db_path: ./test.db" >> "$SQLITE_PLUGIN_FILE"
-	echo "  flush: true" >> "$SQLITE_PLUGIN_FILE"
+	echo "name: database" >> "$DB_PLUGIN_FILE"
+	echo "path: ./plugins/backend/database.so" >> "$DB_PLUGIN_FILE"
+	echo "config:" >> "$DB_PLUGIN_FILE"
+	echo "  type: sqlite" >> "$DB_PLUGIN_FILE"
+	echo "  db_path: ./test.db" >> "$DB_PLUGIN_FILE"
+	echo "  flush: true" >> "$DB_PLUGIN_FILE"
 }
 
 log_info() {