diff --git a/pkg/sqlite/commit.go b/pkg/sqlite/commit.go index cec4e5c96..ec6f18ce4 100644 --- a/pkg/sqlite/commit.go +++ b/pkg/sqlite/commit.go @@ -6,9 +6,23 @@ import ( "time" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) +func (c *Context) DeleteExpired() error { + //Delete the expired records + if c.flush { + retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{}) + if retx.RowsAffected > 0 { + log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) + } + } else { + log.Infof("flush is disabled") + } + return nil +} + func (c *Context) Flush() error { c.lock.Lock() defer c.lock.Unlock() @@ -21,17 +35,64 @@ func (c *Context) Flush() error { } c.tx = c.Db.Begin() c.lastCommit = time.Now() - //Delete the expired records - if c.flush { - retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{}) - if retx.RowsAffected > 0 { - log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) + c.DeleteExpired() + + return nil +} + +func (c *Context) CleanUpRecordsByCount() error { + var count int + + if c.maxEventRetention <= 0 { + return nil + } + + ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count) + + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to get bans count") + } + if count < c.maxEventRetention { + log.Infof("%d < %d, don't cleanup", count, c.maxEventRetention) + return nil + } + + sos := []types.BanApplication{} + /*get soft deleted records oldest to youngest*/ + records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos) + if records.Error != nil { + return errors.Wrap(records.Error, "failed to list expired bans for flush") + } + + //let's do it in a single transaction + delTx := c.Db.Unscoped().Begin() + delRecords := 0 + for _, ld := range sos { + copy := ld + delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) + delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) + delTx.Unscoped().Table("ban_applications").Delete(©) + //we need to delete associations : event_sequences, signal_occurences + delRecords++ + //let's delete as well the associated event_sequence + if count-delRecords <= c.maxEventRetention { + break } } + if len(sos) > 0 { + log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos)) + ret = delTx.Unscoped().Commit() + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to delete records") + } + } else { + log.Debugf("didn't find any record to clean") + } return nil } func (c *Context) AutoCommit() { + log.Infof("starting autocommit") ticker := time.NewTicker(200 * time.Millisecond) for { select { @@ -54,10 +115,16 @@ func (c *Context) AutoCommit() { case <-ticker.C: if atomic.LoadInt32(&c.count) != 0 && (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) { + //log.Warningf("flush time") if err := c.Flush(); err != nil { log.Errorf("failed to flush : %s", err) } + //log.Printf("starting auto-cleanup") + if err := c.CleanUpRecordsByCount(); err != nil { + log.Errorf("error in auto-cleanup : %s", err) + } } + } } } diff --git a/pkg/sqlite/sqlite.go b/pkg/sqlite/sqlite.go index b3c5d8a63..4761f6242 100644 --- a/pkg/sqlite/sqlite.go +++ b/pkg/sqlite/sqlite.go @@ -23,12 +23,16 @@ type Context struct { count int32 lock sync.Mutex //booboo PusherTomb tomb.Tomb + //to manage auto cleanup : max number of records *or* oldest + maxEventRetention int + maxDurationRetention time.Duration } func NewSQLite(cfg map[string]string) (*Context, error) { var err error c := &Context{} + log.Warningf("NEW SQLITE : %+v", cfg) if _, ok := cfg["db_path"]; !ok { return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration") } @@ -47,6 +51,8 @@ func NewSQLite(cfg map[string]string) (*Context, error) { c.Db.LogMode(true) } + c.Db.LogMode(true) + c.flush, _ = strconv.ParseBool(cfg["flush"]) // Migrate the schema c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{}) @@ -64,6 +70,8 @@ func NewSQLite(cfg map[string]string) (*Context, error) { if c.tx == nil { return nil, fmt.Errorf("failed to begin sqlite transac : %s", err) } + //random attempt + c.maxEventRetention = 100 c.PusherTomb.Go(func() error { c.AutoCommit() return nil