alerts.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967
  1. package database
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  9. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  10. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  11. "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
  12. "github.com/crowdsecurity/crowdsec/pkg/database/ent/meta"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/davecgh/go-spew/spew"
  16. "github.com/pkg/errors"
  17. log "github.com/sirupsen/logrus"
  18. )
  19. const (
  20. paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable'
  21. defaultLimit = 100 // default limit of element to returns when query alerts
  22. bulkSize = 50 // bulk size when create alerts
  23. decisionBulkSize = 50
  24. )
  25. func formatAlertAsString(machineId string, alert *models.Alert) []string {
  26. var retStr []string
  27. /**/
  28. src := ""
  29. if alert.Source != nil {
  30. if *alert.Source.Scope == types.Ip {
  31. src = fmt.Sprintf("ip %s", *alert.Source.Value)
  32. if alert.Source.Cn != "" {
  33. src += " (" + alert.Source.Cn
  34. if alert.Source.AsNumber != "" {
  35. src += "/" + alert.Source.AsNumber
  36. }
  37. src += ")"
  38. }
  39. } else if *alert.Source.Scope == types.Range {
  40. src = fmt.Sprintf("range %s", *alert.Source.Value)
  41. if alert.Source.Cn != "" {
  42. src += " (" + alert.Source.Cn
  43. if alert.Source.AsNumber != "" {
  44. src += "/" + alert.Source.AsNumber
  45. }
  46. src += ")"
  47. }
  48. } else {
  49. src = fmt.Sprintf("%s %s", *alert.Source.Scope, *alert.Source.Value)
  50. }
  51. } else {
  52. src = "empty source"
  53. }
  54. /**/
  55. reason := ""
  56. if *alert.Scenario != "" {
  57. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  58. } else if *alert.Message != "" {
  59. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  60. } else {
  61. reason = fmt.Sprintf("empty scenario by %s", src)
  62. }
  63. if len(alert.Decisions) > 0 {
  64. for _, decisionItem := range alert.Decisions {
  65. decision := ""
  66. if alert.Simulated != nil && *alert.Simulated {
  67. decision = "(simulated alert)"
  68. } else if decisionItem.Simulated != nil && *decisionItem.Simulated {
  69. decision = "(simulated decision)"
  70. }
  71. if log.GetLevel() >= log.DebugLevel {
  72. /*spew is expensive*/
  73. log.Debugf("%s", spew.Sdump(decisionItem))
  74. }
  75. decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
  76. *decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
  77. retStr = append(retStr,
  78. fmt.Sprintf("(%s/%s) %s : %s", machineId,
  79. *decisionItem.Origin, reason, decision))
  80. }
  81. } else {
  82. retStr = append(retStr, fmt.Sprintf("(%s) alert : %s", machineId, reason))
  83. }
  84. return retStr
  85. }
  86. func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]string, error) {
  87. pageStart := 0
  88. pageEnd := bulkSize
  89. ret := []string{}
  90. for {
  91. if pageEnd >= len(alertList) {
  92. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:])
  93. if err != nil {
  94. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  95. }
  96. ret = append(ret, results...)
  97. break
  98. }
  99. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:pageEnd])
  100. if err != nil {
  101. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  102. }
  103. ret = append(ret, results...)
  104. pageStart += bulkSize
  105. pageEnd += bulkSize
  106. }
  107. return ret, nil
  108. }
  109. /*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/
  110. func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
  111. var err error
  112. var deleted, inserted int
  113. if alertItem == nil {
  114. return 0, 0, 0, fmt.Errorf("nil alert")
  115. }
  116. if alertItem.StartAt == nil {
  117. return 0, 0, 0, fmt.Errorf("nil start_at")
  118. }
  119. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  120. if err != nil {
  121. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  122. }
  123. if alertItem.StopAt == nil {
  124. return 0, 0, 0, fmt.Errorf("nil stop_at")
  125. }
  126. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  127. if err != nil {
  128. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  129. }
  130. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  131. if err != nil {
  132. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  133. ts = time.Now()
  134. }
  135. alertB := c.Ent.Alert.
  136. Create().
  137. SetScenario(*alertItem.Scenario).
  138. SetMessage(*alertItem.Message).
  139. SetEventsCount(*alertItem.EventsCount).
  140. SetStartedAt(startAtTime).
  141. SetStoppedAt(stopAtTime).
  142. SetSourceScope(*alertItem.Source.Scope).
  143. SetSourceValue(*alertItem.Source.Value).
  144. SetSourceIp(alertItem.Source.IP).
  145. SetSourceRange(alertItem.Source.Range).
  146. SetSourceAsNumber(alertItem.Source.AsNumber).
  147. SetSourceAsName(alertItem.Source.AsName).
  148. SetSourceCountry(alertItem.Source.Cn).
  149. SetSourceLatitude(alertItem.Source.Latitude).
  150. SetSourceLongitude(alertItem.Source.Longitude).
  151. SetCapacity(*alertItem.Capacity).
  152. SetLeakSpeed(*alertItem.Leakspeed).
  153. SetSimulated(*alertItem.Simulated).
  154. SetScenarioVersion(*alertItem.ScenarioVersion).
  155. SetScenarioHash(*alertItem.ScenarioHash)
  156. alertRef, err := alertB.Save(c.CTX)
  157. if err != nil {
  158. return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
  159. }
  160. if len(alertItem.Decisions) > 0 {
  161. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  162. valueList := make([]string, 0, decisionBulkSize)
  163. for i, decisionItem := range alertItem.Decisions {
  164. var start_ip, start_sfx, end_ip, end_sfx int64
  165. var sz int
  166. if decisionItem.Duration == nil {
  167. log.Warningf("nil duration in community decision")
  168. continue
  169. }
  170. duration, err := time.ParseDuration(*decisionItem.Duration)
  171. if err != nil {
  172. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  173. }
  174. if decisionItem.Scope == nil {
  175. log.Warningf("nil scope in community decision")
  176. continue
  177. }
  178. /*if the scope is IP or Range, convert the value to integers */
  179. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  180. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  181. if err != nil {
  182. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  183. }
  184. }
  185. /*bulk insert some new decisions*/
  186. decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
  187. SetUntil(ts.Add(duration)).
  188. SetScenario(*decisionItem.Scenario).
  189. SetType(*decisionItem.Type).
  190. SetStartIP(start_ip).
  191. SetStartSuffix(start_sfx).
  192. SetEndIP(end_ip).
  193. SetEndSuffix(end_sfx).
  194. SetIPSize(int64(sz)).
  195. SetValue(*decisionItem.Value).
  196. SetScope(*decisionItem.Scope).
  197. SetOrigin(*decisionItem.Origin).
  198. SetSimulated(*alertItem.Simulated).
  199. SetOwner(alertRef))
  200. /*for bulk delete of duplicate decisions*/
  201. if decisionItem.Value == nil {
  202. log.Warningf("nil value in community decision")
  203. continue
  204. }
  205. valueList = append(valueList, *decisionItem.Value)
  206. if len(decisionBulk) == decisionBulkSize {
  207. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  208. if err != nil {
  209. return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
  210. }
  211. inserted += len(insertedDecisions)
  212. /*Deleting older decisions from capi*/
  213. deletedDecisions, err := c.Ent.Decision.Delete().
  214. Where(decision.And(
  215. decision.OriginEQ(CapiMachineID),
  216. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  217. decision.ValueIn(valueList...),
  218. )).Exec(c.CTX)
  219. if err != nil {
  220. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  221. }
  222. deleted += deletedDecisions
  223. if len(alertItem.Decisions)-i <= decisionBulkSize {
  224. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  225. valueList = make([]string, 0, (len(alertItem.Decisions) - i))
  226. } else {
  227. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  228. valueList = make([]string, 0, decisionBulkSize)
  229. }
  230. // The 90's called, they want their concurrency back.
  231. // This is needed for sqlite, which does not support concurrent access while writing.
  232. // If we pull a large number of IPs from CAPI, and we have a slow disk, LAPI won't respond until all IPs are inserted (which can take up to a few seconds).
  233. time.Sleep(100 * time.Millisecond)
  234. }
  235. }
  236. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  237. if err != nil {
  238. return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  239. }
  240. inserted += len(insertedDecisions)
  241. /*Deleting older decisions from capi*/
  242. if len(valueList) > 0 {
  243. deletedDecisions, err := c.Ent.Decision.Delete().
  244. Where(decision.And(
  245. decision.OriginEQ(CapiMachineID),
  246. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  247. decision.ValueIn(valueList...),
  248. )).Exec(c.CTX)
  249. if err != nil {
  250. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  251. }
  252. deleted += deletedDecisions
  253. }
  254. }
  255. return alertRef.ID, inserted, deleted, nil
  256. }
  257. func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
  258. var ret [][]*ent.Decision
  259. var chunk []*ent.Decision
  260. for _, d := range decisions {
  261. chunk = append(chunk, d)
  262. if len(chunk) == chunkSize {
  263. ret = append(ret, chunk)
  264. chunk = nil
  265. }
  266. }
  267. if len(chunk) > 0 {
  268. ret = append(ret, chunk)
  269. }
  270. return ret
  271. }
  272. func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
  273. ret := []string{}
  274. bulkSize := 20
  275. c.Log.Debugf("writting %d items", len(alertList))
  276. bulk := make([]*ent.AlertCreate, 0, bulkSize)
  277. alertDecisions := make([][]*ent.Decision, 0, bulkSize)
  278. for i, alertItem := range alertList {
  279. var decisions []*ent.Decision
  280. var metas []*ent.Meta
  281. var events []*ent.Event
  282. owner, err := c.QueryMachineByID(machineId)
  283. if err != nil {
  284. if errors.Cause(err) != UserNotExists {
  285. return []string{}, errors.Wrapf(QueryFail, "machine '%s': %s", alertItem.MachineID, err)
  286. }
  287. c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineId)
  288. owner = nil
  289. }
  290. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  291. if err != nil {
  292. return []string{}, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  293. }
  294. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  295. if err != nil {
  296. return []string{}, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  297. }
  298. /*display proper alert in logs*/
  299. for _, disp := range formatAlertAsString(machineId, alertItem) {
  300. c.Log.Info(disp)
  301. }
  302. //let's track when we strip or drop data, notify outside of loop to avoid spam
  303. stripped := false
  304. dropped := false
  305. if len(alertItem.Events) > 0 {
  306. eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
  307. for i, eventItem := range alertItem.Events {
  308. ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
  309. if err != nil {
  310. return []string{}, errors.Wrapf(ParseTimeFail, "event timestamp '%s' : %s", *eventItem.Timestamp, err)
  311. }
  312. marshallMetas, err := json.Marshal(eventItem.Meta)
  313. if err != nil {
  314. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  315. }
  316. //the serialized field is too big, let's try to progressively strip it
  317. if event.SerializedValidator(string(marshallMetas)) != nil {
  318. stripped = true
  319. valid := false
  320. stripSize := 2048
  321. for !valid && stripSize > 0 {
  322. for _, serializedItem := range eventItem.Meta {
  323. if len(serializedItem.Value) > stripSize*2 {
  324. serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
  325. }
  326. }
  327. marshallMetas, err = json.Marshal(eventItem.Meta)
  328. if err != nil {
  329. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  330. }
  331. if event.SerializedValidator(string(marshallMetas)) == nil {
  332. valid = true
  333. }
  334. stripSize /= 2
  335. }
  336. //nothing worked, drop it
  337. if !valid {
  338. dropped = true
  339. stripped = false
  340. marshallMetas = []byte("")
  341. }
  342. }
  343. eventBulk[i] = c.Ent.Event.Create().
  344. SetTime(ts).
  345. SetSerialized(string(marshallMetas))
  346. }
  347. if stripped {
  348. c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  349. }
  350. if dropped {
  351. c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  352. }
  353. events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(c.CTX)
  354. if err != nil {
  355. return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
  356. }
  357. }
  358. if len(alertItem.Meta) > 0 {
  359. metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
  360. for i, metaItem := range alertItem.Meta {
  361. metaBulk[i] = c.Ent.Meta.Create().
  362. SetKey(metaItem.Key).
  363. SetValue(metaItem.Value)
  364. }
  365. metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(c.CTX)
  366. if err != nil {
  367. return []string{}, errors.Wrapf(BulkError, "creating alert meta: %s", err)
  368. }
  369. }
  370. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  371. if err != nil {
  372. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  373. ts = time.Now()
  374. }
  375. decisions = make([]*ent.Decision, 0)
  376. if len(alertItem.Decisions) > 0 {
  377. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  378. for i, decisionItem := range alertItem.Decisions {
  379. var start_ip, start_sfx, end_ip, end_sfx int64
  380. var sz int
  381. duration, err := time.ParseDuration(*decisionItem.Duration)
  382. if err != nil {
  383. return []string{}, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  384. }
  385. /*if the scope is IP or Range, convert the value to integers */
  386. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  387. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  388. if err != nil {
  389. return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  390. }
  391. }
  392. decisionCreate := c.Ent.Decision.Create().
  393. SetUntil(ts.Add(duration)).
  394. SetScenario(*decisionItem.Scenario).
  395. SetType(*decisionItem.Type).
  396. SetStartIP(start_ip).
  397. SetStartSuffix(start_sfx).
  398. SetEndIP(end_ip).
  399. SetEndSuffix(end_sfx).
  400. SetIPSize(int64(sz)).
  401. SetValue(*decisionItem.Value).
  402. SetScope(*decisionItem.Scope).
  403. SetOrigin(*decisionItem.Origin).
  404. SetSimulated(*alertItem.Simulated)
  405. decisionBulk = append(decisionBulk, decisionCreate)
  406. if len(decisionBulk) == decisionBulkSize {
  407. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  408. if err != nil {
  409. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  410. }
  411. decisions = append(decisions, decisionsCreateRet...)
  412. if len(alertItem.Decisions)-i <= decisionBulkSize {
  413. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  414. } else {
  415. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  416. }
  417. }
  418. }
  419. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  420. if err != nil {
  421. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  422. }
  423. decisions = append(decisions, decisionsCreateRet...)
  424. }
  425. alertB := c.Ent.Alert.
  426. Create().
  427. SetScenario(*alertItem.Scenario).
  428. SetMessage(*alertItem.Message).
  429. SetEventsCount(*alertItem.EventsCount).
  430. SetStartedAt(startAtTime).
  431. SetStoppedAt(stopAtTime).
  432. SetSourceScope(*alertItem.Source.Scope).
  433. SetSourceValue(*alertItem.Source.Value).
  434. SetSourceIp(alertItem.Source.IP).
  435. SetSourceRange(alertItem.Source.Range).
  436. SetSourceAsNumber(alertItem.Source.AsNumber).
  437. SetSourceAsName(alertItem.Source.AsName).
  438. SetSourceCountry(alertItem.Source.Cn).
  439. SetSourceLatitude(alertItem.Source.Latitude).
  440. SetSourceLongitude(alertItem.Source.Longitude).
  441. SetCapacity(*alertItem.Capacity).
  442. SetLeakSpeed(*alertItem.Leakspeed).
  443. SetSimulated(*alertItem.Simulated).
  444. SetScenarioVersion(*alertItem.ScenarioVersion).
  445. SetScenarioHash(*alertItem.ScenarioHash).
  446. AddEvents(events...).
  447. AddMetas(metas...)
  448. if owner != nil {
  449. alertB.SetOwner(owner)
  450. }
  451. bulk = append(bulk, alertB)
  452. alertDecisions = append(alertDecisions, decisions)
  453. if len(bulk) == bulkSize {
  454. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  455. if err != nil {
  456. return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
  457. }
  458. for _, a := range alerts {
  459. ret = append(ret, strconv.Itoa(a.ID))
  460. for _, d := range alertDecisions {
  461. decisionsChunk := chunkDecisions(d, bulkSize)
  462. for _, d2 := range decisionsChunk {
  463. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  464. if err != nil {
  465. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  466. }
  467. }
  468. }
  469. }
  470. if len(alertList)-i <= bulkSize {
  471. bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
  472. alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
  473. } else {
  474. bulk = make([]*ent.AlertCreate, 0, bulkSize)
  475. alertDecisions = make([][]*ent.Decision, 0, bulkSize)
  476. }
  477. }
  478. }
  479. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  480. if err != nil {
  481. return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
  482. }
  483. for _, a := range alerts {
  484. ret = append(ret, strconv.Itoa(a.ID))
  485. for _, d := range alertDecisions {
  486. decisionsChunk := chunkDecisions(d, bulkSize)
  487. for _, d2 := range decisionsChunk {
  488. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  489. if err != nil {
  490. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  491. }
  492. }
  493. }
  494. }
  495. return ret, nil
  496. }
  497. func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]string) (*ent.AlertQuery, error) {
  498. var err error
  499. var start_ip, start_sfx, end_ip, end_sfx int64
  500. var hasActiveDecision bool
  501. var ip_sz int
  502. var contains bool = true
  503. /*if contains is true, return bans that *contains* the given value (value is the inner)
  504. else, return bans that are *contained* by the given value (value is the outer)*/
  505. /*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
  506. if v, ok := filter["simulated"]; ok {
  507. if v[0] == "false" {
  508. alerts = alerts.Where(alert.SimulatedEQ(false))
  509. }
  510. delete(filter, "simulated")
  511. }
  512. for param, value := range filter {
  513. switch param {
  514. case "contains":
  515. contains, err = strconv.ParseBool(value[0])
  516. if err != nil {
  517. return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
  518. }
  519. case "scope":
  520. var scope string = value[0]
  521. if strings.ToLower(scope) == "ip" {
  522. scope = types.Ip
  523. } else if strings.ToLower(scope) == "range" {
  524. scope = types.Range
  525. }
  526. alerts = alerts.Where(alert.SourceScopeEQ(scope))
  527. case "value":
  528. alerts = alerts.Where(alert.SourceValueEQ(value[0]))
  529. case "scenario":
  530. alerts = alerts.Where(alert.ScenarioEQ(value[0]))
  531. case "ip", "range":
  532. ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
  533. if err != nil {
  534. return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
  535. }
  536. case "since":
  537. duration, err := types.ParseDuration(value[0])
  538. if err != nil {
  539. return nil, errors.Wrap(err, "while parsing duration")
  540. }
  541. since := time.Now().Add(-duration)
  542. if since.IsZero() {
  543. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  544. }
  545. alerts = alerts.Where(alert.StartedAtGTE(since))
  546. case "created_before":
  547. duration, err := types.ParseDuration(value[0])
  548. if err != nil {
  549. return nil, errors.Wrap(err, "while parsing duration")
  550. }
  551. since := time.Now().Add(-duration)
  552. if since.IsZero() {
  553. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  554. }
  555. alerts = alerts.Where(alert.CreatedAtLTE(since))
  556. case "until":
  557. duration, err := types.ParseDuration(value[0])
  558. if err != nil {
  559. return nil, errors.Wrap(err, "while parsing duration")
  560. }
  561. until := time.Now().Add(-duration)
  562. if until.IsZero() {
  563. return nil, fmt.Errorf("Empty time now() - %s", until.String())
  564. }
  565. alerts = alerts.Where(alert.StartedAtLTE(until))
  566. case "decision_type":
  567. alerts = alerts.Where(alert.HasDecisionsWith(decision.TypeEQ(value[0])))
  568. case "include_capi": //allows to exclude one or more specific origins
  569. if value[0] == "false" {
  570. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginNEQ(CapiMachineID)))
  571. } else if value[0] != "true" {
  572. log.Errorf("Invalid bool '%s' for include_capi", value[0])
  573. }
  574. case "has_active_decision":
  575. if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
  576. return nil, errors.Wrapf(ParseType, "'%s' is not a boolean: %s", value[0], err)
  577. }
  578. if hasActiveDecision {
  579. alerts = alerts.Where(alert.HasDecisionsWith(decision.UntilGTE(time.Now())))
  580. } else {
  581. alerts = alerts.Where(alert.Not(alert.HasDecisions()))
  582. }
  583. case "limit":
  584. continue
  585. case "sort":
  586. continue
  587. default:
  588. return nil, errors.Wrapf(InvalidFilter, "Filter parameter '%s' is unknown (=%s)", param, value[0])
  589. }
  590. }
  591. if ip_sz == 4 {
  592. if contains { /*decision contains {start_ip,end_ip}*/
  593. alerts = alerts.Where(alert.And(
  594. alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
  595. alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
  596. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  597. ))
  598. } else { /*decision is contained within {start_ip,end_ip}*/
  599. alerts = alerts.Where(alert.And(
  600. alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
  601. alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
  602. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  603. ))
  604. }
  605. } else if ip_sz == 16 {
  606. if contains { /*decision contains {start_ip,end_ip}*/
  607. alerts = alerts.Where(alert.And(
  608. //matching addr size
  609. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  610. alert.Or(
  611. //decision.start_ip < query.start_ip
  612. alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
  613. alert.And(
  614. //decision.start_ip == query.start_ip
  615. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  616. //decision.start_suffix <= query.start_suffix
  617. alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
  618. )),
  619. alert.Or(
  620. //decision.end_ip > query.end_ip
  621. alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
  622. alert.And(
  623. //decision.end_ip == query.end_ip
  624. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  625. //decision.end_suffix >= query.end_suffix
  626. alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
  627. ),
  628. ),
  629. ))
  630. } else { /*decision is contained within {start_ip,end_ip}*/
  631. alerts = alerts.Where(alert.And(
  632. //matching addr size
  633. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  634. alert.Or(
  635. //decision.start_ip > query.start_ip
  636. alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
  637. alert.And(
  638. //decision.start_ip == query.start_ip
  639. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  640. //decision.start_suffix >= query.start_suffix
  641. alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
  642. )),
  643. alert.Or(
  644. //decision.end_ip < query.end_ip
  645. alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
  646. alert.And(
  647. //decision.end_ip == query.end_ip
  648. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  649. //decision.end_suffix <= query.end_suffix
  650. alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
  651. ),
  652. ),
  653. ))
  654. }
  655. } else if ip_sz != 0 {
  656. return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
  657. }
  658. return alerts, nil
  659. }
  660. func (c *Client) TotalAlerts() (int, error) {
  661. return c.Ent.Alert.Query().Count(c.CTX)
  662. }
  663. func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, error) {
  664. sort := "DESC" // we sort by desc by default
  665. if val, ok := filter["sort"]; ok {
  666. if val[0] != "ASC" && val[0] != "DESC" {
  667. c.Log.Errorf("invalid 'sort' parameter: %s", val)
  668. } else {
  669. sort = val[0]
  670. }
  671. }
  672. limit := defaultLimit
  673. if val, ok := filter["limit"]; ok {
  674. limitConv, err := strconv.Atoi(val[0])
  675. if err != nil {
  676. return []*ent.Alert{}, errors.Wrapf(QueryFail, "bad limit in parameters: %s", val)
  677. }
  678. limit = limitConv
  679. }
  680. offset := 0
  681. ret := make([]*ent.Alert, 0)
  682. for {
  683. alerts := c.Ent.Alert.Query()
  684. alerts, err := BuildAlertRequestFromFilter(alerts, filter)
  685. if err != nil {
  686. return []*ent.Alert{}, err
  687. }
  688. alerts = alerts.
  689. WithDecisions().
  690. WithEvents().
  691. WithMetas().
  692. WithOwner()
  693. if sort == "ASC" {
  694. alerts = alerts.Order(ent.Asc(alert.FieldCreatedAt))
  695. } else {
  696. alerts = alerts.Order(ent.Desc(alert.FieldCreatedAt))
  697. }
  698. if limit == 0 {
  699. limit, err = alerts.Count(c.CTX)
  700. if err != nil {
  701. return []*ent.Alert{}, fmt.Errorf("unable to count nb alerts: %s", err)
  702. }
  703. }
  704. result, err := alerts.Limit(paginationSize).Offset(offset).All(c.CTX)
  705. if err != nil {
  706. return []*ent.Alert{}, errors.Wrapf(QueryFail, "pagination size: %d, offset: %d: %s", paginationSize, offset, err)
  707. }
  708. if diff := limit - len(ret); diff < paginationSize {
  709. if len(result) < diff {
  710. ret = append(ret, result...)
  711. c.Log.Debugf("Pagination done, %d < %d", len(result), diff)
  712. break
  713. }
  714. ret = append(ret, result[0:diff]...)
  715. } else {
  716. ret = append(ret, result...)
  717. }
  718. if len(ret) == limit || len(ret) == 0 {
  719. c.Log.Debugf("Pagination done len(ret) = %d", len(ret))
  720. break
  721. }
  722. offset += paginationSize
  723. }
  724. return ret, nil
  725. }
  726. func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
  727. idList := make([]int, 0)
  728. for _, alert := range alertItems {
  729. idList = append(idList, int(alert.ID))
  730. }
  731. deleted, err := c.Ent.Alert.Delete().
  732. Where(alert.IDIn(idList...)).Exec(c.CTX)
  733. if err != nil {
  734. c.Log.Warningf("DeleteAlertGraph : %s", err)
  735. return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
  736. }
  737. c.Log.Debug("Done batch delete alerts")
  738. return deleted, nil
  739. }
  740. func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
  741. // delete the associated events
  742. _, err := c.Ent.Event.Delete().
  743. Where(event.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  744. if err != nil {
  745. c.Log.Warningf("DeleteAlertGraph : %s", err)
  746. return errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  747. }
  748. // delete the associated meta
  749. _, err = c.Ent.Meta.Delete().
  750. Where(meta.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  751. if err != nil {
  752. c.Log.Warningf("DeleteAlertGraph : %s", err)
  753. return errors.Wrapf(DeleteFail, "meta with alert ID '%d'", alertItem.ID)
  754. }
  755. // delete the associated decisions
  756. _, err = c.Ent.Decision.Delete().
  757. Where(decision.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  758. if err != nil {
  759. c.Log.Warningf("DeleteAlertGraph : %s", err)
  760. return errors.Wrapf(DeleteFail, "decision with alert ID '%d'", alertItem.ID)
  761. }
  762. // delete the alert
  763. err = c.Ent.Alert.DeleteOne(alertItem).Exec(c.CTX)
  764. if err != nil {
  765. c.Log.Warningf("DeleteAlertGraph : %s", err)
  766. return errors.Wrapf(DeleteFail, "alert with ID '%d'", alertItem.ID)
  767. }
  768. return nil
  769. }
  770. func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error) {
  771. var err error
  772. // Get all the alerts that match the filter
  773. alertsToDelete, err := c.QueryAlertWithFilter(filter)
  774. for _, alertItem := range alertsToDelete {
  775. err = c.DeleteAlertGraph(alertItem)
  776. if err != nil {
  777. c.Log.Warningf("DeleteAlertWithFilter : %s", err)
  778. return 0, errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  779. }
  780. }
  781. return len(alertsToDelete), nil
  782. }
  783. func (c *Client) FlushOrphans() {
  784. /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
  785. /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
  786. events_count, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
  787. if err != nil {
  788. c.Log.Warningf("error while deleting orphan events : %s", err)
  789. return
  790. }
  791. if events_count > 0 {
  792. c.Log.Infof("%d deleted orphan events", events_count)
  793. }
  794. events_count, err = c.Ent.Decision.Delete().Where(
  795. decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now())).Exec(c.CTX)
  796. if err != nil {
  797. c.Log.Warningf("error while deleting orphan decisions : %s", err)
  798. return
  799. }
  800. if events_count > 0 {
  801. c.Log.Infof("%d deleted orphan decisions", events_count)
  802. }
  803. }
  804. func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
  805. var deletedByAge int
  806. var deletedByNbItem int
  807. var totalAlerts int
  808. var err error
  809. if !c.CanFlush {
  810. c.Log.Debug("a list is being imported, flushing later")
  811. return nil
  812. }
  813. c.Log.Debug("Flushing orphan alerts")
  814. c.FlushOrphans()
  815. c.Log.Debug("Done flushing orphan alerts")
  816. totalAlerts, err = c.TotalAlerts()
  817. if err != nil {
  818. c.Log.Warningf("FlushAlerts (max items count) : %s", err)
  819. return errors.Wrap(err, "unable to get alerts count")
  820. }
  821. c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
  822. if MaxAge != "" {
  823. filter := map[string][]string{
  824. "created_before": {MaxAge},
  825. }
  826. nbDeleted, err := c.DeleteAlertWithFilter(filter)
  827. if err != nil {
  828. c.Log.Warningf("FlushAlerts (max age) : %s", err)
  829. return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
  830. }
  831. c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
  832. deletedByAge = nbDeleted
  833. }
  834. if MaxItems > 0 {
  835. //We get the highest id for the alerts
  836. //We substract MaxItems to avoid deleting alerts that are not old enough
  837. //This gives us the oldest alert that we want to keep
  838. //We then delete all the alerts with an id lower than this one
  839. //We can do this because the id is auto-increment, and the database won't reuse the same id twice
  840. lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
  841. "sort": {"DESC"},
  842. "limit": {"1"},
  843. })
  844. c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
  845. if err != nil {
  846. c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
  847. return errors.Wrap(err, "could not get last alert")
  848. }
  849. if len(lastAlert) != 0 {
  850. maxid := lastAlert[0].ID - MaxItems
  851. c.Log.Debugf("FlushAlerts (max id): %d", maxid)
  852. if maxid > 0 {
  853. //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
  854. deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
  855. if err != nil {
  856. c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
  857. return errors.Wrap(err, "could not delete alerts")
  858. }
  859. }
  860. }
  861. }
  862. if deletedByNbItem > 0 {
  863. c.Log.Infof("flushed %d/%d alerts because max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
  864. }
  865. if deletedByAge > 0 {
  866. c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
  867. }
  868. return nil
  869. }
  870. func (c *Client) GetAlertByID(alertID int) (*ent.Alert, error) {
  871. alert, err := c.Ent.Alert.Query().Where(alert.IDEQ(alertID)).WithDecisions().WithEvents().WithMetas().WithOwner().First(c.CTX)
  872. if err != nil {
  873. /*record not found, 404*/
  874. if ent.IsNotFound(err) {
  875. log.Warningf("GetAlertByID (not found): %s", err)
  876. return &ent.Alert{}, ItemNotFound
  877. }
  878. c.Log.Warningf("GetAlertByID : %s", err)
  879. return &ent.Alert{}, QueryFail
  880. }
  881. return alert, nil
  882. }