commit.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package sqlite
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. "github.com/crowdsecurity/crowdsec/pkg/types"
  7. log "github.com/sirupsen/logrus"
  8. )
  9. func (c *Context) Flush() error {
  10. c.lock.Lock()
  11. defer c.lock.Unlock()
  12. ret := c.tx.Commit()
  13. if ret.Error != nil {
  14. c.tx = c.Db.Begin()
  15. return fmt.Errorf("failed to commit records : %v", ret.Error)
  16. }
  17. c.tx = c.Db.Begin()
  18. c.lastCommit = time.Now()
  19. //Delete the expired records
  20. if c.flush {
  21. retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
  22. if retx.RowsAffected > 0 {
  23. log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
  24. }
  25. }
  26. return nil
  27. }
  28. func (c *Context) AutoCommit() {
  29. ticker := time.NewTicker(200 * time.Millisecond)
  30. for {
  31. select {
  32. case <-c.PusherTomb.Dying():
  33. //we need to shutdown
  34. log.Infof("sqlite routine shutdown")
  35. if err := c.Flush(); err != nil {
  36. log.Errorf("error while flushing records: %s", err)
  37. }
  38. if ret := c.tx.Commit(); ret.Error != nil {
  39. log.Errorf("failed to commit records : %v", ret.Error)
  40. }
  41. if err := c.tx.Close(); err != nil {
  42. log.Errorf("error while closing tx : %s", err)
  43. }
  44. if err := c.Db.Close(); err != nil {
  45. log.Errorf("error while closing db : %s", err)
  46. }
  47. return
  48. case <-ticker.C:
  49. if atomic.LoadInt32(&c.count) != 0 &&
  50. (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
  51. if err := c.Flush(); err != nil {
  52. log.Errorf("failed to flush : %s", err)
  53. }
  54. }
  55. }
  56. }
  57. }