123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package sqlite
- import (
- "fmt"
- "sync/atomic"
- "time"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
- )
- func (c *Context) DeleteExpired() error {
- //Delete the expired records
- if c.flush {
- retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
- if retx.RowsAffected > 0 {
- log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
- }
- }
- return nil
- }
- func (c *Context) Flush() error {
- c.lock.Lock()
- defer c.lock.Unlock()
- ret := c.tx.Commit()
- if ret.Error != nil {
- c.tx = c.Db.Begin()
- return fmt.Errorf("failed to commit records : %v", ret.Error)
- }
- c.tx = c.Db.Begin()
- c.lastCommit = time.Now()
- return nil
- }
- func (c *Context) CleanUpRecordsByAge() error {
- //let's fetch all expired records that are more than XX days olds
- sos := []types.BanApplication{}
- if c.maxDurationRetention == 0 {
- return nil
- }
- //look for soft-deleted events that are OLDER than maxDurationRetention
- ret := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").
- Where(fmt.Sprintf("deleted_at > date('now','-%d minutes')", int(c.maxDurationRetention.Minutes()))).
- Order("updated_at desc").Find(&sos)
- if ret.Error != nil {
- return errors.Wrap(ret.Error, "failed to get count of old records")
- }
- //no events elligible
- if len(sos) == 0 || ret.RowsAffected == 0 {
- log.Debugf("no event older than %s", c.maxDurationRetention.String())
- return nil
- }
- //let's do it in a single transaction
- delTx := c.Db.Unscoped().Begin()
- delRecords := 0
- for _, record := range sos {
- copy := record
- delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
- delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
- delTx.Unscoped().Table("ban_applications").Delete(©)
- //we need to delete associations : event_sequences, signal_occurences
- delRecords++
- }
- ret = delTx.Unscoped().Commit()
- if ret.Error != nil {
- return errors.Wrap(ret.Error, "failed to delete records")
- }
- log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention)
- return nil
- }
- func (c *Context) CleanUpRecordsByCount() error {
- var count int
- if c.maxEventRetention <= 0 {
- return nil
- }
- ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count)
- if ret.Error != nil {
- return errors.Wrap(ret.Error, "failed to get bans count")
- }
- if count < c.maxEventRetention {
- log.Debugf("%d < %d, don't cleanup", count, c.maxEventRetention)
- return nil
- }
- sos := []types.BanApplication{}
- /*get soft deleted records oldest to youngest*/
- records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
- if records.Error != nil {
- return errors.Wrap(records.Error, "failed to list expired bans for flush")
- }
- //let's do it in a single transaction
- delTx := c.Db.Unscoped().Begin()
- delRecords := 0
- for _, ld := range sos {
- copy := ld
- delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
- delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
- delTx.Unscoped().Table("ban_applications").Delete(©)
- //we need to delete associations : event_sequences, signal_occurences
- delRecords++
- //let's delete as well the associated event_sequence
- if count-delRecords <= c.maxEventRetention {
- break
- }
- }
- if len(sos) > 0 {
- //log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
- log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos))
- ret = delTx.Unscoped().Commit()
- if ret.Error != nil {
- return errors.Wrap(ret.Error, "failed to delete records")
- }
- } else {
- log.Debugf("didn't find any record to clean")
- }
- return nil
- }
- func (c *Context) StartAutoCommit() error {
- //TBD : we shouldn't start auto-commit if we are in cli mode ?
- c.PusherTomb.Go(func() error {
- c.autoCommit()
- return nil
- })
- return nil
- }
- func (c *Context) autoCommit() {
- log.Debugf("starting autocommit")
- ticker := time.NewTicker(200 * time.Millisecond)
- cleanUpTicker := time.NewTicker(1 * time.Minute)
- expireTicker := time.NewTicker(1 * time.Second)
- if !c.flush {
- log.Debugf("flush is disabled")
- }
- for {
- select {
- case <-c.PusherTomb.Dying():
- //we need to shutdown
- log.Infof("sqlite routine shutdown")
- if err := c.Flush(); err != nil {
- log.Errorf("error while flushing records: %s", err)
- }
- if ret := c.tx.Commit(); ret.Error != nil {
- log.Errorf("failed to commit records : %v", ret.Error)
- }
- if err := c.tx.Close(); err != nil {
- log.Errorf("error while closing tx : %s", err)
- }
- if err := c.Db.Close(); err != nil {
- log.Errorf("error while closing db : %s", err)
- }
- return
- case <-expireTicker.C:
- if err := c.DeleteExpired(); err != nil {
- log.Errorf("Error while deleting expired records: %s", err)
- }
- case <-ticker.C:
- if atomic.LoadInt32(&c.count) != 0 &&
- (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
- if err := c.Flush(); err != nil {
- log.Errorf("failed to flush : %s", err)
- }
- }
- case <-cleanUpTicker.C:
- if err := c.CleanUpRecordsByCount(); err != nil {
- log.Errorf("error in max records cleanup : %s", err)
- }
- if err := c.CleanUpRecordsByAge(); err != nil {
- log.Errorf("error in old records cleanup : %s", err)
- }
- }
- }
- }
|