commit.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package sqlite
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. "github.com/crowdsecurity/crowdsec/pkg/types"
  7. "github.com/pkg/errors"
  8. log "github.com/sirupsen/logrus"
  9. )
  10. func (c *Context) DeleteExpired() error {
  11. //Delete the expired records
  12. if c.flush {
  13. retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
  14. if retx.RowsAffected > 0 {
  15. log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
  16. }
  17. }
  18. return nil
  19. }
  20. func (c *Context) Flush() error {
  21. c.lock.Lock()
  22. defer c.lock.Unlock()
  23. ret := c.tx.Commit()
  24. if ret.Error != nil {
  25. c.tx = c.Db.Begin()
  26. return fmt.Errorf("failed to commit records : %v", ret.Error)
  27. }
  28. c.tx = c.Db.Begin()
  29. c.lastCommit = time.Now()
  30. return nil
  31. }
  32. func (c *Context) CleanUpRecordsByAge() error {
  33. //let's fetch all expired records that are more than XX days olds
  34. sos := []types.BanApplication{}
  35. if c.maxDurationRetention == 0 {
  36. return nil
  37. }
  38. //look for soft-deleted events that are OLDER than maxDurationRetention
  39. ret := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").
  40. Where(fmt.Sprintf("deleted_at > date('now','-%d minutes')", int(c.maxDurationRetention.Minutes()))).
  41. Order("updated_at desc").Find(&sos)
  42. if ret.Error != nil {
  43. return errors.Wrap(ret.Error, "failed to get count of old records")
  44. }
  45. //no events elligible
  46. if len(sos) == 0 || ret.RowsAffected == 0 {
  47. log.Debugf("no event older than %s", c.maxDurationRetention.String())
  48. return nil
  49. }
  50. //let's do it in a single transaction
  51. delTx := c.Db.Unscoped().Begin()
  52. delRecords := 0
  53. for _, record := range sos {
  54. copy := record
  55. delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
  56. delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
  57. delTx.Unscoped().Table("ban_applications").Delete(&copy)
  58. //we need to delete associations : event_sequences, signal_occurences
  59. delRecords++
  60. }
  61. ret = delTx.Unscoped().Commit()
  62. if ret.Error != nil {
  63. return errors.Wrap(ret.Error, "failed to delete records")
  64. }
  65. log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention)
  66. return nil
  67. }
  68. func (c *Context) CleanUpRecordsByCount() error {
  69. var count int
  70. if c.maxEventRetention <= 0 {
  71. return nil
  72. }
  73. ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count)
  74. if ret.Error != nil {
  75. return errors.Wrap(ret.Error, "failed to get bans count")
  76. }
  77. if count < c.maxEventRetention {
  78. log.Debugf("%d < %d, don't cleanup", count, c.maxEventRetention)
  79. return nil
  80. }
  81. sos := []types.BanApplication{}
  82. /*get soft deleted records oldest to youngest*/
  83. records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
  84. if records.Error != nil {
  85. return errors.Wrap(records.Error, "failed to list expired bans for flush")
  86. }
  87. //let's do it in a single transaction
  88. delTx := c.Db.Unscoped().Begin()
  89. delRecords := 0
  90. for _, ld := range sos {
  91. copy := ld
  92. delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
  93. delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
  94. delTx.Unscoped().Table("ban_applications").Delete(&copy)
  95. //we need to delete associations : event_sequences, signal_occurences
  96. delRecords++
  97. //let's delete as well the associated event_sequence
  98. if count-delRecords <= c.maxEventRetention {
  99. break
  100. }
  101. }
  102. if len(sos) > 0 {
  103. //log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
  104. log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos))
  105. ret = delTx.Unscoped().Commit()
  106. if ret.Error != nil {
  107. return errors.Wrap(ret.Error, "failed to delete records")
  108. }
  109. } else {
  110. log.Debugf("didn't find any record to clean")
  111. }
  112. return nil
  113. }
  114. func (c *Context) StartAutoCommit() error {
  115. //TBD : we shouldn't start auto-commit if we are in cli mode ?
  116. c.PusherTomb.Go(func() error {
  117. c.autoCommit()
  118. return nil
  119. })
  120. return nil
  121. }
  122. func (c *Context) autoCommit() {
  123. log.Debugf("starting autocommit")
  124. ticker := time.NewTicker(200 * time.Millisecond)
  125. cleanUpTicker := time.NewTicker(1 * time.Minute)
  126. expireTicker := time.NewTicker(1 * time.Second)
  127. if !c.flush {
  128. log.Debugf("flush is disabled")
  129. }
  130. for {
  131. select {
  132. case <-c.PusherTomb.Dying():
  133. //we need to shutdown
  134. log.Infof("sqlite routine shutdown")
  135. if err := c.Flush(); err != nil {
  136. log.Errorf("error while flushing records: %s", err)
  137. }
  138. if ret := c.tx.Commit(); ret.Error != nil {
  139. log.Errorf("failed to commit records : %v", ret.Error)
  140. }
  141. if err := c.tx.Close(); err != nil {
  142. log.Errorf("error while closing tx : %s", err)
  143. }
  144. if err := c.Db.Close(); err != nil {
  145. log.Errorf("error while closing db : %s", err)
  146. }
  147. return
  148. case <-expireTicker.C:
  149. if err := c.DeleteExpired(); err != nil {
  150. log.Errorf("Error while deleting expired records: %s", err)
  151. }
  152. case <-ticker.C:
  153. if atomic.LoadInt32(&c.count) != 0 &&
  154. (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
  155. if err := c.Flush(); err != nil {
  156. log.Errorf("failed to flush : %s", err)
  157. }
  158. }
  159. case <-cleanUpTicker.C:
  160. if err := c.CleanUpRecordsByCount(); err != nil {
  161. log.Errorf("error in max records cleanup : %s", err)
  162. }
  163. if err := c.CleanUpRecordsByAge(); err != nil {
  164. log.Errorf("error in old records cleanup : %s", err)
  165. }
  166. }
  167. }
  168. }