alerts.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964
  1. package database
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  9. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  10. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  11. "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
  12. "github.com/crowdsecurity/crowdsec/pkg/database/ent/meta"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/davecgh/go-spew/spew"
  16. "github.com/pkg/errors"
  17. log "github.com/sirupsen/logrus"
  18. )
  // Sizing knobs shared by the alert queries and bulk writers of this file.
  const (
  	// paginationSize is the page size used when querying alerts, to avoid
  	// the 'too many SQL variable' error.
  	paginationSize = 100

  	// defaultLimit is the number of alerts returned by a query that does not
  	// specify a limit.
  	defaultLimit = 100

  	// bulkSize is the number of alerts handed to one bulk creation call.
  	bulkSize = 50

  	// decisionBulkSize is the number of decisions inserted per bulk statement.
  	decisionBulkSize = 50
  )
  25. func formatAlertAsString(machineId string, alert *models.Alert) []string {
  26. var retStr []string
  27. /**/
  28. src := ""
  29. if alert.Source != nil {
  30. if *alert.Source.Scope == types.Ip {
  31. src = fmt.Sprintf("ip %s", *alert.Source.Value)
  32. if alert.Source.Cn != "" {
  33. src += " (" + alert.Source.Cn
  34. if alert.Source.AsNumber != "" {
  35. src += "/" + alert.Source.AsNumber
  36. }
  37. src += ")"
  38. }
  39. } else if *alert.Source.Scope == types.Range {
  40. src = fmt.Sprintf("range %s", *alert.Source.Value)
  41. if alert.Source.Cn != "" {
  42. src += " (" + alert.Source.Cn
  43. if alert.Source.AsNumber != "" {
  44. src += "/" + alert.Source.AsNumber
  45. }
  46. src += ")"
  47. }
  48. } else {
  49. src = fmt.Sprintf("%s %s", *alert.Source.Scope, *alert.Source.Value)
  50. }
  51. } else {
  52. src = "empty source"
  53. }
  54. /**/
  55. reason := ""
  56. if *alert.Scenario != "" {
  57. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  58. } else if *alert.Message != "" {
  59. reason = fmt.Sprintf("%s by %s", *alert.Scenario, src)
  60. } else {
  61. reason = fmt.Sprintf("empty scenario by %s", src)
  62. }
  63. if len(alert.Decisions) > 0 {
  64. for _, decisionItem := range alert.Decisions {
  65. decision := ""
  66. if alert.Simulated != nil && *alert.Simulated {
  67. decision = "(simulated alert)"
  68. } else if decisionItem.Simulated != nil && *decisionItem.Simulated {
  69. decision = "(simulated decision)"
  70. }
  71. if log.GetLevel() >= log.DebugLevel {
  72. /*spew is expensive*/
  73. log.Debugf("%s", spew.Sdump(decisionItem))
  74. }
  75. decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
  76. *decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
  77. retStr = append(retStr,
  78. fmt.Sprintf("(%s/%s) %s : %s", machineId,
  79. *decisionItem.Origin, reason, decision))
  80. }
  81. } else {
  82. retStr = append(retStr, fmt.Sprintf("(%s) alert : %s", machineId, reason))
  83. }
  84. return retStr
  85. }
  86. func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]string, error) {
  87. pageStart := 0
  88. pageEnd := bulkSize
  89. ret := []string{}
  90. for {
  91. if pageEnd >= len(alertList) {
  92. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:])
  93. if err != nil {
  94. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  95. }
  96. ret = append(ret, results...)
  97. break
  98. }
  99. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:pageEnd])
  100. if err != nil {
  101. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  102. }
  103. ret = append(ret, results...)
  104. pageStart += bulkSize
  105. pageEnd += bulkSize
  106. }
  107. return ret, nil
  108. }
  109. /*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/
  110. func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
  111. var err error
  112. var deleted, inserted int
  113. if alertItem == nil {
  114. return 0, 0, 0, fmt.Errorf("nil alert")
  115. }
  116. if alertItem.StartAt == nil {
  117. return 0, 0, 0, fmt.Errorf("nil start_at")
  118. }
  119. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  120. if err != nil {
  121. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  122. }
  123. if alertItem.StopAt == nil {
  124. return 0, 0, 0, fmt.Errorf("nil stop_at")
  125. }
  126. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  127. if err != nil {
  128. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  129. }
  130. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  131. if err != nil {
  132. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  133. ts = time.Now()
  134. }
  135. alertB := c.Ent.Alert.
  136. Create().
  137. SetScenario(*alertItem.Scenario).
  138. SetMessage(*alertItem.Message).
  139. SetEventsCount(*alertItem.EventsCount).
  140. SetStartedAt(startAtTime).
  141. SetStoppedAt(stopAtTime).
  142. SetSourceScope(*alertItem.Source.Scope).
  143. SetSourceValue(*alertItem.Source.Value).
  144. SetSourceIp(alertItem.Source.IP).
  145. SetSourceRange(alertItem.Source.Range).
  146. SetSourceAsNumber(alertItem.Source.AsNumber).
  147. SetSourceAsName(alertItem.Source.AsName).
  148. SetSourceCountry(alertItem.Source.Cn).
  149. SetSourceLatitude(alertItem.Source.Latitude).
  150. SetSourceLongitude(alertItem.Source.Longitude).
  151. SetCapacity(*alertItem.Capacity).
  152. SetLeakSpeed(*alertItem.Leakspeed).
  153. SetSimulated(*alertItem.Simulated).
  154. SetScenarioVersion(*alertItem.ScenarioVersion).
  155. SetScenarioHash(*alertItem.ScenarioHash)
  156. alertRef, err := alertB.Save(c.CTX)
  157. if err != nil {
  158. return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
  159. }
  160. if len(alertItem.Decisions) > 0 {
  161. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  162. valueList := make([]string, 0, decisionBulkSize)
  163. for i, decisionItem := range alertItem.Decisions {
  164. var start_ip, start_sfx, end_ip, end_sfx int64
  165. var sz int
  166. if decisionItem.Duration == nil {
  167. log.Warningf("nil duration in community decision")
  168. continue
  169. }
  170. duration, err := time.ParseDuration(*decisionItem.Duration)
  171. if err != nil {
  172. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  173. }
  174. if decisionItem.Scope == nil {
  175. log.Warningf("nil scope in community decision")
  176. continue
  177. }
  178. /*if the scope is IP or Range, convert the value to integers */
  179. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  180. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  181. if err != nil {
  182. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  183. }
  184. }
  185. /*bulk insert some new decisions*/
  186. decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
  187. SetUntil(ts.Add(duration)).
  188. SetScenario(*decisionItem.Scenario).
  189. SetType(*decisionItem.Type).
  190. SetStartIP(start_ip).
  191. SetStartSuffix(start_sfx).
  192. SetEndIP(end_ip).
  193. SetEndSuffix(end_sfx).
  194. SetIPSize(int64(sz)).
  195. SetValue(*decisionItem.Value).
  196. SetScope(*decisionItem.Scope).
  197. SetOrigin(*decisionItem.Origin).
  198. SetSimulated(*alertItem.Simulated).
  199. SetOwner(alertRef))
  200. /*for bulk delete of duplicate decisions*/
  201. if decisionItem.Value == nil {
  202. log.Warningf("nil value in community decision")
  203. continue
  204. }
  205. valueList = append(valueList, *decisionItem.Value)
  206. if len(decisionBulk) == decisionBulkSize {
  207. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  208. if err != nil {
  209. return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
  210. }
  211. inserted += len(insertedDecisions)
  212. /*Deleting older decisions from capi*/
  213. deletedDecisions, err := c.Ent.Decision.Delete().
  214. Where(decision.And(
  215. decision.OriginEQ(CapiMachineID),
  216. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  217. decision.ValueIn(valueList...),
  218. )).Exec(c.CTX)
  219. if err != nil {
  220. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  221. }
  222. deleted += deletedDecisions
  223. if len(alertItem.Decisions)-i <= decisionBulkSize {
  224. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  225. valueList = make([]string, 0, (len(alertItem.Decisions) - i))
  226. } else {
  227. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  228. valueList = make([]string, 0, decisionBulkSize)
  229. }
  230. // The 90's called, they want their concurrency back.
  231. // This is needed for sqlite, which does not support concurrent access while writing.
  232. // If we pull a large number of IPs from CAPI, and we have a slow disk, LAPI won't respond until all IPs are inserted (which can take up to a few seconds).
  233. time.Sleep(100 * time.Millisecond)
  234. }
  235. }
  236. insertedDecisions, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  237. if err != nil {
  238. return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  239. }
  240. inserted += len(insertedDecisions)
  241. /*Deleting older decisions from capi*/
  242. if len(valueList) > 0 {
  243. deletedDecisions, err := c.Ent.Decision.Delete().
  244. Where(decision.And(
  245. decision.OriginEQ(CapiMachineID),
  246. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  247. decision.ValueIn(valueList...),
  248. )).Exec(c.CTX)
  249. if err != nil {
  250. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  251. }
  252. deleted += deletedDecisions
  253. }
  254. }
  255. return alertRef.ID, inserted, deleted, nil
  256. }
  257. func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
  258. var ret [][]*ent.Decision
  259. var chunk []*ent.Decision
  260. for _, d := range decisions {
  261. chunk = append(chunk, d)
  262. if len(chunk) == chunkSize {
  263. ret = append(ret, chunk)
  264. chunk = nil
  265. }
  266. }
  267. if len(chunk) > 0 {
  268. ret = append(ret, chunk)
  269. }
  270. return ret
  271. }
  272. func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
  273. ret := []string{}
  274. bulkSize := 20
  275. c.Log.Debugf("writting %d items", len(alertList))
  276. bulk := make([]*ent.AlertCreate, 0, bulkSize)
  277. alertDecisions := make([][]*ent.Decision, 0, bulkSize)
  278. for i, alertItem := range alertList {
  279. var decisions []*ent.Decision
  280. var metas []*ent.Meta
  281. var events []*ent.Event
  282. owner, err := c.QueryMachineByID(machineId)
  283. if err != nil {
  284. if errors.Cause(err) != UserNotExists {
  285. return []string{}, errors.Wrapf(QueryFail, "machine '%s': %s", alertItem.MachineID, err)
  286. }
  287. c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineId)
  288. owner = nil
  289. }
  290. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  291. if err != nil {
  292. return []string{}, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  293. }
  294. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  295. if err != nil {
  296. return []string{}, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  297. }
  298. /*display proper alert in logs*/
  299. for _, disp := range formatAlertAsString(machineId, alertItem) {
  300. c.Log.Info(disp)
  301. }
  302. //let's track when we strip or drop data, notify outside of loop to avoid spam
  303. stripped := false
  304. dropped := false
  305. if len(alertItem.Events) > 0 {
  306. eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
  307. for i, eventItem := range alertItem.Events {
  308. ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
  309. if err != nil {
  310. return []string{}, errors.Wrapf(ParseTimeFail, "event timestamp '%s' : %s", *eventItem.Timestamp, err)
  311. }
  312. marshallMetas, err := json.Marshal(eventItem.Meta)
  313. if err != nil {
  314. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  315. }
  316. //the serialized field is too big, let's try to progressively strip it
  317. if event.SerializedValidator(string(marshallMetas)) != nil {
  318. stripped = true
  319. valid := false
  320. stripSize := 2048
  321. for !valid && stripSize > 0 {
  322. for _, serializedItem := range eventItem.Meta {
  323. if len(serializedItem.Value) > stripSize*2 {
  324. serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
  325. }
  326. }
  327. marshallMetas, err = json.Marshal(eventItem.Meta)
  328. if err != nil {
  329. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  330. }
  331. if event.SerializedValidator(string(marshallMetas)) == nil {
  332. valid = true
  333. }
  334. stripSize /= 2
  335. }
  336. //nothing worked, drop it
  337. if !valid {
  338. dropped = true
  339. stripped = false
  340. marshallMetas = []byte("")
  341. }
  342. }
  343. eventBulk[i] = c.Ent.Event.Create().
  344. SetTime(ts).
  345. SetSerialized(string(marshallMetas))
  346. }
  347. if stripped {
  348. c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  349. }
  350. if dropped {
  351. c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  352. }
  353. events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(c.CTX)
  354. if err != nil {
  355. return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
  356. }
  357. }
  358. if len(alertItem.Meta) > 0 {
  359. metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
  360. for i, metaItem := range alertItem.Meta {
  361. metaBulk[i] = c.Ent.Meta.Create().
  362. SetKey(metaItem.Key).
  363. SetValue(metaItem.Value)
  364. }
  365. metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(c.CTX)
  366. if err != nil {
  367. return []string{}, errors.Wrapf(BulkError, "creating alert meta: %s", err)
  368. }
  369. }
  370. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  371. if err != nil {
  372. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  373. ts = time.Now()
  374. }
  375. decisions = make([]*ent.Decision, 0)
  376. if len(alertItem.Decisions) > 0 {
  377. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  378. for i, decisionItem := range alertItem.Decisions {
  379. var start_ip, start_sfx, end_ip, end_sfx int64
  380. var sz int
  381. duration, err := time.ParseDuration(*decisionItem.Duration)
  382. if err != nil {
  383. return []string{}, errors.Wrapf(ParseDurationFail, "decision duration '%v' : %s", decisionItem.Duration, err)
  384. }
  385. /*if the scope is IP or Range, convert the value to integers */
  386. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  387. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  388. if err != nil {
  389. return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  390. }
  391. }
  392. decisionCreate := c.Ent.Decision.Create().
  393. SetUntil(ts.Add(duration)).
  394. SetScenario(*decisionItem.Scenario).
  395. SetType(*decisionItem.Type).
  396. SetStartIP(start_ip).
  397. SetStartSuffix(start_sfx).
  398. SetEndIP(end_ip).
  399. SetEndSuffix(end_sfx).
  400. SetIPSize(int64(sz)).
  401. SetValue(*decisionItem.Value).
  402. SetScope(*decisionItem.Scope).
  403. SetOrigin(*decisionItem.Origin).
  404. SetSimulated(*alertItem.Simulated)
  405. decisionBulk = append(decisionBulk, decisionCreate)
  406. if len(decisionBulk) == decisionBulkSize {
  407. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  408. if err != nil {
  409. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  410. }
  411. decisions = append(decisions, decisionsCreateRet...)
  412. if len(alertItem.Decisions)-i <= decisionBulkSize {
  413. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  414. } else {
  415. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  416. }
  417. }
  418. }
  419. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  420. if err != nil {
  421. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  422. }
  423. decisions = append(decisions, decisionsCreateRet...)
  424. }
  425. alertB := c.Ent.Alert.
  426. Create().
  427. SetScenario(*alertItem.Scenario).
  428. SetMessage(*alertItem.Message).
  429. SetEventsCount(*alertItem.EventsCount).
  430. SetStartedAt(startAtTime).
  431. SetStoppedAt(stopAtTime).
  432. SetSourceScope(*alertItem.Source.Scope).
  433. SetSourceValue(*alertItem.Source.Value).
  434. SetSourceIp(alertItem.Source.IP).
  435. SetSourceRange(alertItem.Source.Range).
  436. SetSourceAsNumber(alertItem.Source.AsNumber).
  437. SetSourceAsName(alertItem.Source.AsName).
  438. SetSourceCountry(alertItem.Source.Cn).
  439. SetSourceLatitude(alertItem.Source.Latitude).
  440. SetSourceLongitude(alertItem.Source.Longitude).
  441. SetCapacity(*alertItem.Capacity).
  442. SetLeakSpeed(*alertItem.Leakspeed).
  443. SetSimulated(*alertItem.Simulated).
  444. SetScenarioVersion(*alertItem.ScenarioVersion).
  445. SetScenarioHash(*alertItem.ScenarioHash).
  446. AddEvents(events...).
  447. AddMetas(metas...)
  448. if owner != nil {
  449. alertB.SetOwner(owner)
  450. }
  451. bulk = append(bulk, alertB)
  452. alertDecisions = append(alertDecisions, decisions)
  453. if len(bulk) == bulkSize {
  454. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  455. if err != nil {
  456. return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
  457. }
  458. for alertIndex, a := range alerts {
  459. ret = append(ret, strconv.Itoa(a.ID))
  460. d := alertDecisions[alertIndex]
  461. decisionsChunk := chunkDecisions(d, bulkSize)
  462. for _, d2 := range decisionsChunk {
  463. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  464. if err != nil {
  465. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  466. }
  467. }
  468. }
  469. if len(alertList)-i <= bulkSize {
  470. bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
  471. alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
  472. } else {
  473. bulk = make([]*ent.AlertCreate, 0, bulkSize)
  474. alertDecisions = make([][]*ent.Decision, 0, bulkSize)
  475. }
  476. }
  477. }
  478. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  479. if err != nil {
  480. return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
  481. }
  482. for alertIndex, a := range alerts {
  483. ret = append(ret, strconv.Itoa(a.ID))
  484. d := alertDecisions[alertIndex]
  485. decisionsChunk := chunkDecisions(d, bulkSize)
  486. for _, d2 := range decisionsChunk {
  487. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  488. if err != nil {
  489. return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
  490. }
  491. }
  492. }
  493. return ret, nil
  494. }
  495. func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]string) (*ent.AlertQuery, error) {
  496. var err error
  497. var start_ip, start_sfx, end_ip, end_sfx int64
  498. var hasActiveDecision bool
  499. var ip_sz int
  500. var contains bool = true
  501. /*if contains is true, return bans that *contains* the given value (value is the inner)
  502. else, return bans that are *contained* by the given value (value is the outer)*/
  503. /*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
  504. if v, ok := filter["simulated"]; ok {
  505. if v[0] == "false" {
  506. alerts = alerts.Where(alert.SimulatedEQ(false))
  507. }
  508. delete(filter, "simulated")
  509. }
  510. for param, value := range filter {
  511. switch param {
  512. case "contains":
  513. contains, err = strconv.ParseBool(value[0])
  514. if err != nil {
  515. return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
  516. }
  517. case "scope":
  518. var scope string = value[0]
  519. if strings.ToLower(scope) == "ip" {
  520. scope = types.Ip
  521. } else if strings.ToLower(scope) == "range" {
  522. scope = types.Range
  523. }
  524. alerts = alerts.Where(alert.SourceScopeEQ(scope))
  525. case "value":
  526. alerts = alerts.Where(alert.SourceValueEQ(value[0]))
  527. case "scenario":
  528. alerts = alerts.Where(alert.ScenarioEQ(value[0]))
  529. case "ip", "range":
  530. ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
  531. if err != nil {
  532. return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
  533. }
  534. case "since":
  535. duration, err := types.ParseDuration(value[0])
  536. if err != nil {
  537. return nil, errors.Wrap(err, "while parsing duration")
  538. }
  539. since := time.Now().Add(-duration)
  540. if since.IsZero() {
  541. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  542. }
  543. alerts = alerts.Where(alert.StartedAtGTE(since))
  544. case "created_before":
  545. duration, err := types.ParseDuration(value[0])
  546. if err != nil {
  547. return nil, errors.Wrap(err, "while parsing duration")
  548. }
  549. since := time.Now().Add(-duration)
  550. if since.IsZero() {
  551. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  552. }
  553. alerts = alerts.Where(alert.CreatedAtLTE(since))
  554. case "until":
  555. duration, err := types.ParseDuration(value[0])
  556. if err != nil {
  557. return nil, errors.Wrap(err, "while parsing duration")
  558. }
  559. until := time.Now().Add(-duration)
  560. if until.IsZero() {
  561. return nil, fmt.Errorf("Empty time now() - %s", until.String())
  562. }
  563. alerts = alerts.Where(alert.StartedAtLTE(until))
  564. case "decision_type":
  565. alerts = alerts.Where(alert.HasDecisionsWith(decision.TypeEQ(value[0])))
  566. case "include_capi": //allows to exclude one or more specific origins
  567. if value[0] == "false" {
  568. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginNEQ(CapiMachineID)))
  569. } else if value[0] != "true" {
  570. log.Errorf("Invalid bool '%s' for include_capi", value[0])
  571. }
  572. case "has_active_decision":
  573. if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
  574. return nil, errors.Wrapf(ParseType, "'%s' is not a boolean: %s", value[0], err)
  575. }
  576. if hasActiveDecision {
  577. alerts = alerts.Where(alert.HasDecisionsWith(decision.UntilGTE(time.Now())))
  578. } else {
  579. alerts = alerts.Where(alert.Not(alert.HasDecisions()))
  580. }
  581. case "limit":
  582. continue
  583. case "sort":
  584. continue
  585. default:
  586. return nil, errors.Wrapf(InvalidFilter, "Filter parameter '%s' is unknown (=%s)", param, value[0])
  587. }
  588. }
  589. if ip_sz == 4 {
  590. if contains { /*decision contains {start_ip,end_ip}*/
  591. alerts = alerts.Where(alert.And(
  592. alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
  593. alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
  594. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  595. ))
  596. } else { /*decision is contained within {start_ip,end_ip}*/
  597. alerts = alerts.Where(alert.And(
  598. alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
  599. alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
  600. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  601. ))
  602. }
  603. } else if ip_sz == 16 {
  604. if contains { /*decision contains {start_ip,end_ip}*/
  605. alerts = alerts.Where(alert.And(
  606. //matching addr size
  607. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  608. alert.Or(
  609. //decision.start_ip < query.start_ip
  610. alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
  611. alert.And(
  612. //decision.start_ip == query.start_ip
  613. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  614. //decision.start_suffix <= query.start_suffix
  615. alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
  616. )),
  617. alert.Or(
  618. //decision.end_ip > query.end_ip
  619. alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
  620. alert.And(
  621. //decision.end_ip == query.end_ip
  622. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  623. //decision.end_suffix >= query.end_suffix
  624. alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
  625. ),
  626. ),
  627. ))
  628. } else { /*decision is contained within {start_ip,end_ip}*/
  629. alerts = alerts.Where(alert.And(
  630. //matching addr size
  631. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  632. alert.Or(
  633. //decision.start_ip > query.start_ip
  634. alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
  635. alert.And(
  636. //decision.start_ip == query.start_ip
  637. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  638. //decision.start_suffix >= query.start_suffix
  639. alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
  640. )),
  641. alert.Or(
  642. //decision.end_ip < query.end_ip
  643. alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
  644. alert.And(
  645. //decision.end_ip == query.end_ip
  646. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  647. //decision.end_suffix <= query.end_suffix
  648. alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
  649. ),
  650. ),
  651. ))
  652. }
  653. } else if ip_sz != 0 {
  654. return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
  655. }
  656. return alerts, nil
  657. }
  658. func (c *Client) TotalAlerts() (int, error) {
  659. return c.Ent.Alert.Query().Count(c.CTX)
  660. }
  661. func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, error) {
  662. sort := "DESC" // we sort by desc by default
  663. if val, ok := filter["sort"]; ok {
  664. if val[0] != "ASC" && val[0] != "DESC" {
  665. c.Log.Errorf("invalid 'sort' parameter: %s", val)
  666. } else {
  667. sort = val[0]
  668. }
  669. }
  670. limit := defaultLimit
  671. if val, ok := filter["limit"]; ok {
  672. limitConv, err := strconv.Atoi(val[0])
  673. if err != nil {
  674. return []*ent.Alert{}, errors.Wrapf(QueryFail, "bad limit in parameters: %s", val)
  675. }
  676. limit = limitConv
  677. }
  678. offset := 0
  679. ret := make([]*ent.Alert, 0)
  680. for {
  681. alerts := c.Ent.Alert.Query()
  682. alerts, err := BuildAlertRequestFromFilter(alerts, filter)
  683. if err != nil {
  684. return []*ent.Alert{}, err
  685. }
  686. alerts = alerts.
  687. WithDecisions().
  688. WithEvents().
  689. WithMetas().
  690. WithOwner()
  691. if sort == "ASC" {
  692. alerts = alerts.Order(ent.Asc(alert.FieldCreatedAt))
  693. } else {
  694. alerts = alerts.Order(ent.Desc(alert.FieldCreatedAt))
  695. }
  696. if limit == 0 {
  697. limit, err = alerts.Count(c.CTX)
  698. if err != nil {
  699. return []*ent.Alert{}, fmt.Errorf("unable to count nb alerts: %s", err)
  700. }
  701. }
  702. result, err := alerts.Limit(paginationSize).Offset(offset).All(c.CTX)
  703. if err != nil {
  704. return []*ent.Alert{}, errors.Wrapf(QueryFail, "pagination size: %d, offset: %d: %s", paginationSize, offset, err)
  705. }
  706. if diff := limit - len(ret); diff < paginationSize {
  707. if len(result) < diff {
  708. ret = append(ret, result...)
  709. c.Log.Debugf("Pagination done, %d < %d", len(result), diff)
  710. break
  711. }
  712. ret = append(ret, result[0:diff]...)
  713. } else {
  714. ret = append(ret, result...)
  715. }
  716. if len(ret) == limit || len(ret) == 0 {
  717. c.Log.Debugf("Pagination done len(ret) = %d", len(ret))
  718. break
  719. }
  720. offset += paginationSize
  721. }
  722. return ret, nil
  723. }
  724. func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
  725. idList := make([]int, 0)
  726. for _, alert := range alertItems {
  727. idList = append(idList, int(alert.ID))
  728. }
  729. deleted, err := c.Ent.Alert.Delete().
  730. Where(alert.IDIn(idList...)).Exec(c.CTX)
  731. if err != nil {
  732. c.Log.Warningf("DeleteAlertGraph : %s", err)
  733. return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
  734. }
  735. c.Log.Debug("Done batch delete alerts")
  736. return deleted, nil
  737. }
  738. func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
  739. // delete the associated events
  740. _, err := c.Ent.Event.Delete().
  741. Where(event.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  742. if err != nil {
  743. c.Log.Warningf("DeleteAlertGraph : %s", err)
  744. return errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  745. }
  746. // delete the associated meta
  747. _, err = c.Ent.Meta.Delete().
  748. Where(meta.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  749. if err != nil {
  750. c.Log.Warningf("DeleteAlertGraph : %s", err)
  751. return errors.Wrapf(DeleteFail, "meta with alert ID '%d'", alertItem.ID)
  752. }
  753. // delete the associated decisions
  754. _, err = c.Ent.Decision.Delete().
  755. Where(decision.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  756. if err != nil {
  757. c.Log.Warningf("DeleteAlertGraph : %s", err)
  758. return errors.Wrapf(DeleteFail, "decision with alert ID '%d'", alertItem.ID)
  759. }
  760. // delete the alert
  761. err = c.Ent.Alert.DeleteOne(alertItem).Exec(c.CTX)
  762. if err != nil {
  763. c.Log.Warningf("DeleteAlertGraph : %s", err)
  764. return errors.Wrapf(DeleteFail, "alert with ID '%d'", alertItem.ID)
  765. }
  766. return nil
  767. }
  768. func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error) {
  769. var err error
  770. // Get all the alerts that match the filter
  771. alertsToDelete, err := c.QueryAlertWithFilter(filter)
  772. for _, alertItem := range alertsToDelete {
  773. err = c.DeleteAlertGraph(alertItem)
  774. if err != nil {
  775. c.Log.Warningf("DeleteAlertWithFilter : %s", err)
  776. return 0, errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  777. }
  778. }
  779. return len(alertsToDelete), nil
  780. }
  781. func (c *Client) FlushOrphans() {
  782. /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
  783. /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
  784. events_count, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
  785. if err != nil {
  786. c.Log.Warningf("error while deleting orphan events : %s", err)
  787. return
  788. }
  789. if events_count > 0 {
  790. c.Log.Infof("%d deleted orphan events", events_count)
  791. }
  792. events_count, err = c.Ent.Decision.Delete().Where(
  793. decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now())).Exec(c.CTX)
  794. if err != nil {
  795. c.Log.Warningf("error while deleting orphan decisions : %s", err)
  796. return
  797. }
  798. if events_count > 0 {
  799. c.Log.Infof("%d deleted orphan decisions", events_count)
  800. }
  801. }
  802. func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
  803. var deletedByAge int
  804. var deletedByNbItem int
  805. var totalAlerts int
  806. var err error
  807. if !c.CanFlush {
  808. c.Log.Debug("a list is being imported, flushing later")
  809. return nil
  810. }
  811. c.Log.Debug("Flushing orphan alerts")
  812. c.FlushOrphans()
  813. c.Log.Debug("Done flushing orphan alerts")
  814. totalAlerts, err = c.TotalAlerts()
  815. if err != nil {
  816. c.Log.Warningf("FlushAlerts (max items count) : %s", err)
  817. return errors.Wrap(err, "unable to get alerts count")
  818. }
  819. c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
  820. if MaxAge != "" {
  821. filter := map[string][]string{
  822. "created_before": {MaxAge},
  823. }
  824. nbDeleted, err := c.DeleteAlertWithFilter(filter)
  825. if err != nil {
  826. c.Log.Warningf("FlushAlerts (max age) : %s", err)
  827. return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
  828. }
  829. c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
  830. deletedByAge = nbDeleted
  831. }
  832. if MaxItems > 0 {
  833. //We get the highest id for the alerts
  834. //We substract MaxItems to avoid deleting alerts that are not old enough
  835. //This gives us the oldest alert that we want to keep
  836. //We then delete all the alerts with an id lower than this one
  837. //We can do this because the id is auto-increment, and the database won't reuse the same id twice
  838. lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
  839. "sort": {"DESC"},
  840. "limit": {"1"},
  841. })
  842. c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
  843. if err != nil {
  844. c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
  845. return errors.Wrap(err, "could not get last alert")
  846. }
  847. if len(lastAlert) != 0 {
  848. maxid := lastAlert[0].ID - MaxItems
  849. c.Log.Debugf("FlushAlerts (max id): %d", maxid)
  850. if maxid > 0 {
  851. //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
  852. deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
  853. if err != nil {
  854. c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
  855. return errors.Wrap(err, "could not delete alerts")
  856. }
  857. }
  858. }
  859. }
  860. if deletedByNbItem > 0 {
  861. c.Log.Infof("flushed %d/%d alerts because max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
  862. }
  863. if deletedByAge > 0 {
  864. c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
  865. }
  866. return nil
  867. }
  868. func (c *Client) GetAlertByID(alertID int) (*ent.Alert, error) {
  869. alert, err := c.Ent.Alert.Query().Where(alert.IDEQ(alertID)).WithDecisions().WithEvents().WithMetas().WithOwner().First(c.CTX)
  870. if err != nil {
  871. /*record not found, 404*/
  872. if ent.IsNotFound(err) {
  873. log.Warningf("GetAlertByID (not found): %s", err)
  874. return &ent.Alert{}, ItemNotFound
  875. }
  876. c.Log.Warningf("GetAlertByID : %s", err)
  877. return &ent.Alert{}, QueryFail
  878. }
  879. return alert, nil
  880. }