flush.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package database
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/go-co-op/gocron"
  6. log "github.com/sirupsen/logrus"
  7. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  8. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  9. "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
  10. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  11. "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
  12. "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
  13. "github.com/crowdsecurity/crowdsec/pkg/types"
  14. )
  15. func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
  16. maxItems := 0
  17. maxAge := ""
  18. if config.MaxItems != nil && *config.MaxItems <= 0 {
  19. return nil, fmt.Errorf("max_items can't be zero or negative number")
  20. }
  21. if config.MaxItems != nil {
  22. maxItems = *config.MaxItems
  23. }
  24. if config.MaxAge != nil && *config.MaxAge != "" {
  25. maxAge = *config.MaxAge
  26. }
  27. // Init & Start cronjob every minute for alerts
  28. scheduler := gocron.NewScheduler(time.UTC)
  29. job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
  30. if err != nil {
  31. return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
  32. }
  33. job.SingletonMode()
  34. // Init & Start cronjob every hour for bouncers/agents
  35. if config.AgentsGC != nil {
  36. if config.AgentsGC.Cert != nil {
  37. duration, err := ParseDuration(*config.AgentsGC.Cert)
  38. if err != nil {
  39. return nil, fmt.Errorf("while parsing agents cert auto-delete duration: %w", err)
  40. }
  41. config.AgentsGC.CertDuration = &duration
  42. }
  43. if config.AgentsGC.LoginPassword != nil {
  44. duration, err := ParseDuration(*config.AgentsGC.LoginPassword)
  45. if err != nil {
  46. return nil, fmt.Errorf("while parsing agents login/password auto-delete duration: %w", err)
  47. }
  48. config.AgentsGC.LoginPasswordDuration = &duration
  49. }
  50. if config.AgentsGC.Api != nil {
  51. log.Warning("agents auto-delete for API auth is not supported (use cert or login_password)")
  52. }
  53. }
  54. if config.BouncersGC != nil {
  55. if config.BouncersGC.Cert != nil {
  56. duration, err := ParseDuration(*config.BouncersGC.Cert)
  57. if err != nil {
  58. return nil, fmt.Errorf("while parsing bouncers cert auto-delete duration: %w", err)
  59. }
  60. config.BouncersGC.CertDuration = &duration
  61. }
  62. if config.BouncersGC.Api != nil {
  63. duration, err := ParseDuration(*config.BouncersGC.Api)
  64. if err != nil {
  65. return nil, fmt.Errorf("while parsing bouncers api auto-delete duration: %w", err)
  66. }
  67. config.BouncersGC.ApiDuration = &duration
  68. }
  69. if config.BouncersGC.LoginPassword != nil {
  70. log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)")
  71. }
  72. }
  73. baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
  74. if err != nil {
  75. return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
  76. }
  77. baJob.SingletonMode()
  78. scheduler.StartAsync()
  79. return scheduler, nil
  80. }
  81. func (c *Client) FlushOrphans() {
  82. /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
  83. /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
  84. eventsCount, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
  85. if err != nil {
  86. c.Log.Warningf("error while deleting orphan events: %s", err)
  87. return
  88. }
  89. if eventsCount > 0 {
  90. c.Log.Infof("%d deleted orphan events", eventsCount)
  91. }
  92. eventsCount, err = c.Ent.Decision.Delete().Where(
  93. decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX)
  94. if err != nil {
  95. c.Log.Warningf("error while deleting orphan decisions: %s", err)
  96. return
  97. }
  98. if eventsCount > 0 {
  99. c.Log.Infof("%d deleted orphan decisions", eventsCount)
  100. }
  101. }
  102. func (c *Client) flushBouncers(bouncersCfg *csconfig.AuthGCCfg) {
  103. if bouncersCfg == nil {
  104. return
  105. }
  106. if bouncersCfg.ApiDuration != nil {
  107. log.Debug("trying to delete old bouncers from api")
  108. deletionCount, err := c.Ent.Bouncer.Delete().Where(
  109. bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.ApiDuration)),
  110. ).Where(
  111. bouncer.AuthTypeEQ(types.ApiKeyAuthType),
  112. ).Exec(c.CTX)
  113. if err != nil {
  114. c.Log.Errorf("while auto-deleting expired bouncers (api key): %s", err)
  115. } else if deletionCount > 0 {
  116. c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
  117. }
  118. }
  119. if bouncersCfg.CertDuration != nil {
  120. log.Debug("trying to delete old bouncers from cert")
  121. deletionCount, err := c.Ent.Bouncer.Delete().Where(
  122. bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.CertDuration)),
  123. ).Where(
  124. bouncer.AuthTypeEQ(types.TlsAuthType),
  125. ).Exec(c.CTX)
  126. if err != nil {
  127. c.Log.Errorf("while auto-deleting expired bouncers (api key): %s", err)
  128. } else if deletionCount > 0 {
  129. c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
  130. }
  131. }
  132. }
  133. func (c *Client) flushAgents(agentsCfg *csconfig.AuthGCCfg) {
  134. if agentsCfg == nil {
  135. return
  136. }
  137. if agentsCfg.CertDuration != nil {
  138. log.Debug("trying to delete old agents from cert")
  139. deletionCount, err := c.Ent.Machine.Delete().Where(
  140. machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.CertDuration)),
  141. ).Where(
  142. machine.Not(machine.HasAlerts()),
  143. ).Where(
  144. machine.AuthTypeEQ(types.TlsAuthType),
  145. ).Exec(c.CTX)
  146. log.Debugf("deleted %d entries", deletionCount)
  147. if err != nil {
  148. c.Log.Errorf("while auto-deleting expired machine (cert): %s", err)
  149. } else if deletionCount > 0 {
  150. c.Log.Infof("deleted %d expired machine (cert auth)", deletionCount)
  151. }
  152. }
  153. if agentsCfg.LoginPasswordDuration != nil {
  154. log.Debug("trying to delete old agents from password")
  155. deletionCount, err := c.Ent.Machine.Delete().Where(
  156. machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.LoginPasswordDuration)),
  157. ).Where(
  158. machine.Not(machine.HasAlerts()),
  159. ).Where(
  160. machine.AuthTypeEQ(types.PasswordAuthType),
  161. ).Exec(c.CTX)
  162. log.Debugf("deleted %d entries", deletionCount)
  163. if err != nil {
  164. c.Log.Errorf("while auto-deleting expired machine (password): %s", err)
  165. } else if deletionCount > 0 {
  166. c.Log.Infof("deleted %d expired machine (password auth)", deletionCount)
  167. }
  168. }
  169. }
  170. func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
  171. log.Debug("starting FlushAgentsAndBouncers")
  172. c.flushBouncers(bouncersCfg)
  173. c.flushAgents(agentsCfg)
  174. return nil
  175. }
  176. func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
  177. var deletedByAge int
  178. var deletedByNbItem int
  179. var totalAlerts int
  180. var err error
  181. if !c.CanFlush {
  182. c.Log.Debug("a list is being imported, flushing later")
  183. return nil
  184. }
  185. c.Log.Debug("Flushing orphan alerts")
  186. c.FlushOrphans()
  187. c.Log.Debug("Done flushing orphan alerts")
  188. totalAlerts, err = c.TotalAlerts()
  189. if err != nil {
  190. c.Log.Warningf("FlushAlerts (max items count): %s", err)
  191. return fmt.Errorf("unable to get alerts count: %w", err)
  192. }
  193. c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
  194. if MaxAge != "" {
  195. filter := map[string][]string{
  196. "created_before": {MaxAge},
  197. }
  198. nbDeleted, err := c.DeleteAlertWithFilter(filter)
  199. if err != nil {
  200. c.Log.Warningf("FlushAlerts (max age): %s", err)
  201. return fmt.Errorf("unable to flush alerts with filter until=%s: %w", MaxAge, err)
  202. }
  203. c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
  204. deletedByAge = nbDeleted
  205. }
  206. if MaxItems > 0 {
  207. //We get the highest id for the alerts
  208. //We subtract MaxItems to avoid deleting alerts that are not old enough
  209. //This gives us the oldest alert that we want to keep
  210. //We then delete all the alerts with an id lower than this one
  211. //We can do this because the id is auto-increment, and the database won't reuse the same id twice
  212. lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
  213. "sort": {"DESC"},
  214. "limit": {"1"},
  215. //we do not care about fetching the edges, we just want the id
  216. "with_decisions": {"false"},
  217. })
  218. c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
  219. if err != nil {
  220. c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
  221. return fmt.Errorf("could not get last alert: %w", err)
  222. }
  223. if len(lastAlert) != 0 {
  224. maxid := lastAlert[0].ID - MaxItems
  225. c.Log.Debugf("FlushAlerts (max id): %d", maxid)
  226. if maxid > 0 {
  227. //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
  228. deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
  229. if err != nil {
  230. c.Log.Errorf("FlushAlerts: Could not delete alerts: %s", err)
  231. return fmt.Errorf("could not delete alerts: %w", err)
  232. }
  233. }
  234. }
  235. }
  236. if deletedByNbItem > 0 {
  237. c.Log.Infof("flushed %d/%d alerts because the max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
  238. }
  239. if deletedByAge > 0 {
  240. c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
  241. }
  242. return nil
  243. }