alerts.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973
  1. package database
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  9. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  10. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  11. "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
  12. "github.com/crowdsecurity/crowdsec/pkg/database/ent/meta"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/davecgh/go-spew/spew"
  16. "github.com/pkg/errors"
  17. log "github.com/sirupsen/logrus"
  18. )
  19. const (
  20. paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable'
  21. defaultLimit = 100 // default limit of element to returns when query alerts
  22. bulkSize = 50 // bulk size when create alerts
  23. decisionBulkSize = 50
  24. )
  25. func formatAlertAsString(machineId string, alert *models.Alert) []string {
  26. var retStr []string
  27. /**/
  28. src := ""
  29. if alert.Source != nil {
  30. if *alert.Source.Scope == types.Ip {
  31. src = fmt.Sprintf("ip %s", *alert.Source.Value)
  32. if alert.Source.Cn != "" {
  33. src += " (" + alert.Source.Cn
  34. if alert.Source.AsNumber != "" {
  35. src += "/" + alert.Source.AsNumber
  36. }
  37. src += ")"
  38. }
  39. } else if *alert.Source.Scope == types.Range {
  40. src = fmt.Sprintf("range %s", *alert.Source.Value)
  41. if alert.Source.Cn != "" {
  42. src += " (" + alert.Source.Cn
  43. if alert.Source.AsNumber != "" {
  44. src += "/" + alert.Source.AsNumber
  45. }
  46. src += ")"
  47. }
  48. } else {
  49. src = fmt.Sprintf("%s %s", *alert.Source.Scope, *alert.Source.Value)
  50. }
  51. } else {
  52. src = "empty source"
  53. }
  54. /**/
  55. reason := ""
  56. if *alert.Scenario != "" {
  57. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  58. } else if *alert.Message != "" {
  59. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  60. } else {
  61. reason = fmt.Sprintf("empty scenario by %s", src)
  62. }
  63. if len(alert.Decisions) > 0 {
  64. for _, decisionItem := range alert.Decisions {
  65. decision := ""
  66. if alert.Simulated != nil && *alert.Simulated {
  67. decision = "(simulated alert)"
  68. } else if decisionItem.Simulated != nil && *decisionItem.Simulated {
  69. decision = "(simulated decision)"
  70. }
  71. if log.GetLevel() >= log.DebugLevel {
  72. /*spew is expensive*/
  73. log.Debugf("%s", spew.Sdump(decisionItem))
  74. }
  75. decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
  76. *decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
  77. retStr = append(retStr,
  78. fmt.Sprintf("(%s/%s) %s : %s", machineId,
  79. *decisionItem.Origin, reason, decision))
  80. }
  81. } else {
  82. retStr = append(retStr, fmt.Sprintf("(%s) alert : %s", machineId, reason))
  83. }
  84. return retStr
  85. }
  86. func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]string, error) {
  87. pageStart := 0
  88. pageEnd := bulkSize
  89. ret := []string{}
  90. for {
  91. if pageEnd >= len(alertList) {
  92. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:])
  93. if err != nil {
  94. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  95. }
  96. ret = append(ret, results...)
  97. break
  98. }
  99. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:pageEnd])
  100. if err != nil {
  101. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  102. }
  103. ret = append(ret, results...)
  104. pageStart += bulkSize
  105. pageEnd += bulkSize
  106. }
  107. return ret, nil
  108. }
  109. /*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/
  110. func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
  111. var err error
  112. var deleted, inserted int
  113. if alertItem == nil {
  114. return 0, 0, 0, fmt.Errorf("nil alert")
  115. }
  116. if alertItem.StartAt == nil {
  117. return 0, 0, 0, fmt.Errorf("nil start_at")
  118. }
  119. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  120. if err != nil {
  121. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  122. }
  123. if alertItem.StopAt == nil {
  124. return 0, 0, 0, fmt.Errorf("nil stop_at")
  125. }
  126. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  127. if err != nil {
  128. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  129. }
  130. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  131. if err != nil {
  132. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  133. ts = time.Now().UTC()
  134. }
  135. alertB := c.Ent.Alert.
  136. Create().
  137. SetScenario(*alertItem.Scenario).
  138. SetMessage(*alertItem.Message).
  139. SetEventsCount(*alertItem.EventsCount).
  140. SetStartedAt(startAtTime).
  141. SetStoppedAt(stopAtTime).
  142. SetSourceScope(*alertItem.Source.Scope).
  143. SetSourceValue(*alertItem.Source.Value).
  144. SetSourceIp(alertItem.Source.IP).
  145. SetSourceRange(alertItem.Source.Range).
  146. SetSourceAsNumber(alertItem.Source.AsNumber).
  147. SetSourceAsName(alertItem.Source.AsName).
  148. SetSourceCountry(alertItem.Source.Cn).
  149. SetSourceLatitude(alertItem.Source.Latitude).
  150. SetSourceLongitude(alertItem.Source.Longitude).
  151. SetCapacity(*alertItem.Capacity).
  152. SetLeakSpeed(*alertItem.Leakspeed).
  153. SetSimulated(*alertItem.Simulated).
  154. SetScenarioVersion(*alertItem.ScenarioVersion).
  155. SetScenarioHash(*alertItem.ScenarioHash)
  156. alertRef, err := alertB.Save(c.CTX)
  157. if err != nil {
  158. return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
  159. }
  160. if len(alertItem.Decisions) > 0 {
  161. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  162. valueList := make([]string, 0, decisionBulkSize)
  163. for i, decisionItem := range alertItem.Decisions {
  164. var start_ip, start_sfx, end_ip, end_sfx int64
  165. var sz int
  166. if decisionItem.Duration == nil {
  167. log.Warningf("nil duration in community decision")
  168. continue
  169. }
  170. duration, err := time.ParseDuration(*decisionItem.Duration)
  171. if err != nil {
  172. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  173. }
  174. if decisionItem.Scope == nil {
  175. log.Warningf("nil scope in community decision")
  176. continue
  177. }
  178. /*if the scope is IP or Range, convert the value to integers */
  179. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  180. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  181. if err != nil {
  182. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  183. }
  184. }
  185. /*bulk insert some new decisions*/
  186. decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
  187. SetUntil(ts.Add(duration)).
  188. SetScenario(*decisionItem.Scenario).
  189. SetType(*decisionItem.Type).
  190. SetStartIP(start_ip).
  191. SetStartSuffix(start_sfx).
  192. SetEndIP(end_ip).
  193. SetEndSuffix(end_sfx).
  194. SetIPSize(int64(sz)).
  195. SetValue(*decisionItem.Value).
  196. SetScope(*decisionItem.Scope).
  197. SetOrigin(*decisionItem.Origin).
  198. SetSimulated(*alertItem.Simulated).
  199. SetOwner(alertRef))
  200. /*for bulk delete of duplicate decisions*/
  201. if decisionItem.Value == nil {
  202. log.Warningf("nil value in community decision")
  203. continue
  204. }
  205. valueList = append(valueList, *decisionItem.Value)
  206. if len(decisionBulk) == decisionBulkSize {
  207. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  208. if err != nil {
  209. return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
  210. }
  211. inserted += len(insertedDecisions)
  212. /*Deleting older decisions from capi*/
  213. deletedDecisions, err := c.Ent.Decision.Delete().
  214. Where(decision.And(
  215. decision.OriginEQ(CapiMachineID),
  216. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  217. decision.ValueIn(valueList...),
  218. )).Exec(c.CTX)
  219. if err != nil {
  220. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  221. }
  222. deleted += deletedDecisions
  223. if len(alertItem.Decisions)-i <= decisionBulkSize {
  224. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  225. valueList = make([]string, 0, (len(alertItem.Decisions) - i))
  226. } else {
  227. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  228. valueList = make([]string, 0, decisionBulkSize)
  229. }
  230. // The 90's called, they want their concurrency back.
  231. // This is needed for sqlite, which does not support concurrent access while writing.
  232. // If we pull a large number of IPs from CAPI, and we have a slow disk, LAPI won't respond until all IPs are inserted (which can take up to a few seconds).
  233. time.Sleep(100 * time.Millisecond)
  234. }
  235. }
  236. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  237. if err != nil {
  238. return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  239. }
  240. inserted += len(insertedDecisions)
  241. /*Deleting older decisions from capi*/
  242. if len(valueList) > 0 {
  243. deletedDecisions, err := c.Ent.Decision.Delete().
  244. Where(decision.And(
  245. decision.OriginEQ(CapiMachineID),
  246. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  247. decision.ValueIn(valueList...),
  248. )).Exec(c.CTX)
  249. if err != nil {
  250. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  251. }
  252. deleted += deletedDecisions
  253. }
  254. }
  255. return alertRef.ID, inserted, deleted, nil
  256. }
  257. func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
  258. var ret [][]*ent.Decision
  259. var chunk []*ent.Decision
  260. for _, d := range decisions {
  261. chunk = append(chunk, d)
  262. if len(chunk) == chunkSize {
  263. ret = append(ret, chunk)
  264. chunk = nil
  265. }
  266. }
  267. if len(chunk) > 0 {
  268. ret = append(ret, chunk)
  269. }
  270. return ret
  271. }
  272. func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
  273. ret := []string{}
  274. bulkSize := 20
  275. c.Log.Debugf("writting %d items", len(alertList))
  276. bulk := make([]*ent.AlertCreate, 0, bulkSize)
  277. alertDecisions := make([][]*ent.Decision, 0, bulkSize)
  278. for i, alertItem := range alertList {
  279. var decisions []*ent.Decision
  280. var metas []*ent.Meta
  281. var events []*ent.Event
  282. owner, err := c.QueryMachineByID(machineId)
  283. if err != nil {
  284. if errors.Cause(err) != UserNotExists {
  285. return []string{}, errors.Wrapf(QueryFail, "machine '%s': %s", alertItem.MachineID, err)
  286. }
  287. c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineId)
  288. owner = nil
  289. }
  290. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  291. if err != nil {
  292. return []string{}, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  293. }
  294. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  295. if err != nil {
  296. return []string{}, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  297. }
  298. /*display proper alert in logs*/
  299. for _, disp := range formatAlertAsString(machineId, alertItem) {
  300. c.Log.Info(disp)
  301. }
  302. //let's track when we strip or drop data, notify outside of loop to avoid spam
  303. stripped := false
  304. dropped := false
  305. if len(alertItem.Events) > 0 {
  306. eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
  307. for i, eventItem := range alertItem.Events {
  308. ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
  309. if err != nil {
  310. return []string{}, errors.Wrapf(ParseTimeFail, "event timestamp '%s' : %s", *eventItem.Timestamp, err)
  311. }
  312. marshallMetas, err := json.Marshal(eventItem.Meta)
  313. if err != nil {
  314. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  315. }
  316. //the serialized field is too big, let's try to progressively strip it
  317. if event.SerializedValidator(string(marshallMetas)) != nil {
  318. stripped = true
  319. valid := false
  320. stripSize := 2048
  321. for !valid && stripSize > 0 {
  322. for _, serializedItem := range eventItem.Meta {
  323. if len(serializedItem.Value) > stripSize*2 {
  324. serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
  325. }
  326. }
  327. marshallMetas, err = json.Marshal(eventItem.Meta)
  328. if err != nil {
  329. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  330. }
  331. if event.SerializedValidator(string(marshallMetas)) == nil {
  332. valid = true
  333. }
  334. stripSize /= 2
  335. }
  336. //nothing worked, drop it
  337. if !valid {
  338. dropped = true
  339. stripped = false
  340. marshallMetas = []byte("")
  341. }
  342. }
  343. eventBulk[i] = c.Ent.Event.Create().
  344. SetTime(ts).
  345. SetSerialized(string(marshallMetas))
  346. }
  347. if stripped {
  348. c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  349. }
  350. if dropped {
  351. c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  352. }
  353. events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(c.CTX)
  354. if err != nil {
  355. return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
  356. }
  357. }
  358. if len(alertItem.Meta) > 0 {
  359. metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
  360. for i, metaItem := range alertItem.Meta {
  361. metaBulk[i] = c.Ent.Meta.Create().
  362. SetKey(metaItem.Key).
  363. SetValue(metaItem.Value)
  364. }
  365. metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(c.CTX)
  366. if err != nil {
  367. return []string{}, errors.Wrapf(BulkError, "creating alert meta: %s", err)
  368. }
  369. }
  370. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  371. if err != nil {
  372. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  373. ts = time.Now().UTC()
  374. }
  375. decisions = make([]*ent.Decision, 0)
  376. if len(alertItem.Decisions) > 0 {
  377. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  378. for i, decisionItem := range alertItem.Decisions {
  379. var start_ip, start_sfx, end_ip, end_sfx int64
  380. var sz int
  381. duration, err := time.ParseDuration(*decisionItem.Duration)
  382. if err != nil {
  383. return []string{}, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  384. }
  385. /*if the scope is IP or Range, convert the value to integers */
  386. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  387. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  388. if err != nil {
  389. return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  390. }
  391. }
  392. decisionCreate := c.Ent.Decision.Create().
  393. SetUntil(ts.Add(duration)).
  394. SetScenario(*decisionItem.Scenario).
  395. SetType(*decisionItem.Type).
  396. SetStartIP(start_ip).
  397. SetStartSuffix(start_sfx).
  398. SetEndIP(end_ip).
  399. SetEndSuffix(end_sfx).
  400. SetIPSize(int64(sz)).
  401. SetValue(*decisionItem.Value).
  402. SetScope(*decisionItem.Scope).
  403. SetOrigin(*decisionItem.Origin).
  404. SetSimulated(*alertItem.Simulated)
  405. decisionBulk = append(decisionBulk, decisionCreate)
  406. if len(decisionBulk) == decisionBulkSize {
  407. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  408. if err != nil {
  409. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  410. }
  411. decisions = append(decisions, decisionsCreateRet...)
  412. if len(alertItem.Decisions)-i <= decisionBulkSize {
  413. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  414. } else {
  415. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  416. }
  417. }
  418. }
  419. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  420. if err != nil {
  421. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  422. }
  423. decisions = append(decisions, decisionsCreateRet...)
  424. }
  425. alertB := c.Ent.Alert.
  426. Create().
  427. SetScenario(*alertItem.Scenario).
  428. SetMessage(*alertItem.Message).
  429. SetEventsCount(*alertItem.EventsCount).
  430. SetStartedAt(startAtTime).
  431. SetStoppedAt(stopAtTime).
  432. SetSourceScope(*alertItem.Source.Scope).
  433. SetSourceValue(*alertItem.Source.Value).
  434. SetSourceIp(alertItem.Source.IP).
  435. SetSourceRange(alertItem.Source.Range).
  436. SetSourceAsNumber(alertItem.Source.AsNumber).
  437. SetSourceAsName(alertItem.Source.AsName).
  438. SetSourceCountry(alertItem.Source.Cn).
  439. SetSourceLatitude(alertItem.Source.Latitude).
  440. SetSourceLongitude(alertItem.Source.Longitude).
  441. SetCapacity(*alertItem.Capacity).
  442. SetLeakSpeed(*alertItem.Leakspeed).
  443. SetSimulated(*alertItem.Simulated).
  444. SetScenarioVersion(*alertItem.ScenarioVersion).
  445. SetScenarioHash(*alertItem.ScenarioHash).
  446. AddEvents(events...).
  447. AddMetas(metas...)
  448. if owner != nil {
  449. alertB.SetOwner(owner)
  450. }
  451. bulk = append(bulk, alertB)
  452. alertDecisions = append(alertDecisions, decisions)
  453. if len(bulk) == bulkSize {
  454. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  455. if err != nil {
  456. return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
  457. }
  458. for alertIndex, a := range alerts {
  459. ret = append(ret, strconv.Itoa(a.ID))
  460. d := alertDecisions[alertIndex]
  461. decisionsChunk := chunkDecisions(d, bulkSize)
  462. for _, d2 := range decisionsChunk {
  463. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  464. if err != nil {
  465. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  466. }
  467. }
  468. }
  469. if len(alertList)-i <= bulkSize {
  470. bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
  471. alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
  472. } else {
  473. bulk = make([]*ent.AlertCreate, 0, bulkSize)
  474. alertDecisions = make([][]*ent.Decision, 0, bulkSize)
  475. }
  476. }
  477. }
  478. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  479. if err != nil {
  480. return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
  481. }
  482. for alertIndex, a := range alerts {
  483. ret = append(ret, strconv.Itoa(a.ID))
  484. d := alertDecisions[alertIndex]
  485. decisionsChunk := chunkDecisions(d, bulkSize)
  486. for _, d2 := range decisionsChunk {
  487. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  488. if err != nil {
  489. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  490. }
  491. }
  492. }
  493. return ret, nil
  494. }
  495. func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]string) (*ent.AlertQuery, error) {
  496. var err error
  497. var start_ip, start_sfx, end_ip, end_sfx int64
  498. var hasActiveDecision bool
  499. var ip_sz int
  500. var contains bool = true
  501. /*if contains is true, return bans that *contains* the given value (value is the inner)
  502. else, return bans that are *contained* by the given value (value is the outer)*/
  503. /*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
  504. if v, ok := filter["simulated"]; ok {
  505. if v[0] == "false" {
  506. alerts = alerts.Where(alert.SimulatedEQ(false))
  507. }
  508. delete(filter, "simulated")
  509. }
  510. if _, ok := filter["origin"]; ok {
  511. filter["include_capi"] = []string{"true"}
  512. }
  513. for param, value := range filter {
  514. switch param {
  515. case "contains":
  516. contains, err = strconv.ParseBool(value[0])
  517. if err != nil {
  518. return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
  519. }
  520. case "scope":
  521. var scope string = value[0]
  522. if strings.ToLower(scope) == "ip" {
  523. scope = types.Ip
  524. } else if strings.ToLower(scope) == "range" {
  525. scope = types.Range
  526. }
  527. alerts = alerts.Where(alert.SourceScopeEQ(scope))
  528. case "value":
  529. alerts = alerts.Where(alert.SourceValueEQ(value[0]))
  530. case "scenario":
  531. alerts = alerts.Where(alert.HasDecisionsWith(decision.ScenarioEQ(value[0])))
  532. case "ip", "range":
  533. ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
  534. if err != nil {
  535. return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
  536. }
  537. case "since":
  538. duration, err := types.ParseDuration(value[0])
  539. if err != nil {
  540. return nil, errors.Wrap(err, "while parsing duration")
  541. }
  542. since := time.Now().UTC().Add(-duration)
  543. if since.IsZero() {
  544. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  545. }
  546. alerts = alerts.Where(alert.StartedAtGTE(since))
  547. case "created_before":
  548. duration, err := types.ParseDuration(value[0])
  549. if err != nil {
  550. return nil, errors.Wrap(err, "while parsing duration")
  551. }
  552. since := time.Now().UTC().Add(-duration)
  553. if since.IsZero() {
  554. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  555. }
  556. alerts = alerts.Where(alert.CreatedAtLTE(since))
  557. case "until":
  558. duration, err := types.ParseDuration(value[0])
  559. if err != nil {
  560. return nil, errors.Wrap(err, "while parsing duration")
  561. }
  562. until := time.Now().UTC().Add(-duration)
  563. if until.IsZero() {
  564. return nil, fmt.Errorf("Empty time now() - %s", until.String())
  565. }
  566. alerts = alerts.Where(alert.StartedAtLTE(until))
  567. case "decision_type":
  568. alerts = alerts.Where(alert.HasDecisionsWith(decision.TypeEQ(value[0])))
  569. case "origin":
  570. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(value[0])))
  571. case "include_capi": //allows to exclude one or more specific origins
  572. if value[0] == "false" {
  573. alerts = alerts.Where(alert.HasDecisionsWith(decision.Or(decision.OriginEQ("crowdsec"), decision.OriginEQ("cscli"))))
  574. } else if value[0] != "true" {
  575. log.Errorf("Invalid bool '%s' for include_capi", value[0])
  576. }
  577. case "has_active_decision":
  578. if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
  579. return nil, errors.Wrapf(ParseType, "'%s' is not a boolean: %s", value[0], err)
  580. }
  581. if hasActiveDecision {
  582. alerts = alerts.Where(alert.HasDecisionsWith(decision.UntilGTE(time.Now().UTC())))
  583. } else {
  584. alerts = alerts.Where(alert.Not(alert.HasDecisions()))
  585. }
  586. case "limit":
  587. continue
  588. case "sort":
  589. continue
  590. default:
  591. return nil, errors.Wrapf(InvalidFilter, "Filter parameter '%s' is unknown (=%s)", param, value[0])
  592. }
  593. }
  594. if ip_sz == 4 {
  595. if contains { /*decision contains {start_ip,end_ip}*/
  596. alerts = alerts.Where(alert.And(
  597. alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
  598. alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
  599. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  600. ))
  601. } else { /*decision is contained within {start_ip,end_ip}*/
  602. alerts = alerts.Where(alert.And(
  603. alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
  604. alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
  605. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  606. ))
  607. }
  608. } else if ip_sz == 16 {
  609. if contains { /*decision contains {start_ip,end_ip}*/
  610. alerts = alerts.Where(alert.And(
  611. //matching addr size
  612. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  613. alert.Or(
  614. //decision.start_ip < query.start_ip
  615. alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
  616. alert.And(
  617. //decision.start_ip == query.start_ip
  618. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  619. //decision.start_suffix <= query.start_suffix
  620. alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
  621. )),
  622. alert.Or(
  623. //decision.end_ip > query.end_ip
  624. alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
  625. alert.And(
  626. //decision.end_ip == query.end_ip
  627. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  628. //decision.end_suffix >= query.end_suffix
  629. alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
  630. ),
  631. ),
  632. ))
  633. } else { /*decision is contained within {start_ip,end_ip}*/
  634. alerts = alerts.Where(alert.And(
  635. //matching addr size
  636. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  637. alert.Or(
  638. //decision.start_ip > query.start_ip
  639. alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
  640. alert.And(
  641. //decision.start_ip == query.start_ip
  642. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  643. //decision.start_suffix >= query.start_suffix
  644. alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
  645. )),
  646. alert.Or(
  647. //decision.end_ip < query.end_ip
  648. alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
  649. alert.And(
  650. //decision.end_ip == query.end_ip
  651. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  652. //decision.end_suffix <= query.end_suffix
  653. alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
  654. ),
  655. ),
  656. ))
  657. }
  658. } else if ip_sz != 0 {
  659. return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
  660. }
  661. return alerts, nil
  662. }
  663. func (c *Client) TotalAlerts() (int, error) {
  664. return c.Ent.Alert.Query().Count(c.CTX)
  665. }
  666. func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, error) {
  667. sort := "DESC" // we sort by desc by default
  668. if val, ok := filter["sort"]; ok {
  669. if val[0] != "ASC" && val[0] != "DESC" {
  670. c.Log.Errorf("invalid 'sort' parameter: %s", val)
  671. } else {
  672. sort = val[0]
  673. }
  674. }
  675. limit := defaultLimit
  676. if val, ok := filter["limit"]; ok {
  677. limitConv, err := strconv.Atoi(val[0])
  678. if err != nil {
  679. return []*ent.Alert{}, errors.Wrapf(QueryFail, "bad limit in parameters: %s", val)
  680. }
  681. limit = limitConv
  682. }
  683. offset := 0
  684. ret := make([]*ent.Alert, 0)
  685. for {
  686. alerts := c.Ent.Alert.Query()
  687. alerts, err := BuildAlertRequestFromFilter(alerts, filter)
  688. if err != nil {
  689. return []*ent.Alert{}, err
  690. }
  691. alerts = alerts.
  692. WithDecisions().
  693. WithEvents().
  694. WithMetas().
  695. WithOwner()
  696. if limit == 0 {
  697. limit, err = alerts.Count(c.CTX)
  698. if err != nil {
  699. return []*ent.Alert{}, fmt.Errorf("unable to count nb alerts: %s", err)
  700. }
  701. }
  702. if sort == "ASC" {
  703. alerts = alerts.Order(ent.Asc(alert.FieldCreatedAt))
  704. } else {
  705. alerts = alerts.Order(ent.Desc(alert.FieldCreatedAt))
  706. }
  707. result, err := alerts.Limit(paginationSize).Offset(offset).All(c.CTX)
  708. if err != nil {
  709. return []*ent.Alert{}, errors.Wrapf(QueryFail, "pagination size: %d, offset: %d: %s", paginationSize, offset, err)
  710. }
  711. if diff := limit - len(ret); diff < paginationSize {
  712. if len(result) < diff {
  713. ret = append(ret, result...)
  714. c.Log.Debugf("Pagination done, %d < %d", len(result), diff)
  715. break
  716. }
  717. ret = append(ret, result[0:diff]...)
  718. } else {
  719. ret = append(ret, result...)
  720. }
  721. if len(ret) == limit || len(ret) == 0 {
  722. c.Log.Debugf("Pagination done len(ret) = %d", len(ret))
  723. break
  724. }
  725. offset += paginationSize
  726. }
  727. return ret, nil
  728. }
  729. func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
  730. idList := make([]int, 0)
  731. for _, alert := range alertItems {
  732. idList = append(idList, int(alert.ID))
  733. }
  734. deleted, err := c.Ent.Alert.Delete().
  735. Where(alert.IDIn(idList...)).Exec(c.CTX)
  736. if err != nil {
  737. c.Log.Warningf("DeleteAlertGraph : %s", err)
  738. return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
  739. }
  740. c.Log.Debug("Done batch delete alerts")
  741. return deleted, nil
  742. }
  743. func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
  744. // delete the associated events
  745. _, err := c.Ent.Event.Delete().
  746. Where(event.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  747. if err != nil {
  748. c.Log.Warningf("DeleteAlertGraph : %s", err)
  749. return errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  750. }
  751. // delete the associated meta
  752. _, err = c.Ent.Meta.Delete().
  753. Where(meta.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  754. if err != nil {
  755. c.Log.Warningf("DeleteAlertGraph : %s", err)
  756. return errors.Wrapf(DeleteFail, "meta with alert ID '%d'", alertItem.ID)
  757. }
  758. // delete the associated decisions
  759. _, err = c.Ent.Decision.Delete().
  760. Where(decision.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  761. if err != nil {
  762. c.Log.Warningf("DeleteAlertGraph : %s", err)
  763. return errors.Wrapf(DeleteFail, "decision with alert ID '%d'", alertItem.ID)
  764. }
  765. // delete the alert
  766. err = c.Ent.Alert.DeleteOne(alertItem).Exec(c.CTX)
  767. if err != nil {
  768. c.Log.Warningf("DeleteAlertGraph : %s", err)
  769. return errors.Wrapf(DeleteFail, "alert with ID '%d'", alertItem.ID)
  770. }
  771. return nil
  772. }
  773. func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error) {
  774. var err error
  775. // Get all the alerts that match the filter
  776. alertsToDelete, err := c.QueryAlertWithFilter(filter)
  777. for _, alertItem := range alertsToDelete {
  778. err = c.DeleteAlertGraph(alertItem)
  779. if err != nil {
  780. c.Log.Warningf("DeleteAlertWithFilter : %s", err)
  781. return 0, errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  782. }
  783. }
  784. return len(alertsToDelete), nil
  785. }
  786. func (c *Client) FlushOrphans() {
  787. /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
  788. /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
  789. events_count, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
  790. if err != nil {
  791. c.Log.Warningf("error while deleting orphan events : %s", err)
  792. return
  793. }
  794. if events_count > 0 {
  795. c.Log.Infof("%d deleted orphan events", events_count)
  796. }
  797. events_count, err = c.Ent.Decision.Delete().Where(
  798. decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX)
  799. if err != nil {
  800. c.Log.Warningf("error while deleting orphan decisions : %s", err)
  801. return
  802. }
  803. if events_count > 0 {
  804. c.Log.Infof("%d deleted orphan decisions", events_count)
  805. }
  806. }
  807. func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
  808. var deletedByAge int
  809. var deletedByNbItem int
  810. var totalAlerts int
  811. var err error
  812. if !c.CanFlush {
  813. c.Log.Debug("a list is being imported, flushing later")
  814. return nil
  815. }
  816. c.Log.Debug("Flushing orphan alerts")
  817. c.FlushOrphans()
  818. c.Log.Debug("Done flushing orphan alerts")
  819. totalAlerts, err = c.TotalAlerts()
  820. if err != nil {
  821. c.Log.Warningf("FlushAlerts (max items count) : %s", err)
  822. return errors.Wrap(err, "unable to get alerts count")
  823. }
  824. c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
  825. if MaxAge != "" {
  826. filter := map[string][]string{
  827. "created_before": {MaxAge},
  828. }
  829. nbDeleted, err := c.DeleteAlertWithFilter(filter)
  830. if err != nil {
  831. c.Log.Warningf("FlushAlerts (max age) : %s", err)
  832. return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
  833. }
  834. c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
  835. deletedByAge = nbDeleted
  836. }
  837. if MaxItems > 0 {
  838. //We get the highest id for the alerts
  839. //We substract MaxItems to avoid deleting alerts that are not old enough
  840. //This gives us the oldest alert that we want to keep
  841. //We then delete all the alerts with an id lower than this one
  842. //We can do this because the id is auto-increment, and the database won't reuse the same id twice
  843. lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
  844. "sort": {"DESC"},
  845. "limit": {"1"},
  846. })
  847. c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
  848. if err != nil {
  849. c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
  850. return errors.Wrap(err, "could not get last alert")
  851. }
  852. if len(lastAlert) != 0 {
  853. maxid := lastAlert[0].ID - MaxItems
  854. c.Log.Debugf("FlushAlerts (max id): %d", maxid)
  855. if maxid > 0 {
  856. //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
  857. deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
  858. if err != nil {
  859. c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
  860. return errors.Wrap(err, "could not delete alerts")
  861. }
  862. }
  863. }
  864. }
  865. if deletedByNbItem > 0 {
  866. c.Log.Infof("flushed %d/%d alerts because max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
  867. }
  868. if deletedByAge > 0 {
  869. c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
  870. }
  871. return nil
  872. }
  873. func (c *Client) GetAlertByID(alertID int) (*ent.Alert, error) {
  874. alert, err := c.Ent.Alert.Query().Where(alert.IDEQ(alertID)).WithDecisions().WithEvents().WithMetas().WithOwner().First(c.CTX)
  875. if err != nil {
  876. /*record not found, 404*/
  877. if ent.IsNotFound(err) {
  878. log.Warningf("GetAlertByID (not found): %s", err)
  879. return &ent.Alert{}, ItemNotFound
  880. }
  881. c.Log.Warningf("GetAlertByID : %s", err)
  882. return &ent.Alert{}, QueryFail
  883. }
  884. return alert, nil
  885. }