updated mysql plugin support (#135)
* add support for plugin, support mysql & so on * fix queries Co-authored-by: erenJag <erenJag> Co-authored-by: AlteredCoder <AlteredCoder>
This commit is contained in:
parent
7fe6741df3
commit
177480cff7
21 changed files with 254 additions and 194 deletions
|
@ -124,7 +124,7 @@ func pullTOP() error {
|
|||
return fmt.Errorf("failed to convert ban to signal : %s", err)
|
||||
}
|
||||
if err := outputCTX.Insert(signalOcc); err != nil {
|
||||
log.Fatalf("Unable to write pull to sqliteDB : %+s", err.Error())
|
||||
log.Fatalf("Unable to write pull to Database : %+s", err.Error())
|
||||
}
|
||||
}
|
||||
outputCTX.Flush()
|
||||
|
|
|
@ -72,7 +72,7 @@ func simpleBanToSignal(targetIP string, reason string, expirationStr string, act
|
|||
banApp.EndIp = types.IP2Int(parsedIP)
|
||||
}
|
||||
|
||||
var banApps = make([]types.BanApplication, 1)
|
||||
var banApps = make([]types.BanApplication, 0)
|
||||
banApps = append(banApps, banApp)
|
||||
signalOcc = types.SignalOccurence{
|
||||
Scenario: reason,
|
||||
|
@ -194,7 +194,7 @@ func BanList() error {
|
|||
}
|
||||
ret, err := outputCTX.ReadAT(at)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get records from sqlite : %v", err)
|
||||
return fmt.Errorf("unable to get records from Database : %v", err)
|
||||
}
|
||||
ret, err = filterBans(ret)
|
||||
if err != nil {
|
||||
|
@ -356,7 +356,7 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
|
|||
Run: func(cmd *cobra.Command, args []string) {
|
||||
reason := strings.Join(args[2:], " ")
|
||||
if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
|
||||
log.Fatalf("failed to add ban to sqlite : %v", err)
|
||||
log.Fatalf("failed to add ban to database : %v", err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
|
|||
Run: func(cmd *cobra.Command, args []string) {
|
||||
reason := strings.Join(args[2:], " ")
|
||||
if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
|
||||
log.Fatalf("failed to add ban to sqlite : %v", err)
|
||||
log.Fatalf("failed to add ban to database : %v", err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ func NewConfigCmd() *cobra.Command {
|
|||
var cmdConfig = &cobra.Command{
|
||||
Use: "config [command] <value>",
|
||||
Short: "Allows to view/edit cscli config",
|
||||
Long: `Allow to configure sqlite path and installation directory.
|
||||
Long: `Allow to configure database plugin path and installation directory.
|
||||
If no commands are specified, config is in interactive mode.`,
|
||||
Example: ` - cscli config show
|
||||
- cscli config prompt`,
|
||||
|
|
|
@ -7,6 +7,7 @@ log_dir: "./logs"
|
|||
log_mode: "stdout"
|
||||
log_level: info
|
||||
prometheus: false
|
||||
simulation_path: ./config/simulation.yaml
|
||||
profiling: false
|
||||
apimode: false
|
||||
plugin:
|
||||
|
|
19
config/plugins/backend/database.yaml
Normal file
19
config/plugins/backend/database.yaml
Normal file
|
@ -0,0 +1,19 @@
|
|||
name: database
|
||||
path: /usr/local/lib/crowdsec/plugins/backend/database.so
|
||||
config:
|
||||
## DB type supported (mysql, sqlite)
|
||||
## By default it using sqlite
|
||||
type: sqlite
|
||||
|
||||
## mysql options
|
||||
# db_host: localhost
|
||||
# db_username: crowdsec
|
||||
# db_password: crowdsec
|
||||
# db_name: crowdsec
|
||||
|
||||
## sqlite options
|
||||
db_path: /var/lib/crowdsec/data/crowdsec.db
|
||||
|
||||
## Other options
|
||||
flush: true
|
||||
# debug: true
|
|
@ -1,4 +0,0 @@
|
|||
name: sqlite
|
||||
path: /usr/local/lib/crowdsec/plugins/backend/sqlite.so
|
||||
config:
|
||||
db_path: /var/lib/crowdsec/data/crowdsec.db
|
|
@ -7,14 +7,14 @@ remediation:
|
|||
captcha: true
|
||||
duration: 4h
|
||||
outputs:
|
||||
- plugin: sqlite
|
||||
- plugin: database
|
||||
---
|
||||
profile: default_notification
|
||||
filter: "sig.Labels.remediation != 'true'"
|
||||
#remediation is empty, it means non taken
|
||||
api: false
|
||||
outputs:
|
||||
- plugin: sqlite # If we do not want to push, we can remove this line and the next one
|
||||
- plugin: database # If we do not want to push, we can remove this line and the next one
|
||||
store: false
|
||||
---
|
||||
profile: send_false_positif_to_API
|
||||
|
@ -22,5 +22,5 @@ filter: "sig.Whitelisted == true && sig.Labels.remediation == 'true'"
|
|||
#remediation is empty, it means non taken
|
||||
api: true
|
||||
outputs:
|
||||
- plugin: sqlite # If we do not want to push, we can remove this line and the next one
|
||||
- plugin: database # If we do not want to push, we can remove this line and the next one
|
||||
store: false
|
|
@ -19,7 +19,7 @@ sudo ./install.sh
|
|||
```
|
||||
|
||||
|
||||
When an IP is referenced in the SQLite database, it will be put in an ipset blacklist to ban that IP.
|
||||
When an IP is referenced in the database, it will be put in an ipset blacklist to ban that IP.
|
||||
|
||||
|
||||
ⓘ IPv4 and IPv6 are supported.
|
|
@ -18,5 +18,5 @@ sudo ./install.sh
|
|||
sudo systemctl restart nginx
|
||||
```
|
||||
|
||||
When an IP is referenced in the SQLite database, any request from this IP will lead to a `403` reply.
|
||||
When an IP is referenced in the database, any request from this IP will lead to a `403` reply.
|
||||
|
||||
|
|
|
@ -32,14 +32,14 @@ remediation:
|
|||
captcha: true
|
||||
duration: 4h
|
||||
outputs:
|
||||
- plugin: sqlite
|
||||
- plugin: database
|
||||
---
|
||||
profile: default_notification
|
||||
filter: "sig.Labels.remediation != 'true'"
|
||||
#remediation is empty, it means non taken
|
||||
api: false
|
||||
outputs:
|
||||
- plugin: sqlite # If we do not want to push, we can remove this line and the next one
|
||||
- plugin: database # If we do not want to push, we can remove this line and the next one
|
||||
store: false
|
||||
```
|
||||
|
||||
|
@ -59,10 +59,10 @@ path: <path_to_plugin_binary> #
|
|||
config: <plugin_config> # in a form of key(string)/value(string)
|
||||
```
|
||||
|
||||
For the plugin sqlite, here is its configuration file:
|
||||
For the plugin database, here is its configuration file:
|
||||
```yaml
|
||||
name: sqlite
|
||||
path: /usr/local/lib/crowdsec/plugins/backend/sqlite.so
|
||||
name: database
|
||||
path: /usr/local/lib/crowdsec/plugins/backend/database.so
|
||||
config:
|
||||
db_path: /var/lib/crowdsec/data/crowdsec.db
|
||||
flush: true
|
||||
|
|
|
@ -328,7 +328,7 @@ time="12-05-2020 12:31:43" level=warning msg="xx.xx.16.6 triggered a 4h0m0s ip b
|
|||
...
|
||||
^C
|
||||
$ {{cli.bin}} ban list
|
||||
INFO[0000] backend plugin 'sqlite' loaded
|
||||
INFO[0000] backend plugin 'database' loaded
|
||||
8 local decisions:
|
||||
+--------+-----------------+----------------------+------+--------+---------+--------------------------+--------+------------+
|
||||
| SOURCE | IP | REASON | BANS | ACTION | COUNTRY | AS | EVENTS | EXPIRATION |
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package sqlite
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -12,8 +12,10 @@ import (
|
|||
|
||||
func (c *Context) DeleteExpired() error {
|
||||
//Delete the expired records
|
||||
now := time.Now()
|
||||
if c.flush {
|
||||
retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
|
||||
//retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
|
||||
retx := c.Db.Delete(types.BanApplication{}, "until < ?", now)
|
||||
if retx.RowsAffected > 0 {
|
||||
log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
|
||||
}
|
||||
|
@ -96,8 +98,10 @@ func (c *Context) CleanUpRecordsByCount() error {
|
|||
}
|
||||
|
||||
sos := []types.BanApplication{}
|
||||
now := time.Now()
|
||||
/*get soft deleted records oldest to youngest*/
|
||||
records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
|
||||
//records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
|
||||
records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where("deleted_at < ?", now).Find(&sos)
|
||||
if records.Error != nil {
|
||||
return errors.Wrap(records.Error, "failed to list expired bans for flush")
|
||||
}
|
||||
|
@ -151,7 +155,7 @@ func (c *Context) autoCommit() {
|
|||
select {
|
||||
case <-c.PusherTomb.Dying():
|
||||
//we need to shutdown
|
||||
log.Infof("sqlite routine shutdown")
|
||||
log.Infof("database routine shutdown")
|
||||
if err := c.Flush(); err != nil {
|
||||
log.Errorf("error while flushing records: %s", err)
|
||||
}
|
123
pkg/database/database.go
Normal file
123
pkg/database/database.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
_ "github.com/jinzhu/gorm/dialects/mysql"
|
||||
_ "github.com/jinzhu/gorm/dialects/sqlite"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"gopkg.in/tomb.v2"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
Db *gorm.DB //Pointer to database
|
||||
tx *gorm.DB //Pointer to current transaction (flushed on a regular basis)
|
||||
lastCommit time.Time
|
||||
flush bool
|
||||
count int32
|
||||
lock sync.Mutex //booboo
|
||||
PusherTomb tomb.Tomb
|
||||
//to manage auto cleanup : max number of records *or* oldest
|
||||
maxEventRetention int
|
||||
maxDurationRetention time.Duration
|
||||
}
|
||||
|
||||
func checkConfig(cfg map[string]string) error {
|
||||
switch dbType, _ := cfg["type"]; dbType {
|
||||
case "sqlite":
|
||||
if val, ok := cfg["db_path"]; !ok || val == "" {
|
||||
return fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
|
||||
}
|
||||
case "mysql":
|
||||
if val, ok := cfg["db_host"]; !ok || val == "" {
|
||||
return fmt.Errorf("please specify a 'db_host' to MySQL db in the configuration")
|
||||
}
|
||||
|
||||
if val, ok := cfg["db_username"]; !ok || val == "" {
|
||||
return fmt.Errorf("please specify a 'db_username' to MySQL db in the configuration")
|
||||
}
|
||||
|
||||
if val, ok := cfg["db_password"]; !ok || val == "" {
|
||||
return fmt.Errorf("please specify a 'db_password' to MySQL db in the configuration")
|
||||
}
|
||||
|
||||
if val, ok := cfg["db_name"]; !ok || val == "" {
|
||||
return fmt.Errorf("please specify a 'db_name' to MySQL db in the configuration")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("please specify a proper 'type' to the database configuration ")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDatabase(cfg map[string]string) (*Context, error) {
|
||||
var err error
|
||||
c := &Context{}
|
||||
|
||||
if err = checkConfig(cfg); err != nil {
|
||||
return nil, fmt.Errorf("bad database configuration : %v", err)
|
||||
}
|
||||
|
||||
if cfg["type"] == "sqlite" {
|
||||
c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open %s : %s", cfg["db_path"], err)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg["type"] == "mysql" {
|
||||
gormArg := cfg["db_username"] + ":" + cfg["db_password"] + "@(" + cfg["db_host"] + ")/" + cfg["db_name"] + "?charset=utf8&parseTime=True&loc=Local"
|
||||
c.Db, err = gorm.Open("mysql", gormArg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open %s database : %s", cfg["db_name"], err)
|
||||
}
|
||||
}
|
||||
|
||||
if v, ok := cfg["max_records"]; ok {
|
||||
c.maxEventRetention, err = strconv.Atoi(v)
|
||||
if err != nil {
|
||||
log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
|
||||
}
|
||||
}
|
||||
if v, ok := cfg["max_records_age"]; ok {
|
||||
c.maxDurationRetention, err = time.ParseDuration(v)
|
||||
if err != nil {
|
||||
log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := cfg["debug"]; ok && val == "true" {
|
||||
log.Infof("Enabling debug for %s", cfg["type"])
|
||||
c.Db.LogMode(true)
|
||||
}
|
||||
|
||||
c.flush, err = strconv.ParseBool(cfg["flush"])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse 'flush' value %s : %s", cfg["flush"], err)
|
||||
}
|
||||
// Migrate the schema
|
||||
c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
|
||||
c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
|
||||
c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
|
||||
c.tx = c.Db.Begin()
|
||||
c.lastCommit = time.Now()
|
||||
ret := c.tx.Commit()
|
||||
|
||||
if ret.Error != nil {
|
||||
return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
|
||||
|
||||
}
|
||||
c.tx = c.Db.Begin()
|
||||
if c.tx == nil {
|
||||
return nil, fmt.Errorf("failed to begin %s transac : %s", cfg["type"], err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package sqlite
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -1,8 +1,7 @@
|
|||
package sqlite
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
|
@ -10,67 +9,6 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (c *Context) GetStats(since time.Duration) ([]map[string]string, error) {
|
||||
sos := []types.SignalOccurence{}
|
||||
stats := make([]map[string]string, 0)
|
||||
as_stats := make(map[string]string)
|
||||
scenar_stats := make(map[string]string)
|
||||
country_stats := make(map[string]string)
|
||||
|
||||
/*get records that are younger than 'since' */
|
||||
records := c.Db.Order("updated_at desc").Where(`strftime("%s", created_at) >= strftime("%s", ?)`, time.Now().Add(-since)).Find(&sos)
|
||||
if records.Error != nil {
|
||||
return nil, records.Error
|
||||
}
|
||||
|
||||
for _, ld := range sos {
|
||||
/*by scenario*/
|
||||
if ld.Scenario == "" {
|
||||
ld.Scenario = "unknown"
|
||||
}
|
||||
if _, ok := scenar_stats[ld.Scenario]; !ok {
|
||||
scenar_stats[ld.Scenario] = "1"
|
||||
} else {
|
||||
nv, err := strconv.Atoi(scenar_stats[ld.Scenario])
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to update internal stats : %v", err)
|
||||
}
|
||||
scenar_stats[ld.Scenario] = fmt.Sprintf("%d", nv+1)
|
||||
}
|
||||
/*by country*/
|
||||
if ld.Source_Country == "" {
|
||||
ld.Source_Country = "unknown"
|
||||
}
|
||||
if _, ok := country_stats[ld.Source_Country]; !ok {
|
||||
country_stats[ld.Source_Country] = "1"
|
||||
} else {
|
||||
nv, err := strconv.Atoi(country_stats[ld.Source_Country])
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to update internal stats : %v", err)
|
||||
}
|
||||
country_stats[ld.Source_Country] = fmt.Sprintf("%d", nv+1)
|
||||
}
|
||||
/*by AS*/
|
||||
if ld.Source_AutonomousSystemNumber == "" {
|
||||
ld.Source_AutonomousSystemNumber = "unknown"
|
||||
}
|
||||
if _, ok := as_stats[ld.Source_AutonomousSystemNumber]; !ok {
|
||||
as_stats[ld.Source_AutonomousSystemNumber] = "1"
|
||||
} else {
|
||||
nv, err := strconv.Atoi(as_stats[ld.Source_AutonomousSystemNumber])
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to update internal stats : %v", err)
|
||||
}
|
||||
as_stats[ld.Source_AutonomousSystemNumber] = fmt.Sprintf("%d", nv+1)
|
||||
}
|
||||
}
|
||||
stats = append(stats, as_stats)
|
||||
stats = append(stats, scenar_stats)
|
||||
stats = append(stats, country_stats)
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
//GetBansAt returns the IPs that were banned at a given time
|
||||
func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
|
||||
|
||||
|
@ -78,7 +16,8 @@ func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
|
|||
rets := make([]map[string]string, 0)
|
||||
/*get non-expired records*/
|
||||
//c.Db.LogMode(true)
|
||||
records := c.Db.Order("updated_at desc").Where(`strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?)`, at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
|
||||
//records := c.Db.Order("updated_at desc").Where(`strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?)`, at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
|
||||
records := c.Db.Order("updated_at desc").Where("until >= ? AND created_at < ?", at, at).Group("ip_text").Find(&bas) /*.Count(&count)*/
|
||||
if records.Error != nil {
|
||||
return nil, records.Error
|
||||
}
|
||||
|
@ -87,7 +26,8 @@ func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
|
|||
/*
|
||||
fetch count of bans for this specific ip_text
|
||||
*/
|
||||
ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?) AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
|
||||
//ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND strftime("%s", until) >= strftime("%s", ?) AND strftime("%s", created_at) < strftime("%s", ?) AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
|
||||
ret := c.Db.Table("ban_applications").Order("updated_at desc").Where(`ip_text = ? AND until >= ? AND created_at < ? AND deleted_at is NULL`, ba.IpText, at, at).Count(&count)
|
||||
if ret.Error != nil {
|
||||
return nil, fmt.Errorf("failed to fetch records count for %s : %v", ba.IpText, ret.Error)
|
||||
}
|
||||
|
@ -161,3 +101,67 @@ func (c *Context) GetBansAt(at time.Time) ([]map[string]string, error) {
|
|||
}
|
||||
return rets, nil
|
||||
}
|
||||
|
||||
func (c *Context) GetNewBan() ([]types.BanApplication, error) {
|
||||
|
||||
var bas []types.BanApplication
|
||||
|
||||
//select the news bans
|
||||
banRecords := c.Db.
|
||||
Order("updated_at desc").
|
||||
/*Get non expired (until) bans*/
|
||||
Where(`until >= ?`, time.Now()).
|
||||
/*Only get one ban per unique ip_text*/
|
||||
Group("ip_text").
|
||||
Find(&bas)
|
||||
if banRecords.Error != nil {
|
||||
return nil, fmt.Errorf("failed when selection bans : %v", banRecords.Error)
|
||||
}
|
||||
|
||||
return bas, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Context) GetNewBanSince(since time.Time) ([]types.BanApplication, error) {
|
||||
|
||||
var bas []types.BanApplication
|
||||
|
||||
//select the news bans
|
||||
banRecords := c.Db.
|
||||
Order("updated_at desc").
|
||||
/*Get non expired (until) bans*/
|
||||
Where(`until >= ?`, time.Now()).
|
||||
/*That were added since last tick*/
|
||||
Where(`updated_at >= ?`, since).
|
||||
/*Only get one ban per unique ip_text*/
|
||||
Group("ip_text").
|
||||
Find(&bas) /*.Count(&count)*/
|
||||
if banRecords.Error != nil {
|
||||
return nil, fmt.Errorf("failed when selection bans : %v", banRecords.Error)
|
||||
}
|
||||
|
||||
return bas, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Context) GetDeletedBanSince(since time.Time) ([]types.BanApplication, error) {
|
||||
var bas []types.BanApplication
|
||||
|
||||
deletedRecords := c.Db.
|
||||
/*ignore the soft delete*/
|
||||
Unscoped().
|
||||
Order("updated_at desc").
|
||||
/*ban that were deleted since since or bans that expired since since*/
|
||||
Where(`deleted_at >= ? OR
|
||||
(until >= ? AND until <= ?)`,
|
||||
since.Add(1*time.Second), since.Add(1*time.Second), time.Now()).
|
||||
/*Only get one ban per unique ip_text*/
|
||||
Group("ip_text").
|
||||
Find(&bas) /*.Count(&count)*/
|
||||
|
||||
if deletedRecords.Error != nil {
|
||||
return nil, fmt.Errorf("failed when selection deleted bans : %v", deletedRecords.Error)
|
||||
}
|
||||
|
||||
return bas, nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package sqlite
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -31,7 +31,7 @@ func (c *Context) WriteSignal(sig types.SignalOccurence) error {
|
|||
//sig.Scenario = sig.Scenario
|
||||
if ret.Error != nil {
|
||||
log.Errorf("FAILED : %+v \n", ret.Error)
|
||||
return fmt.Errorf("failed to write signal occurence : %v", ret.Error)
|
||||
return fmt.Errorf("failed to write signal occurrence : %v", ret.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -130,7 +130,7 @@ func (o *Output) FlushAll() {
|
|||
}
|
||||
if o.bManager != nil {
|
||||
if err := o.bManager.Flush(); err != nil {
|
||||
log.Errorf("Failing Sqlite flush : %s", err)
|
||||
log.Errorf("Failing database flush : %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,88 +0,0 @@
|
|||
package sqlite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
_ "github.com/jinzhu/gorm/dialects/sqlite"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"gopkg.in/tomb.v2"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
Db *gorm.DB //Pointer to sqlite db
|
||||
tx *gorm.DB //Pointer to current transaction (flushed on a regular basis)
|
||||
lastCommit time.Time
|
||||
flush bool
|
||||
count int32
|
||||
lock sync.Mutex //booboo
|
||||
PusherTomb tomb.Tomb
|
||||
//to manage auto cleanup : max number of records *or* oldest
|
||||
maxEventRetention int
|
||||
maxDurationRetention time.Duration
|
||||
}
|
||||
|
||||
func NewSQLite(cfg map[string]string) (*Context, error) {
|
||||
var err error
|
||||
c := &Context{}
|
||||
|
||||
if v, ok := cfg["max_records"]; ok {
|
||||
c.maxEventRetention, err = strconv.Atoi(v)
|
||||
if err != nil {
|
||||
log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
|
||||
}
|
||||
}
|
||||
if v, ok := cfg["max_records_age"]; ok {
|
||||
c.maxDurationRetention, err = time.ParseDuration(v)
|
||||
if err != nil {
|
||||
log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
|
||||
}
|
||||
}
|
||||
if _, ok := cfg["db_path"]; !ok {
|
||||
return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
|
||||
}
|
||||
|
||||
if cfg["db_path"] == "" {
|
||||
return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
|
||||
}
|
||||
log.Debugf("Starting SQLite backend, path:%s", cfg["db_path"])
|
||||
|
||||
c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open %s : %s", cfg["db_path"], err)
|
||||
}
|
||||
|
||||
if val, ok := cfg["debug"]; ok && val == "true" {
|
||||
log.Infof("Enabling debug for sqlite")
|
||||
c.Db.LogMode(true)
|
||||
}
|
||||
|
||||
c.flush, err = strconv.ParseBool(cfg["flush"])
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Unable to parse 'flush' flag")
|
||||
}
|
||||
// Migrate the schema
|
||||
c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
|
||||
c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
|
||||
c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
|
||||
c.tx = c.Db.Begin()
|
||||
c.lastCommit = time.Now()
|
||||
ret := c.tx.Commit()
|
||||
|
||||
if ret.Error != nil {
|
||||
return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
|
||||
|
||||
}
|
||||
c.tx = c.Db.Begin()
|
||||
if c.tx == nil {
|
||||
return nil, fmt.Errorf("failed to begin sqlite transac : %s", err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
|
@ -19,5 +19,5 @@ type EventSequence struct {
|
|||
Source_Country string
|
||||
/*stop db only */
|
||||
SignalOccurenceID uint //unique ID for the hasMany relation
|
||||
Serialized string //the serialized dict
|
||||
Serialized string `gorm:"size:65535"` //the serialized dict
|
||||
}
|
||||
|
|
|
@ -4,14 +4,14 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/sqlite"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/database"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
//nolint:unused // pluginDB is the interface for sqlite output plugin
|
||||
//nolint:unused // pluginDB is the interface for database output plugin
|
||||
type pluginDB struct {
|
||||
CTX *sqlite.Context
|
||||
CTX *database.Context
|
||||
}
|
||||
|
||||
func (p *pluginDB) Shutdown() error {
|
||||
|
@ -29,8 +29,8 @@ func (p *pluginDB) StartAutoCommit() error {
|
|||
|
||||
func (p *pluginDB) Init(config map[string]string) error {
|
||||
var err error
|
||||
log.Debugf("sqlite config : %+v \n", config)
|
||||
p.CTX, err = sqlite.NewSQLite(config)
|
||||
log.Debugf("database config : %+v \n", config)
|
||||
p.CTX, err = database.NewDatabase(config)
|
||||
|
||||
if err != nil {
|
||||
return err
|
|
@ -39,14 +39,15 @@ PARSER_S02="$PARSER_DIR/s02-enrich"
|
|||
SCENARIOS_DIR="$CONFIG_DIR/scenarios"
|
||||
POSTOVERFLOWS_DIR="$CONFIG_DIR/postoverflows"
|
||||
PLUGIN_BACKEND_DIR="$CONFIG_DIR/plugins/backend/"
|
||||
SQLITE_PLUGIN_FILE="$PLUGIN_BACKEND_DIR/sqlite.yaml"
|
||||
DB_PLUGIN_FILE="$PLUGIN_BACKEND_DIR/database.yaml"
|
||||
|
||||
gen_sqlite_config() {
|
||||
echo "name: sqlite" >> "$SQLITE_PLUGIN_FILE"
|
||||
echo "path: ./plugins/backend/sqlite.so" >> "$SQLITE_PLUGIN_FILE"
|
||||
echo "config:" >> "$SQLITE_PLUGIN_FILE"
|
||||
echo " db_path: ./test.db" >> "$SQLITE_PLUGIN_FILE"
|
||||
echo " flush: true" >> "$SQLITE_PLUGIN_FILE"
|
||||
echo "name: database" >> "$DB_PLUGIN_FILE"
|
||||
echo "path: ./plugins/backend/database.so" >> "$DB_PLUGIN_FILE"
|
||||
echo "config:" >> "$DB_PLUGIN_FILE"
|
||||
echo " type: sqlite" >> "$DB_PLUGIN_FILE"
|
||||
echo " db_path: ./test.db" >> "$DB_PLUGIN_FILE"
|
||||
echo " flush: true" >> "$DB_PLUGIN_FILE"
|
||||
}
|
||||
|
||||
log_info() {
|
||||
|
|
Loading…
Reference in a new issue