Parcourir la source

No sql transaction + proper time-machine wait (#148)

Thibault "bui" Koechlin il y a 5 ans
Parent
commit
151af2d0d8
4 fichiers modifiés avec 61 ajouts et 65 suppressions
  1. 35 1
      cmd/crowdsec/serve.go
  2. 22 49
      pkg/database/commit.go
  3. 1 12
      pkg/database/database.go
  4. 3 3
      pkg/database/write.go

+ 35 - 1
cmd/crowdsec/serve.go

@@ -139,7 +139,41 @@ func serveOneTimeRun(outputRunner outputs.Output) error {
 	}
 	log.Infof("acquisition is finished, wait for parser/bucket/ouputs.")
 
-	//let's wait more than enough for in-flight events to be parsed.
+	/*
+		While it might make sense to shut down the parser/buckets/etc. as soon as acquisition is finished,
+		we might have some pending buckets: buckets that overflowed, but whose LeakRoutines are still alive because they
+		are waiting to be able to "commit" (push to db). This can happen specifically in a context where a lot of logs
+		are going to trigger overflows (i.e. trigger buckets with ~100% of the logs triggering an overflow).
+
+		To avoid this (which would mean that we would "lose" some overflows), let's monitor the number of live buckets.
+		However, because of the blackhole mechanism, we can't really wait for the number of LeakRoutines to go to zero (we might have to wait $blackhole_duration).
+
+		So: we wait for the number of buckets to stop changing (three consecutive unchanged 5-second polls) before returning. "How long" we should wait is a bit of a trick question,
+		as some operations (i.e. reverse DNS or such in post-overflow) can take some time :)
+	*/
+
+	bucketCount := leaky.LeakyRoutineCount
+	rounds := 0
+	successiveStillRounds := 0
+	for {
+		rounds++
+		time.Sleep(5 * time.Second)
+		currBucketCount := leaky.LeakyRoutineCount
+		if currBucketCount != bucketCount {
+			if rounds == 0 || rounds%2 == 0 {
+				log.Printf("Still %d live LeakRoutines, waiting (was %d)", currBucketCount, bucketCount)
+			}
+			bucketCount = currBucketCount
+			successiveStillRounds = 0
+		} else {
+			if successiveStillRounds > 1 {
+				log.Printf("LeakRoutines commit over.")
+				break
+			}
+			successiveStillRounds++
+		}
+	}
+
 	time.Sleep(5 * time.Second)
 
 	// wait for the parser to parse all events

+ 22 - 49
pkg/database/commit.go

@@ -1,8 +1,6 @@
 package database
 
 import (
-	"fmt"
-	"sync/atomic"
 	"time"
 
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -11,10 +9,11 @@ import (
 )
 
 func (c *Context) DeleteExpired() error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	//Delete the expired records
 	now := time.Now()
 	if c.flush {
-		//retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
 		retx := c.Db.Delete(types.BanApplication{}, "until < ?", now)
 		if retx.RowsAffected > 0 {
 			log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
@@ -23,18 +22,8 @@ func (c *Context) DeleteExpired() error {
 	return nil
 }
 
+/* Flush is a no-op here: we are not using transactions or such, so there is nothing to "flush" per se. */
 func (c *Context) Flush() error {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	ret := c.tx.Commit()
-
-	if ret.Error != nil {
-		c.tx = c.Db.Begin()
-		return fmt.Errorf("failed to commit records : %v", ret.Error)
-	}
-	c.tx = c.Db.Begin()
-	c.lastCommit = time.Now()
 	return nil
 }
 
@@ -60,22 +49,21 @@ func (c *Context) CleanUpRecordsByAge() error {
 		log.Debugf("no event older than %s", c.maxDurationRetention.String())
 		return nil
 	}
-	//let's do it in a single transaction
-	delTx := c.Db.Unscoped().Begin()
-	delRecords := 0
 
+	delRecords := 0
 	for _, record := range sos {
 		copy := record
-		delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
-		delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
-		delTx.Unscoped().Table("ban_applications").Delete(&copy)
-		//we need to delete associations : event_sequences, signal_occurences
+		if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean signal_occurences")
+		}
+		if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean event_sequences")
+		}
+		if ret := c.Db.Unscoped().Table("ban_applications").Delete(&copy); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean ban_applications")
+		}
 		delRecords++
 	}
-	ret = delTx.Unscoped().Commit()
-	if ret.Error != nil {
-		return errors.Wrap(ret.Error, "failed to delete records")
-	}
 	log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention)
 	return nil
 }
@@ -107,13 +95,18 @@ func (c *Context) CleanUpRecordsByCount() error {
 	}
 
 	//let's do it in a single transaction
-	delTx := c.Db.Unscoped().Begin()
 	delRecords := 0
 	for _, ld := range sos {
 		copy := ld
-		delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
-		delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
-		delTx.Unscoped().Table("ban_applications").Delete(&copy)
+		if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean signal_occurences")
+		}
+		if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean event_sequences")
+		}
+		if ret := c.Db.Unscoped().Table("ban_applications").Delete(&copy); ret.Error != nil {
+			return errors.Wrap(ret.Error, "failed to clean ban_applications")
+		}
 		//we need to delete associations : event_sequences, signal_occurences
 		delRecords++
 		//let's delete as well the associated event_sequence
@@ -122,12 +115,7 @@ func (c *Context) CleanUpRecordsByCount() error {
 		}
 	}
 	if len(sos) > 0 {
-		//log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
 		log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos))
-		ret = delTx.Unscoped().Commit()
-		if ret.Error != nil {
-			return errors.Wrap(ret.Error, "failed to delete records")
-		}
 	} else {
 		log.Debugf("didn't find any record to clean")
 	}
@@ -145,7 +133,6 @@ func (c *Context) StartAutoCommit() error {
 
 func (c *Context) autoCommit() {
 	log.Debugf("starting autocommit")
-	ticker := time.NewTicker(200 * time.Millisecond)
 	cleanUpTicker := time.NewTicker(1 * time.Minute)
 	expireTicker := time.NewTicker(1 * time.Second)
 	if !c.flush {
@@ -159,12 +146,6 @@ func (c *Context) autoCommit() {
 			if err := c.Flush(); err != nil {
 				log.Errorf("error while flushing records: %s", err)
 			}
-			if ret := c.tx.Commit(); ret.Error != nil {
-				log.Errorf("failed to commit records : %v", ret.Error)
-			}
-			if err := c.tx.Close(); err != nil {
-				log.Errorf("error while closing tx : %s", err)
-			}
 			if err := c.Db.Close(); err != nil {
 				log.Errorf("error while closing db : %s", err)
 			}
@@ -173,14 +154,6 @@ func (c *Context) autoCommit() {
 			if err := c.DeleteExpired(); err != nil {
 				log.Errorf("Error while deleting expired records: %s", err)
 			}
-		case <-ticker.C:
-			if atomic.LoadInt32(&c.count) != 0 &&
-				(atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
-				if err := c.Flush(); err != nil {
-					log.Errorf("failed to flush : %s", err)
-				}
-
-			}
 		case <-cleanUpTicker.C:
 			if err := c.CleanUpRecordsByCount(); err != nil {
 				log.Errorf("error in max records cleanup : %s", err)

+ 1 - 12
pkg/database/database.go

@@ -18,7 +18,6 @@ import (
 
 type Context struct {
 	Db         *gorm.DB //Pointer to database
-	tx         *gorm.DB //Pointer to current transaction (flushed on a regular basis)
 	lastCommit time.Time
 	flush      bool
 	count      int32
@@ -107,17 +106,7 @@ func NewDatabase(cfg map[string]string) (*Context, error) {
 	c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
 	c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
 	c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{})
-	c.tx = c.Db.Begin()
-	c.lastCommit = time.Now()
-	ret := c.tx.Commit()
-
-	if ret.Error != nil {
-		return nil, fmt.Errorf("failed to commit records : %v", ret.Error)
 
-	}
-	c.tx = c.Db.Begin()
-	if c.tx == nil {
-		return nil, fmt.Errorf("failed to begin %s transac : %s", cfg["type"], err)
-	}
+	c.lastCommit = time.Now()
 	return c, nil
 }

+ 3 - 3
pkg/database/write.go

@@ -15,7 +15,7 @@ func (c *Context) WriteBanApplication(ban types.BanApplication) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	log.Debugf("Ban application being called : %s %s", ban.Scenario, ban.IpText)
-	ret := c.tx.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban)
+	ret := c.Db.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban)
 	if ret.Error != nil {
 		return fmt.Errorf("failed to write ban record : %v", ret.Error)
 	}
@@ -28,14 +28,14 @@ func (c *Context) WriteSignal(sig types.SignalOccurence) error {
 	defer c.lock.Unlock()
 	/*let's ensure we only have one ban active for a given scope*/
 	for _, ba := range sig.BanApplications {
-		ret := c.tx.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{})
+		ret := c.Db.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{})
 		if ret.Error != nil {
 			log.Errorf("While delete overlaping bans : %s", ret.Error)
 			return fmt.Errorf("failed to write signal occurrence : %v", ret.Error)
 		}
 	}
 	/*and add the new one(s)*/
-	ret := c.tx.Create(&sig)
+	ret := c.Db.Create(&sig)
 	if ret.Error != nil {
 		log.Errorf("While creating new bans : %s", ret.Error)
 		return fmt.Errorf("failed to write signal occurrence : %s", ret.Error)