alerts.go

package database

import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    "github.com/crowdsecurity/crowdsec/pkg/csconfig"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/meta"
    "github.com/crowdsecurity/crowdsec/pkg/database/ent/predicate"
    "github.com/crowdsecurity/crowdsec/pkg/models"
    "github.com/crowdsecurity/crowdsec/pkg/types"
    "github.com/davecgh/go-spew/spew"
    "github.com/pkg/errors"
    log "github.com/sirupsen/logrus"
)

const (
    paginationSize   = 100 // used when querying alerts to avoid the 'too many SQL variables' error
    defaultLimit     = 100 // default number of alerts to return when querying alerts
    bulkSize         = 50  // bulk size when creating alerts
    decisionBulkSize = 50
)
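// formatAlertAsString builds one human-readable log line per decision of the
// alert (or a single line if the alert carries no decision), prefixed with the
// machine that sent it.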
func formatAlertAsString(machineId string, alert *models.Alert) []string {
    var retStr []string

    /**/
    src := ""
    if alert.Source != nil {
        if *alert.Source.Scope == types.Ip {
            src = fmt.Sprintf("ip %s", *alert.Source.Value)
            if alert.Source.Cn != "" {
                src += " (" + alert.Source.Cn
                if alert.Source.AsNumber != "" {
                    src += "/" + alert.Source.AsNumber
                }
                src += ")"
            }
        } else if *alert.Source.Scope == types.Range {
            src = fmt.Sprintf("range %s", *alert.Source.Value)
            if alert.Source.Cn != "" {
                src += " (" + alert.Source.Cn
                if alert.Source.AsNumber != "" {
                    src += "/" + alert.Source.AsNumber
                }
                src += ")"
            }
        } else {
            src = fmt.Sprintf("%s %s", *alert.Source.Scope, *alert.Source.Value)
        }
    } else {
        src = "empty source"
    }

    /**/
    reason := ""
    if *alert.Scenario != "" {
        reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
    } else if *alert.Message != "" {
        reason = fmt.Sprintf("%s by %s", *alert.Message, src)
    } else {
        reason = fmt.Sprintf("empty scenario by %s", src)
    }

    if len(alert.Decisions) > 0 {
        for _, decisionItem := range alert.Decisions {
            decision := ""
            if alert.Simulated != nil && *alert.Simulated {
                decision = "(simulated alert)"
            } else if decisionItem.Simulated != nil && *decisionItem.Simulated {
                decision = "(simulated decision)"
            }
            if log.GetLevel() >= log.DebugLevel {
                /*spew is expensive*/
                log.Debugf("%s", spew.Sdump(decisionItem))
            }
            decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
                *decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
            retStr = append(retStr,
                fmt.Sprintf("(%s/%s) %s : %s", machineId,
                    *decisionItem.Origin, reason, decision))
        }
    } else {
        retStr = append(retStr, fmt.Sprintf("(%s) alert : %s", machineId, reason))
    }
    return retStr
}
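// CreateAlert splits alertList into pages of bulkSize alerts and feeds each
// page to CreateAlertBulk, returning the IDs of the created alerts as strings.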
func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]string, error) {
    pageStart := 0
    pageEnd := bulkSize
    ret := []string{}
    for {
        if pageEnd >= len(alertList) {
            results, err := c.CreateAlertBulk(machineID, alertList[pageStart:])
            if err != nil {
                return []string{}, fmt.Errorf("unable to create alerts: %s", err)
            }
            ret = append(ret, results...)
            break
        }
        results, err := c.CreateAlertBulk(machineID, alertList[pageStart:pageEnd])
        if err != nil {
            return []string{}, fmt.Errorf("unable to create alerts: %s", err)
        }
        ret = append(ret, results...)
        pageStart += bulkSize
        pageEnd += bulkSize
    }
    return ret, nil
}
/*
UpdateCommunityBlocklist : we can't bulk-insert both the alert and its decisions at the same time.
With the new consensus, we want to bulk-insert a single alert carrying a lot of decisions.
*/
func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
    var err error
    var deleted, inserted int

    if alertItem == nil {
        return 0, 0, 0, fmt.Errorf("nil alert")
    }
    if alertItem.StartAt == nil {
        return 0, 0, 0, fmt.Errorf("nil start_at")
    }
    startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
    if err != nil {
        return 0, 0, 0, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
    }
    if alertItem.StopAt == nil {
        return 0, 0, 0, fmt.Errorf("nil stop_at")
    }
    stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
    if err != nil {
        return 0, 0, 0, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
    }

    ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
    if err != nil {
        c.Log.Errorf("While parsing StopAt of item %s : %s", *alertItem.StopAt, err)
        ts = time.Now().UTC()
    }

    alertB := c.Ent.Alert.
        Create().
        SetScenario(*alertItem.Scenario).
        SetMessage(*alertItem.Message).
        SetEventsCount(*alertItem.EventsCount).
        SetStartedAt(startAtTime).
        SetStoppedAt(stopAtTime).
        SetSourceScope(*alertItem.Source.Scope).
        SetSourceValue(*alertItem.Source.Value).
        SetSourceIp(alertItem.Source.IP).
        SetSourceRange(alertItem.Source.Range).
        SetSourceAsNumber(alertItem.Source.AsNumber).
        SetSourceAsName(alertItem.Source.AsName).
        SetSourceCountry(alertItem.Source.Cn).
        SetSourceLatitude(alertItem.Source.Latitude).
        SetSourceLongitude(alertItem.Source.Longitude).
        SetCapacity(*alertItem.Capacity).
        SetLeakSpeed(*alertItem.Leakspeed).
        SetSimulated(*alertItem.Simulated).
        SetScenarioVersion(*alertItem.ScenarioVersion).
        SetScenarioHash(*alertItem.ScenarioHash)

    alertRef, err := alertB.Save(c.CTX)
    if err != nil {
        return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
    }

    if len(alertItem.Decisions) > 0 {
        decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
        valueList := make([]string, 0, decisionBulkSize)
        DecOrigin := CapiMachineID
        if *alertItem.Decisions[0].Origin == CapiMachineID || *alertItem.Decisions[0].Origin == CapiListsMachineID {
            DecOrigin = *alertItem.Decisions[0].Origin
        } else {
            log.Warningf("unexpected origin %s", *alertItem.Decisions[0].Origin)
        }
        for i, decisionItem := range alertItem.Decisions {
            var start_ip, start_sfx, end_ip, end_sfx int64
            var sz int
            if decisionItem.Duration == nil {
                log.Warning("nil duration in community decision")
                continue
            }
            duration, err := time.ParseDuration(*decisionItem.Duration)
            if err != nil {
                return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
            }
            if decisionItem.Scope == nil {
                log.Warning("nil scope in community decision")
                continue
            }
            /*if the scope is IP or Range, convert the value to integers */
            if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
                sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
                if err != nil {
                    return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
                }
            }
            /*bulk insert some new decisions*/
            decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
                SetUntil(ts.Add(duration)).
                SetScenario(*decisionItem.Scenario).
                SetType(*decisionItem.Type).
                SetStartIP(start_ip).
                SetStartSuffix(start_sfx).
                SetEndIP(end_ip).
                SetEndSuffix(end_sfx).
                SetIPSize(int64(sz)).
                SetValue(*decisionItem.Value).
                SetScope(*decisionItem.Scope).
                SetOrigin(*decisionItem.Origin).
                SetSimulated(*alertItem.Simulated).
                SetOwner(alertRef))
            /*for bulk delete of duplicate decisions*/
            if decisionItem.Value == nil {
                log.Warning("nil value in community decision")
                continue
            }
            valueList = append(valueList, *decisionItem.Value)
            if len(decisionBulk) == decisionBulkSize {
                insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
                if err != nil {
                    return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
                }
                inserted += len(insertedDecisions)
                /*Deleting older decisions from capi*/
                deletedDecisions, err := c.Ent.Decision.Delete().
                    Where(decision.And(
                        decision.OriginEQ(DecOrigin),
                        decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
                        decision.ValueIn(valueList...),
                    )).Exec(c.CTX)
                if err != nil {
                    return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
                }
                deleted += deletedDecisions
                if len(alertItem.Decisions)-i <= decisionBulkSize {
                    decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
                    valueList = make([]string, 0, (len(alertItem.Decisions) - i))
                } else {
                    decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
                    valueList = make([]string, 0, decisionBulkSize)
                }
                // The 90's called, they want their concurrency back.
                // This is needed for sqlite, which does not support concurrent access while writing.
                // If we pull a large number of IPs from CAPI, and we have a slow disk, LAPI won't respond until all IPs are inserted (which can take up to a few seconds).
                time.Sleep(100 * time.Millisecond)
            }
        }
        log.Debugf("deleted %d decisions for %s vs %s", deleted, DecOrigin, *alertItem.Decisions[0].Origin)
        insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
        if err != nil {
            return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
        }
        inserted += len(insertedDecisions)
        /*Deleting older decisions from capi*/
        if len(valueList) > 0 {
            deletedDecisions, err := c.Ent.Decision.Delete().
                Where(decision.And(
                    decision.OriginEQ(DecOrigin),
                    decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
                    decision.ValueIn(valueList...),
                )).Exec(c.CTX)
            if err != nil {
                return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
            }
            deleted += deletedDecisions
        }
    }
    return alertRef.ID, inserted, deleted, nil
}
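// chunkDecisions splits a slice of decisions into chunks of at most chunkSize
// elements; the last chunk holds whatever remains (e.g. 7 decisions with a
// chunk size of 3 yield chunks of 3, 3 and 1).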
func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
    var ret [][]*ent.Decision
    var chunk []*ent.Decision
    for _, d := range decisions {
        chunk = append(chunk, d)
        if len(chunk) == chunkSize {
            ret = append(ret, chunk)
            chunk = nil
        }
    }
    if len(chunk) > 0 {
        ret = append(ret, chunk)
    }
    return ret
}
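// CreateAlertBulk writes a page of alerts (with their events, metas and
// decisions) to the database, flushing the batch every bulkSize alerts, and
// returns the created alert IDs as strings.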
func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
    ret := []string{}
    bulkSize := 20

    c.Log.Debugf("writing %d items", len(alertList))
    bulk := make([]*ent.AlertCreate, 0, bulkSize)
    alertDecisions := make([][]*ent.Decision, 0, bulkSize)
    for i, alertItem := range alertList {
        var decisions []*ent.Decision
        var metas []*ent.Meta
        var events []*ent.Event

        owner, err := c.QueryMachineByID(machineId)
        if err != nil {
            if errors.Cause(err) != UserNotExists {
                return []string{}, errors.Wrapf(QueryFail, "machine '%s': %s", alertItem.MachineID, err)
            }
            c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineId)
            owner = nil
        }
        startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
        if err != nil {
            c.Log.Errorf("CreateAlertBulk: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)
            startAtTime = time.Now().UTC()
        }
        stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
        if err != nil {
            c.Log.Errorf("CreateAlertBulk: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)
            stopAtTime = time.Now().UTC()
        }
        /*display proper alert in logs*/
        for _, disp := range formatAlertAsString(machineId, alertItem) {
            c.Log.Info(disp)
        }
        //let's track when we strip or drop data, notify outside of loop to avoid spam
        stripped := false
        dropped := false

        if len(alertItem.Events) > 0 {
            eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
            for i, eventItem := range alertItem.Events {
                ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
                if err != nil {
                    c.Log.Errorf("CreateAlertBulk: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
                    ts = time.Now().UTC()
                }
                marshallMetas, err := json.Marshal(eventItem.Meta)
                if err != nil {
                    return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
                }

                //the serialized field is too big, let's try to progressively strip it
                if event.SerializedValidator(string(marshallMetas)) != nil {
                    stripped = true

                    valid := false
                    stripSize := 2048
                    for !valid && stripSize > 0 {
                        for _, serializedItem := range eventItem.Meta {
                            if len(serializedItem.Value) > stripSize*2 {
                                serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
                            }
                        }

                        marshallMetas, err = json.Marshal(eventItem.Meta)
                        if err != nil {
                            return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
                        }
                        if event.SerializedValidator(string(marshallMetas)) == nil {
                            valid = true
                        }
                        stripSize /= 2
                    }

                    //nothing worked, drop it
                    if !valid {
                        dropped = true
                        stripped = false
                        marshallMetas = []byte("")
                    }
                }

                eventBulk[i] = c.Ent.Event.Create().
                    SetTime(ts).
                    SetSerialized(string(marshallMetas))
            }
            if stripped {
                c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
            }
            if dropped {
                c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
            }
            events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(c.CTX)
            if err != nil {
                return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
            }
        }
        if len(alertItem.Meta) > 0 {
            metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
            for i, metaItem := range alertItem.Meta {
                metaBulk[i] = c.Ent.Meta.Create().
                    SetKey(metaItem.Key).
                    SetValue(metaItem.Value)
            }
            metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(c.CTX)
            if err != nil {
                return []string{}, errors.Wrapf(BulkError, "creating alert meta: %s", err)
            }
        }

        decisions = make([]*ent.Decision, 0)
        if len(alertItem.Decisions) > 0 {
            decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
            for i, decisionItem := range alertItem.Decisions {
                var start_ip, start_sfx, end_ip, end_sfx int64
                var sz int

                duration, err := time.ParseDuration(*decisionItem.Duration)
                if err != nil {
                    return []string{}, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
                }
                /*if the scope is IP or Range, convert the value to integers */
                if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
                    sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
                    if err != nil {
                        return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
                    }
                }
                decisionCreate := c.Ent.Decision.Create().
                    SetUntil(stopAtTime.Add(duration)).
                    SetScenario(*decisionItem.Scenario).
                    SetType(*decisionItem.Type).
                    SetStartIP(start_ip).
                    SetStartSuffix(start_sfx).
                    SetEndIP(end_ip).
                    SetEndSuffix(end_sfx).
                    SetIPSize(int64(sz)).
                    SetValue(*decisionItem.Value).
                    SetScope(*decisionItem.Scope).
                    SetOrigin(*decisionItem.Origin).
                    SetSimulated(*alertItem.Simulated)
                decisionBulk = append(decisionBulk, decisionCreate)
                if len(decisionBulk) == decisionBulkSize {
                    decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
                    if err != nil {
                        return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
                    }
                    decisions = append(decisions, decisionsCreateRet...)
                    if len(alertItem.Decisions)-i <= decisionBulkSize {
                        decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
                    } else {
                        decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
                    }
                }
            }
            decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
            if err != nil {
                return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
            }
            decisions = append(decisions, decisionsCreateRet...)
        }

        alertB := c.Ent.Alert.
            Create().
            SetScenario(*alertItem.Scenario).
            SetMessage(*alertItem.Message).
            SetEventsCount(*alertItem.EventsCount).
            SetStartedAt(startAtTime).
            SetStoppedAt(stopAtTime).
            SetSourceScope(*alertItem.Source.Scope).
            SetSourceValue(*alertItem.Source.Value).
            SetSourceIp(alertItem.Source.IP).
            SetSourceRange(alertItem.Source.Range).
            SetSourceAsNumber(alertItem.Source.AsNumber).
            SetSourceAsName(alertItem.Source.AsName).
            SetSourceCountry(alertItem.Source.Cn).
            SetSourceLatitude(alertItem.Source.Latitude).
            SetSourceLongitude(alertItem.Source.Longitude).
            SetCapacity(*alertItem.Capacity).
            SetLeakSpeed(*alertItem.Leakspeed).
            SetSimulated(*alertItem.Simulated).
            SetScenarioVersion(*alertItem.ScenarioVersion).
            SetScenarioHash(*alertItem.ScenarioHash).
            AddEvents(events...).
            AddMetas(metas...)

        if owner != nil {
            alertB.SetOwner(owner)
        }
        bulk = append(bulk, alertB)
        alertDecisions = append(alertDecisions, decisions)
        if len(bulk) == bulkSize {
            alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
            if err != nil {
                return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
            }
            for alertIndex, a := range alerts {
                ret = append(ret, strconv.Itoa(a.ID))
                d := alertDecisions[alertIndex]
                decisionsChunk := chunkDecisions(d, bulkSize)
                for _, d2 := range decisionsChunk {
                    _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
                    if err != nil {
                        return []string{}, fmt.Errorf("error while updating decisions: %s", err)
                    }
                }
            }
            if len(alertList)-i <= bulkSize {
                bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
                alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
            } else {
                bulk = make([]*ent.AlertCreate, 0, bulkSize)
                alertDecisions = make([][]*ent.Decision, 0, bulkSize)
            }
        }
    }

    alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
    if err != nil {
        return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
    }
    for alertIndex, a := range alerts {
        ret = append(ret, strconv.Itoa(a.ID))
        d := alertDecisions[alertIndex]
        decisionsChunk := chunkDecisions(d, bulkSize)
        for _, d2 := range decisionsChunk {
            _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
            if err != nil {
                return []string{}, fmt.Errorf("error while updating decisions: %s", err)
            }
        }
    }
    return ret, nil
}
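// AlertPredicatesFromFilter turns an HTTP-style filter (param -> values map)
// into ent predicates; "ip"/"range" filters are converted to integer bounds
// and handled after the switch, depending on the "contains" flag.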
func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, error) {
    predicates := make([]predicate.Alert, 0)
    var err error
    var start_ip, start_sfx, end_ip, end_sfx int64
    var hasActiveDecision bool
    var ip_sz int
    var contains bool = true
    /*if contains is true, return bans that *contains* the given value (value is the inner)
      else, return bans that are *contained* by the given value (value is the outer)*/

    /*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
    if v, ok := filter["simulated"]; ok {
        if v[0] == "false" {
            predicates = append(predicates, alert.SimulatedEQ(false))
        }
    }

    if _, ok := filter["origin"]; ok {
        filter["include_capi"] = []string{"true"}
    }

    for param, value := range filter {
        switch param {
        case "contains":
            contains, err = strconv.ParseBool(value[0])
            if err != nil {
                return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
            }
        case "scope":
            var scope string = value[0]
            if strings.ToLower(scope) == "ip" {
                scope = types.Ip
            } else if strings.ToLower(scope) == "range" {
                scope = types.Range
            }
            predicates = append(predicates, alert.SourceScopeEQ(scope))
        case "value":
            predicates = append(predicates, alert.SourceValueEQ(value[0]))
        case "scenario":
            predicates = append(predicates, alert.HasDecisionsWith(decision.ScenarioEQ(value[0])))
        case "ip", "range":
            ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
            if err != nil {
                return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
            }
        case "since":
            duration, err := types.ParseDuration(value[0])
            if err != nil {
                return nil, errors.Wrap(err, "while parsing duration")
            }
            since := time.Now().UTC().Add(-duration)
            if since.IsZero() {
                return nil, fmt.Errorf("Empty time now() - %s", since.String())
            }
            predicates = append(predicates, alert.StartedAtGTE(since))
        case "created_before":
            duration, err := types.ParseDuration(value[0])
            if err != nil {
                return nil, errors.Wrap(err, "while parsing duration")
            }
            since := time.Now().UTC().Add(-duration)
            if since.IsZero() {
                return nil, fmt.Errorf("Empty time now() - %s", since.String())
            }
            predicates = append(predicates, alert.CreatedAtLTE(since))
        case "until":
            duration, err := types.ParseDuration(value[0])
            if err != nil {
                return nil, errors.Wrap(err, "while parsing duration")
            }
            until := time.Now().UTC().Add(-duration)
            if until.IsZero() {
                return nil, fmt.Errorf("Empty time now() - %s", until.String())
            }
            predicates = append(predicates, alert.StartedAtLTE(until))
        case "decision_type":
            predicates = append(predicates, alert.HasDecisionsWith(decision.TypeEQ(value[0])))
        case "origin":
            predicates = append(predicates, alert.HasDecisionsWith(decision.OriginEQ(value[0])))
        case "include_capi": //allows to exclude one or more specific origins
            if value[0] == "false" {
                predicates = append(predicates, alert.HasDecisionsWith(decision.Or(decision.OriginEQ("crowdsec"), decision.OriginEQ("cscli"))))
            } else if value[0] != "true" {
                log.Errorf("Invalid bool '%s' for include_capi", value[0])
            }
        case "has_active_decision":
            if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
                return nil, errors.Wrapf(ParseType, "'%s' is not a boolean: %s", value[0], err)
            }
            if hasActiveDecision {
                predicates = append(predicates, alert.HasDecisionsWith(decision.UntilGTE(time.Now().UTC())))
            } else {
                predicates = append(predicates, alert.Not(alert.HasDecisions()))
            }
        case "limit":
            continue
        case "sort":
            continue
        case "simulated":
            continue
        default:
            return nil, errors.Wrapf(InvalidFilter, "Filter parameter '%s' is unknown (=%s)", param, value[0])
        }
    }

    if ip_sz == 4 {
        if contains { /*decision contains {start_ip,end_ip}*/
            predicates = append(predicates, alert.And(
                alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
                alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
                alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
            ))
        } else { /*decision is contained within {start_ip,end_ip}*/
            predicates = append(predicates, alert.And(
                alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
                alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
                alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
            ))
        }
    } else if ip_sz == 16 {
        if contains { /*decision contains {start_ip,end_ip}*/
            predicates = append(predicates, alert.And(
                //matching addr size
                alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
                alert.Or(
                    //decision.start_ip < query.start_ip
                    alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
                    alert.And(
                        //decision.start_ip == query.start_ip
                        alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
                        //decision.start_suffix <= query.start_suffix
                        alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
                    )),
                alert.Or(
                    //decision.end_ip > query.end_ip
                    alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
                    alert.And(
                        //decision.end_ip == query.end_ip
                        alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
                        //decision.end_suffix >= query.end_suffix
                        alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
                    ),
                ),
            ))
        } else { /*decision is contained within {start_ip,end_ip}*/
            predicates = append(predicates, alert.And(
                //matching addr size
                alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
                alert.Or(
                    //decision.start_ip > query.start_ip
                    alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
                    alert.And(
                        //decision.start_ip == query.start_ip
                        alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
                        //decision.start_suffix >= query.start_suffix
                        alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
                    )),
                alert.Or(
                    //decision.end_ip < query.end_ip
                    alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
                    alert.And(
                        //decision.end_ip == query.end_ip
                        alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
                        //decision.end_suffix <= query.end_suffix
                        alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
                    ),
                ),
            ))
        }
    } else if ip_sz != 0 {
        return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
    }
    return predicates, nil
}
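// BuildAlertRequestFromFilter applies the predicates built from the filter to
// an existing alert query.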
func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]string) (*ent.AlertQuery, error) {
    preds, err := AlertPredicatesFromFilter(filter)
    if err != nil {
        return nil, err
    }
    return alerts.Where(preds...), nil
}
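// AlertsCountPerScenario returns, for the alerts matching the given filters,
// the number of alerts grouped by scenario name.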
func (c *Client) AlertsCountPerScenario(filters map[string][]string) (map[string]int, error) {
    var res []struct {
        Scenario string
        Count    int
    }

    ctx := context.Background()

    query := c.Ent.Alert.Query()

    query, err := BuildAlertRequestFromFilter(query, filters)
    if err != nil {
        return nil, errors.Wrap(err, "failed to build alert request")
    }

    err = query.GroupBy(alert.FieldScenario).Aggregate(ent.Count()).Scan(ctx, &res)
    if err != nil {
        return nil, errors.Wrap(err, "failed to count alerts per scenario")
    }

    counts := make(map[string]int)
    for _, r := range res {
        counts[r.Scenario] = r.Count
    }

    return counts, nil
}
func (c *Client) TotalAlerts() (int, error) {
    return c.Ent.Alert.Query().Count(c.CTX)
}
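// QueryAlertWithFilter returns the alerts matching the given filter, with
// their decisions, events, metas and owner eagerly loaded, paginating by
// paginationSize until the requested limit (or the end of the result set) is
// reached.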
func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, error) {
    sort := "DESC" // we sort by desc by default
    if val, ok := filter["sort"]; ok {
        if val[0] != "ASC" && val[0] != "DESC" {
            c.Log.Errorf("invalid 'sort' parameter: %s", val)
        } else {
            sort = val[0]
        }
    }
    limit := defaultLimit
    if val, ok := filter["limit"]; ok {
        limitConv, err := strconv.Atoi(val[0])
        if err != nil {
            return []*ent.Alert{}, errors.Wrapf(QueryFail, "bad limit in parameters: %s", val)
        }
        limit = limitConv
    }
    offset := 0
    ret := make([]*ent.Alert, 0)
    for {
        alerts := c.Ent.Alert.Query()
        alerts, err := BuildAlertRequestFromFilter(alerts, filter)
        if err != nil {
            return []*ent.Alert{}, err
        }
        alerts = alerts.
            WithDecisions().
            WithEvents().
            WithMetas().
            WithOwner()
        if limit == 0 {
            limit, err = alerts.Count(c.CTX)
            if err != nil {
                return []*ent.Alert{}, fmt.Errorf("unable to count nb alerts: %s", err)
            }
        }
        if sort == "ASC" {
            alerts = alerts.Order(ent.Asc(alert.FieldCreatedAt), ent.Asc(alert.FieldID))
        } else {
            alerts = alerts.Order(ent.Desc(alert.FieldCreatedAt), ent.Desc(alert.FieldID))
        }
        result, err := alerts.Limit(paginationSize).Offset(offset).All(c.CTX)
        if err != nil {
            return []*ent.Alert{}, errors.Wrapf(QueryFail, "pagination size: %d, offset: %d: %s", paginationSize, offset, err)
        }
        if diff := limit - len(ret); diff < paginationSize {
            if len(result) < diff {
                ret = append(ret, result...)
                c.Log.Debugf("Pagination done, %d < %d", len(result), diff)
                break
            }
            ret = append(ret, result[0:diff]...)
        } else {
            ret = append(ret, result...)
        }
        if len(ret) == limit || len(ret) == 0 || len(ret) < paginationSize {
            c.Log.Debugf("Pagination done len(ret) = %d", len(ret))
            break
        }
        offset += paginationSize
    }
    return ret, nil
}
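// DeleteAlertGraphBatch deletes a batch of alerts together with their
// associated events, metas and decisions, and returns the number of alerts
// deleted.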
func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
    idList := make([]int, 0)
    for _, alert := range alertItems {
        idList = append(idList, alert.ID)
    }

    _, err := c.Ent.Event.Delete().
        Where(event.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
        return 0, errors.Wrapf(DeleteFail, "alert graph delete batch events")
    }
    _, err = c.Ent.Meta.Delete().
        Where(meta.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
        return 0, errors.Wrapf(DeleteFail, "alert graph delete batch meta")
    }
    _, err = c.Ent.Decision.Delete().
        Where(decision.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
        return 0, errors.Wrapf(DeleteFail, "alert graph delete batch decisions")
    }
    deleted, err := c.Ent.Alert.Delete().
        Where(alert.IDIn(idList...)).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
        return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
    }

    c.Log.Debug("Done batch delete alerts")
    return deleted, nil
}
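// DeleteAlertGraph deletes a single alert and its associated events, metas and
// decisions.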
func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
    // delete the associated events
    _, err := c.Ent.Event.Delete().
        Where(event.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraph : %s", err)
        return errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
    }

    // delete the associated meta
    _, err = c.Ent.Meta.Delete().
        Where(meta.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraph : %s", err)
        return errors.Wrapf(DeleteFail, "meta with alert ID '%d'", alertItem.ID)
    }

    // delete the associated decisions
    _, err = c.Ent.Decision.Delete().
        Where(decision.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraph : %s", err)
        return errors.Wrapf(DeleteFail, "decision with alert ID '%d'", alertItem.ID)
    }

    // delete the alert
    err = c.Ent.Alert.DeleteOne(alertItem).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("DeleteAlertGraph : %s", err)
        return errors.Wrapf(DeleteFail, "alert with ID '%d'", alertItem.ID)
    }

    return nil
}
func (c *Client) DeleteAlertByID(id int) error {
    alertItem, err := c.Ent.Alert.Query().Where(alert.IDEQ(id)).Only(c.CTX)
    if err != nil {
        return err
    }
    return c.DeleteAlertGraph(alertItem)
}
func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error) {
    preds, err := AlertPredicatesFromFilter(filter)
    if err != nil {
        return 0, err
    }
    return c.Ent.Alert.Delete().Where(preds...).Exec(c.CTX)
}
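// FlushOrphans removes events that no longer have a parent alert, and expired
// decisions that no longer have a parent alert.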
func (c *Client) FlushOrphans() {
    /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
    /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
    events_count, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("error while deleting orphan events : %s", err)
        return
    }
    if events_count > 0 {
        c.Log.Infof("%d deleted orphan events", events_count)
    }

    events_count, err = c.Ent.Decision.Delete().Where(
        decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX)
    if err != nil {
        c.Log.Warningf("error while deleting orphan decisions : %s", err)
        return
    }
    if events_count > 0 {
        c.Log.Infof("%d deleted orphan decisions", events_count)
    }
}
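// FlushAgentsAndBouncers garbage-collects bouncers and agents that have not
// been seen for longer than the configured durations, per authentication type
// (api key, certificate or login/password).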
func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
    log.Debug("starting FlushAgentsAndBouncers")
    if bouncersCfg != nil {
        if bouncersCfg.ApiDuration != nil {
            log.Debug("trying to delete old bouncers from api")
            deletionCount, err := c.Ent.Bouncer.Delete().Where(
                bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.ApiDuration)),
            ).Where(
                bouncer.AuthTypeEQ(types.ApiKeyAuthType),
            ).Exec(c.CTX)
            if err != nil {
                c.Log.Errorf("while auto-deleting expired bouncers (api key) : %s", err)
            } else if deletionCount > 0 {
                c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
            }
        }
        if bouncersCfg.CertDuration != nil {
            log.Debug("trying to delete old bouncers from cert")
            deletionCount, err := c.Ent.Bouncer.Delete().Where(
                bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.CertDuration)),
            ).Where(
                bouncer.AuthTypeEQ(types.TlsAuthType),
            ).Exec(c.CTX)
            if err != nil {
                c.Log.Errorf("while auto-deleting expired bouncers (cert) : %s", err)
            } else if deletionCount > 0 {
                c.Log.Infof("deleted %d expired bouncers (cert auth)", deletionCount)
            }
        }
    }
    if agentsCfg != nil {
        if agentsCfg.CertDuration != nil {
            log.Debug("trying to delete old agents from cert")
            deletionCount, err := c.Ent.Machine.Delete().Where(
                machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.CertDuration)),
            ).Where(
                machine.Not(machine.HasAlerts()),
            ).Where(
                machine.AuthTypeEQ(types.TlsAuthType),
            ).Exec(c.CTX)
            log.Debugf("deleted %d entries", deletionCount)
            if err != nil {
                c.Log.Errorf("while auto-deleting expired machine (cert) : %s", err)
            } else if deletionCount > 0 {
                c.Log.Infof("deleted %d expired machine (cert auth)", deletionCount)
            }
        }
        if agentsCfg.LoginPasswordDuration != nil {
            log.Debug("trying to delete old agents from password")
            deletionCount, err := c.Ent.Machine.Delete().Where(
                machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.LoginPasswordDuration)),
            ).Where(
                machine.Not(machine.HasAlerts()),
            ).Where(
                machine.AuthTypeEQ(types.PasswordAuthType),
            ).Exec(c.CTX)
            log.Debugf("deleted %d entries", deletionCount)
            if err != nil {
                c.Log.Errorf("while auto-deleting expired machine (password) : %s", err)
            } else if deletionCount > 0 {
                c.Log.Infof("deleted %d expired machine (password auth)", deletionCount)
            }
        }
    }
    return nil
}
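// FlushAlerts purges old alerts, first by age (MaxAge, via the
// "created_before" filter) and then by count (MaxItems): it keeps the MaxItems
// most recent alerts and deletes everything with a lower ID. As an
// illustration, if the newest alert has ID 1500 and MaxItems is 1000, alerts
// with an ID below 500 are deleted.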
func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
    var deletedByAge int
    var deletedByNbItem int
    var totalAlerts int
    var err error

    if !c.CanFlush {
        c.Log.Debug("a list is being imported, flushing later")
        return nil
    }

    c.Log.Debug("Flushing orphan alerts")
    c.FlushOrphans()
    c.Log.Debug("Done flushing orphan alerts")
    totalAlerts, err = c.TotalAlerts()
    if err != nil {
        c.Log.Warningf("FlushAlerts (max items count) : %s", err)
        return errors.Wrap(err, "unable to get alerts count")
    }
    c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
    if MaxAge != "" {
        filter := map[string][]string{
            "created_before": {MaxAge},
        }
        nbDeleted, err := c.DeleteAlertWithFilter(filter)
        if err != nil {
            c.Log.Warningf("FlushAlerts (max age) : %s", err)
            return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
        }
        c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
        deletedByAge = nbDeleted
    }
    if MaxItems > 0 {
        //We get the highest id for the alerts
        //We subtract MaxItems to avoid deleting alerts that are not old enough
        //This gives us the oldest alert that we want to keep
        //We then delete all the alerts with an id lower than this one
        //We can do this because the id is auto-increment, and the database won't reuse the same id twice
        lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
            "sort":  {"DESC"},
            "limit": {"1"},
        })
        c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
        if err != nil {
            c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
            return errors.Wrap(err, "could not get last alert")
        }

        if len(lastAlert) != 0 {
            maxid := lastAlert[0].ID - MaxItems

            c.Log.Debugf("FlushAlerts (max id): %d", maxid)

            if maxid > 0 {
                //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
                deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
                if err != nil {
                    c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
                    return errors.Wrap(err, "could not delete alerts")
                }
            }
        }
    }
    if deletedByNbItem > 0 {
        c.Log.Infof("flushed %d/%d alerts because max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
    }
    if deletedByAge > 0 {
        c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
    }
    return nil
}
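// GetAlertByID returns a single alert with its decisions, events, metas and
// owner, or ItemNotFound if no alert has the requested ID.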
func (c *Client) GetAlertByID(alertID int) (*ent.Alert, error) {
    alert, err := c.Ent.Alert.Query().Where(alert.IDEQ(alertID)).WithDecisions().WithEvents().WithMetas().WithOwner().First(c.CTX)
    if err != nil {
        /*record not found, 404*/
        if ent.IsNotFound(err) {
            log.Warningf("GetAlertByID (not found): %s", err)
            return &ent.Alert{}, ItemNotFound
        }
        c.Log.Warningf("GetAlertByID : %s", err)
        return &ent.Alert{}, QueryFail
    }
    return alert, nil
}