commit.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package sqlite
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. sqlite3 "github.com/mattn/go-sqlite3"
  7. "github.com/crowdsecurity/crowdsec/pkg/types"
  8. log "github.com/sirupsen/logrus"
  9. )
  10. func (c *Context) Flush() error {
  11. c.lock.Lock()
  12. defer c.lock.Unlock()
  13. ret := c.tx.Commit()
  14. if ret.Error != nil {
  15. /*if the database is locked, don't overwrite the current transaction*/
  16. if ret.Error == sqlite3.ErrLocked {
  17. log.Errorf("sqlite commit : db is locked : %s", ret.Error)
  18. return ret.Error
  19. }
  20. /*if it's another error, create a new transaction to avoid locking ourselves in a bad state ?*/
  21. c.tx = c.Db.Begin()
  22. return fmt.Errorf("failed to commit records : %v", ret.Error)
  23. }
  24. c.tx = c.Db.Begin()
  25. c.lastCommit = time.Now()
  26. //Delete the expired records
  27. if c.flush {
  28. retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
  29. if retx.RowsAffected > 0 {
  30. log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
  31. }
  32. }
  33. return nil
  34. }
  35. func (c *Context) AutoCommit() {
  36. ticker := time.NewTicker(200 * time.Millisecond)
  37. for {
  38. select {
  39. case <-ticker.C:
  40. if atomic.LoadInt32(&c.count) != 0 &&
  41. (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
  42. if err := c.Flush(); err != nil {
  43. log.Errorf("failed to flush : %s", err)
  44. }
  45. }
  46. }
  47. }
  48. }