commit.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package sqlite
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. "github.com/crowdsecurity/crowdsec/pkg/types"
  7. "github.com/pkg/errors"
  8. log "github.com/sirupsen/logrus"
  9. )
  10. func (c *Context) DeleteExpired() error {
  11. //Delete the expired records
  12. if c.flush {
  13. retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
  14. if retx.RowsAffected > 0 {
  15. log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
  16. }
  17. } else {
  18. log.Infof("flush is disabled")
  19. }
  20. return nil
  21. }
  22. func (c *Context) Flush() error {
  23. c.lock.Lock()
  24. defer c.lock.Unlock()
  25. ret := c.tx.Commit()
  26. if ret.Error != nil {
  27. c.tx = c.Db.Begin()
  28. return fmt.Errorf("failed to commit records : %v", ret.Error)
  29. }
  30. c.tx = c.Db.Begin()
  31. c.lastCommit = time.Now()
  32. c.DeleteExpired()
  33. return nil
  34. }
  35. func (c *Context) CleanUpRecordsByCount() error {
  36. var count int
  37. if c.maxEventRetention <= 0 {
  38. return nil
  39. }
  40. ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count)
  41. if ret.Error != nil {
  42. return errors.Wrap(ret.Error, "failed to get bans count")
  43. }
  44. if count < c.maxEventRetention {
  45. log.Infof("%d < %d, don't cleanup", count, c.maxEventRetention)
  46. return nil
  47. }
  48. sos := []types.BanApplication{}
  49. /*get soft deleted records oldest to youngest*/
  50. records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
  51. if records.Error != nil {
  52. return errors.Wrap(records.Error, "failed to list expired bans for flush")
  53. }
  54. //let's do it in a single transaction
  55. delTx := c.Db.Unscoped().Begin()
  56. delRecords := 0
  57. for _, ld := range sos {
  58. copy := ld
  59. delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
  60. delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
  61. delTx.Unscoped().Table("ban_applications").Delete(&copy)
  62. //we need to delete associations : event_sequences, signal_occurences
  63. delRecords++
  64. //let's delete as well the associated event_sequence
  65. if count-delRecords <= c.maxEventRetention {
  66. break
  67. }
  68. }
  69. if len(sos) > 0 {
  70. log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
  71. ret = delTx.Unscoped().Commit()
  72. if ret.Error != nil {
  73. return errors.Wrap(ret.Error, "failed to delete records")
  74. }
  75. } else {
  76. log.Debugf("didn't find any record to clean")
  77. }
  78. return nil
  79. }
  80. func (c *Context) AutoCommit() {
  81. log.Infof("starting autocommit")
  82. ticker := time.NewTicker(200 * time.Millisecond)
  83. for {
  84. select {
  85. case <-c.PusherTomb.Dying():
  86. //we need to shutdown
  87. log.Infof("sqlite routine shutdown")
  88. if err := c.Flush(); err != nil {
  89. log.Errorf("error while flushing records: %s", err)
  90. }
  91. if ret := c.tx.Commit(); ret.Error != nil {
  92. log.Errorf("failed to commit records : %v", ret.Error)
  93. }
  94. if err := c.tx.Close(); err != nil {
  95. log.Errorf("error while closing tx : %s", err)
  96. }
  97. if err := c.Db.Close(); err != nil {
  98. log.Errorf("error while closing db : %s", err)
  99. }
  100. return
  101. case <-ticker.C:
  102. if atomic.LoadInt32(&c.count) != 0 &&
  103. (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
  104. //log.Warningf("flush time")
  105. if err := c.Flush(); err != nil {
  106. log.Errorf("failed to flush : %s", err)
  107. }
  108. //log.Printf("starting auto-cleanup")
  109. if err := c.CleanUpRecordsByCount(); err != nil {
  110. log.Errorf("error in auto-cleanup : %s", err)
  111. }
  112. }
  113. }
  114. }
  115. }