don't use transactions

This commit is contained in:
Thibault bui Koechlin 2020-07-26 10:24:10 +02:00
parent e035601c8b
commit 6ead68ff73
3 changed files with 26 additions and 63 deletions

View file

@ -1,8 +1,6 @@
package database package database
import ( import (
"fmt"
"sync/atomic"
"time" "time"
"github.com/crowdsecurity/crowdsec/pkg/types" "github.com/crowdsecurity/crowdsec/pkg/types"
@ -11,10 +9,11 @@ import (
) )
func (c *Context) DeleteExpired() error { func (c *Context) DeleteExpired() error {
c.lock.Lock()
defer c.lock.Unlock()
//Delete the expired records //Delete the expired records
now := time.Now() now := time.Now()
if c.flush { if c.flush {
//retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
retx := c.Db.Delete(types.BanApplication{}, "until < ?", now) retx := c.Db.Delete(types.BanApplication{}, "until < ?", now)
if retx.RowsAffected > 0 { if retx.RowsAffected > 0 {
log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
@ -23,18 +22,8 @@ func (c *Context) DeleteExpired() error {
return nil return nil
} }
/*Flush doesn't do anything here : we are not using transactions or such, nothing to "flush" per se*/
func (c *Context) Flush() error { func (c *Context) Flush() error {
c.lock.Lock()
defer c.lock.Unlock()
ret := c.tx.Commit()
if ret.Error != nil {
c.tx = c.Db.Begin()
return fmt.Errorf("failed to commit records : %v", ret.Error)
}
c.tx = c.Db.Begin()
c.lastCommit = time.Now()
return nil return nil
} }
@ -60,21 +49,20 @@ func (c *Context) CleanUpRecordsByAge() error {
log.Debugf("no event older than %s", c.maxDurationRetention.String()) log.Debugf("no event older than %s", c.maxDurationRetention.String())
return nil return nil
} }
//let's do it in a single transaction
delTx := c.Db.Unscoped().Begin()
delRecords := 0
delRecords := 0
for _, record := range sos { for _, record := range sos {
copy := record copy := record
delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil {
delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) return errors.Wrap(ret.Error, "failed to clean signal_occurences")
delTx.Unscoped().Table("ban_applications").Delete(&copy)
//we need to delete associations : event_sequences, signal_occurences
delRecords++
} }
ret = delTx.Unscoped().Commit() if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil {
if ret.Error != nil { return errors.Wrap(ret.Error, "failed to clean event_sequences")
return errors.Wrap(ret.Error, "failed to delete records") }
if ret := c.Db.Unscoped().Table("ban_applications").Delete(&copy); ret.Error != nil {
return errors.Wrap(ret.Error, "failed to clean ban_applications")
}
delRecords++
} }
log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention) log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention)
return nil return nil
@ -107,13 +95,18 @@ func (c *Context) CleanUpRecordsByCount() error {
} }
//let's do it in a single transaction //let's do it in a single transaction
delTx := c.Db.Unscoped().Begin()
delRecords := 0 delRecords := 0
for _, ld := range sos { for _, ld := range sos {
copy := ld copy := ld
delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil {
delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) return errors.Wrap(ret.Error, "failed to clean signal_occurences")
delTx.Unscoped().Table("ban_applications").Delete(&copy) }
if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil {
return errors.Wrap(ret.Error, "failed to clean event_sequences")
}
if ret := c.Db.Unscoped().Table("ban_applications").Delete(&copy); ret.Error != nil {
return errors.Wrap(ret.Error, "failed to clean ban_applications")
}
//we need to delete associations : event_sequences, signal_occurences //we need to delete associations : event_sequences, signal_occurences
delRecords++ delRecords++
//let's delete as well the associated event_sequence //let's delete as well the associated event_sequence
@ -122,12 +115,7 @@ func (c *Context) CleanUpRecordsByCount() error {
} }
} }
if len(sos) > 0 { if len(sos) > 0 {
//log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos)) log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos))
ret = delTx.Unscoped().Commit()
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to delete records")
}
} else { } else {
log.Debugf("didn't find any record to clean") log.Debugf("didn't find any record to clean")
} }
@ -145,7 +133,6 @@ func (c *Context) StartAutoCommit() error {
func (c *Context) autoCommit() { func (c *Context) autoCommit() {
log.Debugf("starting autocommit") log.Debugf("starting autocommit")
ticker := time.NewTicker(200 * time.Millisecond)
cleanUpTicker := time.NewTicker(1 * time.Minute) cleanUpTicker := time.NewTicker(1 * time.Minute)
expireTicker := time.NewTicker(1 * time.Second) expireTicker := time.NewTicker(1 * time.Second)
if !c.flush { if !c.flush {
@ -159,12 +146,6 @@ func (c *Context) autoCommit() {
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
log.Errorf("error while flushing records: %s", err) log.Errorf("error while flushing records: %s", err)
} }
if ret := c.tx.Commit(); ret.Error != nil {
log.Errorf("failed to commit records : %v", ret.Error)
}
if err := c.tx.Close(); err != nil {
log.Errorf("error while closing tx : %s", err)
}
if err := c.Db.Close(); err != nil { if err := c.Db.Close(); err != nil {
log.Errorf("error while closing db : %s", err) log.Errorf("error while closing db : %s", err)
} }
@ -173,14 +154,6 @@ func (c *Context) autoCommit() {
if err := c.DeleteExpired(); err != nil { if err := c.DeleteExpired(); err != nil {
log.Errorf("Error while deleting expired records: %s", err) log.Errorf("Error while deleting expired records: %s", err)
} }
case <-ticker.C:
if atomic.LoadInt32(&c.count) != 0 &&
(atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
if err := c.Flush(); err != nil {
log.Errorf("failed to flush : %s", err)
}
}
case <-cleanUpTicker.C: case <-cleanUpTicker.C:
if err := c.CleanUpRecordsByCount(); err != nil { if err := c.CleanUpRecordsByCount(); err != nil {
log.Errorf("error in max records cleanup : %s", err) log.Errorf("error in max records cleanup : %s", err)

View file

@ -107,17 +107,7 @@ func NewDatabase(cfg map[string]string) (*Context, error) {
c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{}) c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{}) c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{}) c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
c.tx = c.Db.Begin()
c.lastCommit = time.Now() c.lastCommit = time.Now()
ret := c.tx.Commit()
if ret.Error != nil {
return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
}
c.tx = c.Db.Begin()
if c.tx == nil {
return nil, fmt.Errorf("failed to begin %s transac : %s", cfg["type"], err)
}
return c, nil return c, nil
} }

View file

@ -15,7 +15,7 @@ func (c *Context) WriteBanApplication(ban types.BanApplication) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
log.Debugf("Ban application being called : %s %s", ban.Scenario, ban.IpText) log.Debugf("Ban application being called : %s %s", ban.Scenario, ban.IpText)
ret := c.tx.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban) ret := c.Db.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban)
if ret.Error != nil { if ret.Error != nil {
return fmt.Errorf("failed to write ban record : %v", ret.Error) return fmt.Errorf("failed to write ban record : %v", ret.Error)
} }
@ -28,14 +28,14 @@ func (c *Context) WriteSignal(sig types.SignalOccurence) error {
defer c.lock.Unlock() defer c.lock.Unlock()
/*let's ensure we only have one ban active for a given scope*/ /*let's ensure we only have one ban active for a given scope*/
for _, ba := range sig.BanApplications { for _, ba := range sig.BanApplications {
ret := c.tx.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{}) ret := c.Db.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{})
if ret.Error != nil { if ret.Error != nil {
log.Errorf("While delete overlaping bans : %s", ret.Error) log.Errorf("While delete overlaping bans : %s", ret.Error)
return fmt.Errorf("failed to write signal occurrence : %v", ret.Error) return fmt.Errorf("failed to write signal occurrence : %v", ret.Error)
} }
} }
/*and add the new one(s)*/ /*and add the new one(s)*/
ret := c.tx.Create(&sig) ret := c.Db.Create(&sig)
if ret.Error != nil { if ret.Error != nil {
log.Errorf("While creating new bans : %s", ret.Error) log.Errorf("While creating new bans : %s", ret.Error)
return fmt.Errorf("failed to write signal occurrence : %s", ret.Error) return fmt.Errorf("failed to write signal occurrence : %s", ret.Error)