alerts.go 42 KB


  1. package database
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  11. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  12. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  13. "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
  14. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  15. "github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
  16. "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
  17. "github.com/crowdsecurity/crowdsec/pkg/database/ent/meta"
  18. "github.com/crowdsecurity/crowdsec/pkg/database/ent/predicate"
  19. "github.com/crowdsecurity/crowdsec/pkg/models"
  20. "github.com/crowdsecurity/crowdsec/pkg/types"
  21. "github.com/davecgh/go-spew/spew"
  22. "github.com/pkg/errors"
  23. log "github.com/sirupsen/logrus"
  24. )
  // Sizing knobs for alert queries and bulk inserts.
  const (
  	// paginationSize is used by queryAlert to avoid 'too many SQL variable' errors.
  	paginationSize = 100
  	// defaultLimit is the default number of elements returned when querying alerts.
  	defaultLimit = 100
  	// bulkSize is the bulk size used when creating alerts.
  	bulkSize = 50
  	// decisionBulkSize is the batch size used when bulk-creating decisions.
  	decisionBulkSize = 50
  )
  31. func formatAlertCN(source models.Source) string {
  32. cn := source.Cn
  33. if source.AsNumber != "" {
  34. cn += "/" + source.AsNumber
  35. }
  36. return cn
  37. }
  38. func formatAlertSource(alert *models.Alert) string {
  39. if alert.Source == nil {
  40. return "empty source"
  41. }
  42. if *alert.Source.Scope == types.Ip {
  43. ret := "ip " + *alert.Source.Value
  44. cn := formatAlertCN(*alert.Source)
  45. if cn != "" {
  46. ret += " (" + cn + ")"
  47. }
  48. return ret
  49. }
  50. if *alert.Source.Scope == types.Range {
  51. ret := "range " + *alert.Source.Value
  52. cn := formatAlertCN(*alert.Source)
  53. if cn != "" {
  54. ret += " (" + cn + ")"
  55. }
  56. return ret
  57. }
  58. return *alert.Source.Scope + " " + *alert.Source.Value
  59. }
  60. func formatAlertAsString(machineId string, alert *models.Alert) []string {
  61. src := formatAlertSource(alert)
  62. /**/
  63. msg := ""
  64. if alert.Scenario != nil && *alert.Scenario != "" {
  65. msg = *alert.Scenario
  66. } else if alert.Message != nil && *alert.Message != "" {
  67. msg = *alert.Message
  68. } else {
  69. msg = "empty scenario"
  70. }
  71. reason := fmt.Sprintf("%s by %s", msg, src)
  72. if len(alert.Decisions) == 0 {
  73. return []string{fmt.Sprintf("(%s) alert : %s", machineId, reason)}
  74. }
  75. var retStr []string
  76. for i, decisionItem := range alert.Decisions {
  77. decision := ""
  78. if alert.Simulated != nil && *alert.Simulated {
  79. decision = "(simulated alert)"
  80. } else if decisionItem.Simulated != nil && *decisionItem.Simulated {
  81. decision = "(simulated decision)"
  82. }
  83. if log.GetLevel() >= log.DebugLevel {
  84. /*spew is expensive*/
  85. log.Debugf("%s", spew.Sdump(decisionItem))
  86. }
  87. if len(alert.Decisions) > 1 {
  88. reason = fmt.Sprintf("%s for %d/%d decisions", msg, i+1, len(alert.Decisions))
  89. }
  90. machineIdOrigin := ""
  91. if machineId == "" {
  92. machineIdOrigin = *decisionItem.Origin
  93. } else {
  94. machineIdOrigin = fmt.Sprintf("%s/%s", machineId, *decisionItem.Origin)
  95. }
  96. decision += fmt.Sprintf("%s %s on %s %s", *decisionItem.Duration,
  97. *decisionItem.Type, *decisionItem.Scope, *decisionItem.Value)
  98. retStr = append(retStr,
  99. fmt.Sprintf("(%s) %s : %s", machineIdOrigin, reason, decision))
  100. }
  101. return retStr
  102. }
  103. // CreateOrUpdateAlert is specific to PAPI : It checks if alert already exists, otherwise inserts it
  104. // if alert already exists, it checks it associated decisions already exists
  105. // if some associated decisions are missing (ie. previous insert ended up in error) it inserts them
  106. func (c *Client) CreateOrUpdateAlert(machineID string, alertItem *models.Alert) (string, error) {
  107. if alertItem.UUID == "" {
  108. return "", fmt.Errorf("alert UUID is empty")
  109. }
  110. alerts, err := c.Ent.Alert.Query().Where(alert.UUID(alertItem.UUID)).WithDecisions().All(c.CTX)
  111. if err != nil && !ent.IsNotFound(err) {
  112. return "", errors.Wrap(err, "unable to query alerts for uuid %s")
  113. }
  114. //alert wasn't found, insert it (expected hotpath)
  115. if ent.IsNotFound(err) || len(alerts) == 0 {
  116. ret, err := c.CreateAlert(machineID, []*models.Alert{alertItem})
  117. if err != nil {
  118. return "", errors.Wrap(err, "unable to create alert")
  119. }
  120. return ret[0], nil
  121. }
  122. //this should never happen
  123. if len(alerts) > 1 {
  124. return "", fmt.Errorf("multiple alerts found for uuid %s", alertItem.UUID)
  125. }
  126. log.Infof("Alert %s already exists, checking associated decisions", alertItem.UUID)
  127. //alert is found, check for any missing decisions
  128. missingUuids := []string{}
  129. newUuids := []string{}
  130. for _, decItem := range alertItem.Decisions {
  131. newUuids = append(newUuids, decItem.UUID)
  132. }
  133. foundAlert := alerts[0]
  134. foundUuids := []string{}
  135. for _, decItem := range foundAlert.Edges.Decisions {
  136. foundUuids = append(foundUuids, decItem.UUID)
  137. }
  138. sort.Strings(foundUuids)
  139. sort.Strings(newUuids)
  140. for idx, uuid := range newUuids {
  141. if len(foundUuids) < idx+1 || uuid != foundUuids[idx] {
  142. log.Warningf("Decision with uuid %s not found in alert %s", uuid, foundAlert.UUID)
  143. missingUuids = append(missingUuids, uuid)
  144. }
  145. }
  146. if len(missingUuids) == 0 {
  147. log.Warningf("alert %s was already complete with decisions %+v", alertItem.UUID, foundUuids)
  148. return "", nil
  149. }
  150. //add any and all missing decisions based on their uuids
  151. //prepare missing decisions
  152. missingDecisions := []*models.Decision{}
  153. for _, uuid := range missingUuids {
  154. for _, newDecision := range alertItem.Decisions {
  155. if newDecision.UUID == uuid {
  156. missingDecisions = append(missingDecisions, newDecision)
  157. }
  158. }
  159. }
  160. //add missing decisions
  161. log.Debugf("Adding %d missing decisions to alert %s", len(missingDecisions), foundAlert.UUID)
  162. decisions := make([]*ent.Decision, 0)
  163. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  164. for i, decisionItem := range missingDecisions {
  165. var start_ip, start_sfx, end_ip, end_sfx int64
  166. var sz int
  167. /*if the scope is IP or Range, convert the value to integers */
  168. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  169. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  170. if err != nil {
  171. return "", errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  172. }
  173. }
  174. decisionDuration, err := time.ParseDuration(*decisionItem.Duration)
  175. if err != nil {
  176. log.Warningf("invalid duration %s for decision %s", *decisionItem.Duration, decisionItem.UUID)
  177. continue
  178. }
  179. //use the created_at from the alert instead
  180. alertTime, err := time.Parse(time.RFC3339, alertItem.CreatedAt)
  181. if err != nil {
  182. log.Errorf("unable to parse alert time %s : %s", alertItem.CreatedAt, err)
  183. alertTime = time.Now()
  184. }
  185. decisionUntil := alertTime.UTC().Add(decisionDuration)
  186. decisionCreate := c.Ent.Decision.Create().
  187. SetUntil(decisionUntil).
  188. SetScenario(*decisionItem.Scenario).
  189. SetType(*decisionItem.Type).
  190. SetStartIP(start_ip).
  191. SetStartSuffix(start_sfx).
  192. SetEndIP(end_ip).
  193. SetEndSuffix(end_sfx).
  194. SetIPSize(int64(sz)).
  195. SetValue(*decisionItem.Value).
  196. SetScope(*decisionItem.Scope).
  197. SetOrigin(*decisionItem.Origin).
  198. SetSimulated(*alertItem.Simulated).
  199. SetUUID(decisionItem.UUID)
  200. decisionBulk = append(decisionBulk, decisionCreate)
  201. if len(decisionBulk) == decisionBulkSize {
  202. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  203. if err != nil {
  204. return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  205. }
  206. decisions = append(decisions, decisionsCreateRet...)
  207. if len(missingDecisions)-i <= decisionBulkSize {
  208. decisionBulk = make([]*ent.DecisionCreate, 0, (len(missingDecisions) - i))
  209. } else {
  210. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  211. }
  212. }
  213. }
  214. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  215. if err != nil {
  216. return "", errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  217. }
  218. decisions = append(decisions, decisionsCreateRet...)
  219. //now that we bulk created missing decisions, let's update the alert
  220. err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisions...).Exec(c.CTX)
  221. if err != nil {
  222. return "", errors.Wrapf(err, "updating alert %s : %s", alertItem.UUID, err)
  223. }
  224. return "", nil
  225. }
  226. func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]string, error) {
  227. pageStart := 0
  228. pageEnd := bulkSize
  229. ret := []string{}
  230. for {
  231. if pageEnd >= len(alertList) {
  232. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:])
  233. if err != nil {
  234. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  235. }
  236. ret = append(ret, results...)
  237. break
  238. }
  239. results, err := c.CreateAlertBulk(machineID, alertList[pageStart:pageEnd])
  240. if err != nil {
  241. return []string{}, fmt.Errorf("unable to create alerts: %s", err)
  242. }
  243. ret = append(ret, results...)
  244. pageStart += bulkSize
  245. pageEnd += bulkSize
  246. }
  247. return ret, nil
  248. }
  249. // UpdateCommunityBlocklist is called to update either the community blocklist (or other lists the user subscribed to)
  250. // it takes care of creating the new alert with the associated decisions, and it will as well deleted the "older" overlapping decisions:
  251. // 1st pull, you get decisions [1,2,3]. it inserts [1,2,3]
  252. // 2nd pull, you get decisions [1,2,3,4]. it inserts [1,2,3,4] and will try to delete [1,2,3,4] with a different alert ID and same origin
  253. func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
  254. var err error
  255. if alertItem == nil {
  256. return 0, 0, 0, fmt.Errorf("nil alert")
  257. }
  258. if alertItem.StartAt == nil {
  259. return 0, 0, 0, fmt.Errorf("nil start_at")
  260. }
  261. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  262. if err != nil {
  263. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "start_at field time '%s': %s", *alertItem.StartAt, err)
  264. }
  265. if alertItem.StopAt == nil {
  266. return 0, 0, 0, fmt.Errorf("nil stop_at")
  267. }
  268. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  269. if err != nil {
  270. return 0, 0, 0, errors.Wrapf(ParseTimeFail, "stop_at field time '%s': %s", *alertItem.StopAt, err)
  271. }
  272. ts, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  273. if err != nil {
  274. c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
  275. ts = time.Now().UTC()
  276. }
  277. alertB := c.Ent.Alert.
  278. Create().
  279. SetScenario(*alertItem.Scenario).
  280. SetMessage(*alertItem.Message).
  281. SetEventsCount(*alertItem.EventsCount).
  282. SetStartedAt(startAtTime).
  283. SetStoppedAt(stopAtTime).
  284. SetSourceScope(*alertItem.Source.Scope).
  285. SetSourceValue(*alertItem.Source.Value).
  286. SetSourceIp(alertItem.Source.IP).
  287. SetSourceRange(alertItem.Source.Range).
  288. SetSourceAsNumber(alertItem.Source.AsNumber).
  289. SetSourceAsName(alertItem.Source.AsName).
  290. SetSourceCountry(alertItem.Source.Cn).
  291. SetSourceLatitude(alertItem.Source.Latitude).
  292. SetSourceLongitude(alertItem.Source.Longitude).
  293. SetCapacity(*alertItem.Capacity).
  294. SetLeakSpeed(*alertItem.Leakspeed).
  295. SetSimulated(*alertItem.Simulated).
  296. SetScenarioVersion(*alertItem.ScenarioVersion).
  297. SetScenarioHash(*alertItem.ScenarioHash)
  298. alertRef, err := alertB.Save(c.CTX)
  299. if err != nil {
  300. return 0, 0, 0, errors.Wrapf(BulkError, "error creating alert : %s", err)
  301. }
  302. if len(alertItem.Decisions) == 0 {
  303. return alertRef.ID, 0, 0, nil
  304. }
  305. txClient, err := c.Ent.Tx(c.CTX)
  306. if err != nil {
  307. return 0, 0, 0, errors.Wrapf(BulkError, "error creating transaction : %s", err)
  308. }
  309. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  310. valueList := make([]string, 0, decisionBulkSize)
  311. DecOrigin := CapiMachineID
  312. if *alertItem.Decisions[0].Origin == CapiMachineID || *alertItem.Decisions[0].Origin == CapiListsMachineID {
  313. DecOrigin = *alertItem.Decisions[0].Origin
  314. } else {
  315. log.Warningf("unexpected origin %s", *alertItem.Decisions[0].Origin)
  316. }
  317. deleted := 0
  318. inserted := 0
  319. for i, decisionItem := range alertItem.Decisions {
  320. var start_ip, start_sfx, end_ip, end_sfx int64
  321. var sz int
  322. if decisionItem.Duration == nil {
  323. log.Warning("nil duration in community decision")
  324. continue
  325. }
  326. duration, err := time.ParseDuration(*decisionItem.Duration)
  327. if err != nil {
  328. rollbackErr := txClient.Rollback()
  329. if rollbackErr != nil {
  330. log.Errorf("rollback error: %s", rollbackErr)
  331. }
  332. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
  333. }
  334. if decisionItem.Scope == nil {
  335. log.Warning("nil scope in community decision")
  336. continue
  337. }
  338. /*if the scope is IP or Range, convert the value to integers */
  339. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  340. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  341. if err != nil {
  342. rollbackErr := txClient.Rollback()
  343. if rollbackErr != nil {
  344. log.Errorf("rollback error: %s", rollbackErr)
  345. }
  346. return 0, 0, 0, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  347. }
  348. }
  349. /*bulk insert some new decisions*/
  350. decisionBulk = append(decisionBulk, c.Ent.Decision.Create().
  351. SetUntil(ts.Add(duration)).
  352. SetScenario(*decisionItem.Scenario).
  353. SetType(*decisionItem.Type).
  354. SetStartIP(start_ip).
  355. SetStartSuffix(start_sfx).
  356. SetEndIP(end_ip).
  357. SetEndSuffix(end_sfx).
  358. SetIPSize(int64(sz)).
  359. SetValue(*decisionItem.Value).
  360. SetScope(*decisionItem.Scope).
  361. SetOrigin(*decisionItem.Origin).
  362. SetSimulated(*alertItem.Simulated).
  363. SetOwner(alertRef))
  364. /*for bulk delete of duplicate decisions*/
  365. if decisionItem.Value == nil {
  366. log.Warning("nil value in community decision")
  367. continue
  368. }
  369. valueList = append(valueList, *decisionItem.Value)
  370. if len(decisionBulk) == decisionBulkSize {
  371. insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  372. if err != nil {
  373. rollbackErr := txClient.Rollback()
  374. if rollbackErr != nil {
  375. log.Errorf("rollback error: %s", rollbackErr)
  376. }
  377. return 0, 0, 0, errors.Wrapf(BulkError, "bulk creating decisions : %s", err)
  378. }
  379. inserted += len(insertedDecisions)
  380. /*Deleting older decisions from capi*/
  381. deletedDecisions, err := txClient.Decision.Delete().
  382. Where(decision.And(
  383. decision.OriginEQ(DecOrigin),
  384. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  385. decision.ValueIn(valueList...),
  386. )).Exec(c.CTX)
  387. if err != nil {
  388. rollbackErr := txClient.Rollback()
  389. if rollbackErr != nil {
  390. log.Errorf("rollback error: %s", rollbackErr)
  391. }
  392. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  393. }
  394. deleted += deletedDecisions
  395. if len(alertItem.Decisions)-i <= decisionBulkSize {
  396. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  397. valueList = make([]string, 0, (len(alertItem.Decisions) - i))
  398. } else {
  399. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  400. valueList = make([]string, 0, decisionBulkSize)
  401. }
  402. }
  403. }
  404. log.Debugf("deleted %d decisions for %s vs %s", deleted, DecOrigin, *alertItem.Decisions[0].Origin)
  405. insertedDecisions, err := txClient.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  406. if err != nil {
  407. return 0, 0, 0, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  408. }
  409. inserted += len(insertedDecisions)
  410. /*Deleting older decisions from capi*/
  411. if len(valueList) > 0 {
  412. deletedDecisions, err := txClient.Decision.Delete().
  413. Where(decision.And(
  414. decision.OriginEQ(DecOrigin),
  415. decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
  416. decision.ValueIn(valueList...),
  417. )).Exec(c.CTX)
  418. if err != nil {
  419. rollbackErr := txClient.Rollback()
  420. if rollbackErr != nil {
  421. log.Errorf("rollback error: %s", rollbackErr)
  422. }
  423. return 0, 0, 0, errors.Wrap(err, "while deleting older community blocklist decisions")
  424. }
  425. deleted += deletedDecisions
  426. }
  427. err = txClient.Commit()
  428. if err != nil {
  429. rollbackErr := txClient.Rollback()
  430. if rollbackErr != nil {
  431. log.Errorf("rollback error: %s", rollbackErr)
  432. }
  433. return 0, 0, 0, errors.Wrapf(BulkError, "error committing transaction : %s", err)
  434. }
  435. return alertRef.ID, inserted, deleted, nil
  436. }
  437. func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
  438. var ret [][]*ent.Decision
  439. var chunk []*ent.Decision
  440. for _, d := range decisions {
  441. chunk = append(chunk, d)
  442. if len(chunk) == chunkSize {
  443. ret = append(ret, chunk)
  444. chunk = nil
  445. }
  446. }
  447. if len(chunk) > 0 {
  448. ret = append(ret, chunk)
  449. }
  450. return ret
  451. }
  452. func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
  453. ret := []string{}
  454. bulkSize := 20
  455. var owner *ent.Machine
  456. var err error
  457. if machineId != "" {
  458. owner, err = c.QueryMachineByID(machineId)
  459. if err != nil {
  460. if errors.Cause(err) != UserNotExists {
  461. return []string{}, errors.Wrapf(QueryFail, "machine '%s': %s", machineId, err)
  462. }
  463. c.Log.Debugf("CreateAlertBulk: Machine Id %s doesn't exist", machineId)
  464. owner = nil
  465. }
  466. } else {
  467. owner = nil
  468. }
  469. c.Log.Debugf("writing %d items", len(alertList))
  470. bulk := make([]*ent.AlertCreate, 0, bulkSize)
  471. alertDecisions := make([][]*ent.Decision, 0, bulkSize)
  472. for i, alertItem := range alertList {
  473. var decisions []*ent.Decision
  474. var metas []*ent.Meta
  475. var events []*ent.Event
  476. startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
  477. if err != nil {
  478. c.Log.Errorf("CreateAlertBulk: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)
  479. startAtTime = time.Now().UTC()
  480. }
  481. stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
  482. if err != nil {
  483. c.Log.Errorf("CreateAlertBulk: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)
  484. stopAtTime = time.Now().UTC()
  485. }
  486. /*display proper alert in logs*/
  487. for _, disp := range formatAlertAsString(machineId, alertItem) {
  488. c.Log.Info(disp)
  489. }
  490. //let's track when we strip or drop data, notify outside of loop to avoid spam
  491. stripped := false
  492. dropped := false
  493. if len(alertItem.Events) > 0 {
  494. eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
  495. for i, eventItem := range alertItem.Events {
  496. ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
  497. if err != nil {
  498. c.Log.Errorf("CreateAlertBulk: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
  499. ts = time.Now().UTC()
  500. }
  501. marshallMetas, err := json.Marshal(eventItem.Meta)
  502. if err != nil {
  503. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  504. }
  505. //the serialized field is too big, let's try to progressively strip it
  506. if event.SerializedValidator(string(marshallMetas)) != nil {
  507. stripped = true
  508. valid := false
  509. stripSize := 2048
  510. for !valid && stripSize > 0 {
  511. for _, serializedItem := range eventItem.Meta {
  512. if len(serializedItem.Value) > stripSize*2 {
  513. serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
  514. }
  515. }
  516. marshallMetas, err = json.Marshal(eventItem.Meta)
  517. if err != nil {
  518. return []string{}, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
  519. }
  520. if event.SerializedValidator(string(marshallMetas)) == nil {
  521. valid = true
  522. }
  523. stripSize /= 2
  524. }
  525. //nothing worked, drop it
  526. if !valid {
  527. dropped = true
  528. stripped = false
  529. marshallMetas = []byte("")
  530. }
  531. }
  532. eventBulk[i] = c.Ent.Event.Create().
  533. SetTime(ts).
  534. SetSerialized(string(marshallMetas))
  535. }
  536. if stripped {
  537. c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  538. }
  539. if dropped {
  540. c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineId, *alertItem.Scenario)
  541. }
  542. events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(c.CTX)
  543. if err != nil {
  544. return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err)
  545. }
  546. }
  547. if len(alertItem.Meta) > 0 {
  548. metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
  549. for i, metaItem := range alertItem.Meta {
  550. metaBulk[i] = c.Ent.Meta.Create().
  551. SetKey(metaItem.Key).
  552. SetValue(metaItem.Value)
  553. }
  554. metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(c.CTX)
  555. if err != nil {
  556. return []string{}, errors.Wrapf(BulkError, "creating alert meta: %s", err)
  557. }
  558. }
  559. decisions = make([]*ent.Decision, 0)
  560. if len(alertItem.Decisions) > 0 {
  561. decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
  562. for i, decisionItem := range alertItem.Decisions {
  563. var start_ip, start_sfx, end_ip, end_sfx int64
  564. var sz int
  565. duration, err := time.ParseDuration(*decisionItem.Duration)
  566. if err != nil {
  567. return []string{}, errors.Wrapf(ParseDurationFail, "decision duration '%+v' : %s", *decisionItem.Duration, err)
  568. }
  569. /*if the scope is IP or Range, convert the value to integers */
  570. if strings.ToLower(*decisionItem.Scope) == "ip" || strings.ToLower(*decisionItem.Scope) == "range" {
  571. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decisionItem.Value)
  572. if err != nil {
  573. return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
  574. }
  575. }
  576. decisionCreate := c.Ent.Decision.Create().
  577. SetUntil(stopAtTime.Add(duration)).
  578. SetScenario(*decisionItem.Scenario).
  579. SetType(*decisionItem.Type).
  580. SetStartIP(start_ip).
  581. SetStartSuffix(start_sfx).
  582. SetEndIP(end_ip).
  583. SetEndSuffix(end_sfx).
  584. SetIPSize(int64(sz)).
  585. SetValue(*decisionItem.Value).
  586. SetScope(*decisionItem.Scope).
  587. SetOrigin(*decisionItem.Origin).
  588. SetSimulated(*alertItem.Simulated).
  589. SetUUID(decisionItem.UUID)
  590. decisionBulk = append(decisionBulk, decisionCreate)
  591. if len(decisionBulk) == decisionBulkSize {
  592. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  593. if err != nil {
  594. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  595. }
  596. decisions = append(decisions, decisionsCreateRet...)
  597. if len(alertItem.Decisions)-i <= decisionBulkSize {
  598. decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
  599. } else {
  600. decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
  601. }
  602. }
  603. }
  604. decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
  605. if err != nil {
  606. return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
  607. }
  608. decisions = append(decisions, decisionsCreateRet...)
  609. }
  610. alertB := c.Ent.Alert.
  611. Create().
  612. SetScenario(*alertItem.Scenario).
  613. SetMessage(*alertItem.Message).
  614. SetEventsCount(*alertItem.EventsCount).
  615. SetStartedAt(startAtTime).
  616. SetStoppedAt(stopAtTime).
  617. SetSourceScope(*alertItem.Source.Scope).
  618. SetSourceValue(*alertItem.Source.Value).
  619. SetSourceIp(alertItem.Source.IP).
  620. SetSourceRange(alertItem.Source.Range).
  621. SetSourceAsNumber(alertItem.Source.AsNumber).
  622. SetSourceAsName(alertItem.Source.AsName).
  623. SetSourceCountry(alertItem.Source.Cn).
  624. SetSourceLatitude(alertItem.Source.Latitude).
  625. SetSourceLongitude(alertItem.Source.Longitude).
  626. SetCapacity(*alertItem.Capacity).
  627. SetLeakSpeed(*alertItem.Leakspeed).
  628. SetSimulated(*alertItem.Simulated).
  629. SetScenarioVersion(*alertItem.ScenarioVersion).
  630. SetScenarioHash(*alertItem.ScenarioHash).
  631. SetUUID(alertItem.UUID).
  632. AddEvents(events...).
  633. AddMetas(metas...)
  634. if owner != nil {
  635. alertB.SetOwner(owner)
  636. }
  637. bulk = append(bulk, alertB)
  638. alertDecisions = append(alertDecisions, decisions)
  639. if len(bulk) == bulkSize {
  640. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  641. if err != nil {
  642. return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
  643. }
  644. for alertIndex, a := range alerts {
  645. ret = append(ret, strconv.Itoa(a.ID))
  646. d := alertDecisions[alertIndex]
  647. decisionsChunk := chunkDecisions(d, bulkSize)
  648. for _, d2 := range decisionsChunk {
  649. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  650. if err != nil {
  651. return []string{}, fmt.Errorf("error while updating decisions: %s", err)
  652. }
  653. }
  654. }
  655. if len(alertList)-i <= bulkSize {
  656. bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
  657. alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
  658. } else {
  659. bulk = make([]*ent.AlertCreate, 0, bulkSize)
  660. alertDecisions = make([][]*ent.Decision, 0, bulkSize)
  661. }
  662. }
  663. }
  664. alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
  665. if err != nil {
  666. return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
  667. }
  668. for alertIndex, a := range alerts {
  669. ret = append(ret, strconv.Itoa(a.ID))
  670. d := alertDecisions[alertIndex]
  671. decisionsChunk := chunkDecisions(d, bulkSize)
  672. for _, d2 := range decisionsChunk {
  673. _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
  674. if err != nil {
  675. return []string{}, fmt.Errorf("error while updating decisions: %s", err)
  676. }
  677. }
  678. }
  679. return ret, nil
  680. }
  681. func AlertPredicatesFromFilter(filter map[string][]string) ([]predicate.Alert, error) {
  682. predicates := make([]predicate.Alert, 0)
  683. var err error
  684. var start_ip, start_sfx, end_ip, end_sfx int64
  685. var hasActiveDecision bool
  686. var ip_sz int
  687. var contains bool = true
  688. /*if contains is true, return bans that *contains* the given value (value is the inner)
  689. else, return bans that are *contained* by the given value (value is the outer)*/
  690. /*the simulated filter is a bit different : if it's not present *or* set to false, specifically exclude records with simulated to true */
  691. if v, ok := filter["simulated"]; ok {
  692. if v[0] == "false" {
  693. predicates = append(predicates, alert.SimulatedEQ(false))
  694. }
  695. }
  696. if _, ok := filter["origin"]; ok {
  697. filter["include_capi"] = []string{"true"}
  698. }
  699. for param, value := range filter {
  700. switch param {
  701. case "contains":
  702. contains, err = strconv.ParseBool(value[0])
  703. if err != nil {
  704. return nil, errors.Wrapf(InvalidFilter, "invalid contains value : %s", err)
  705. }
  706. case "scope":
  707. var scope string = value[0]
  708. if strings.ToLower(scope) == "ip" {
  709. scope = types.Ip
  710. } else if strings.ToLower(scope) == "range" {
  711. scope = types.Range
  712. }
  713. predicates = append(predicates, alert.SourceScopeEQ(scope))
  714. case "value":
  715. predicates = append(predicates, alert.SourceValueEQ(value[0]))
  716. case "scenario":
  717. predicates = append(predicates, alert.HasDecisionsWith(decision.ScenarioEQ(value[0])))
  718. case "ip", "range":
  719. ip_sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(value[0])
  720. if err != nil {
  721. return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err)
  722. }
  723. case "since":
  724. duration, err := types.ParseDuration(value[0])
  725. if err != nil {
  726. return nil, errors.Wrap(err, "while parsing duration")
  727. }
  728. since := time.Now().UTC().Add(-duration)
  729. if since.IsZero() {
  730. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  731. }
  732. predicates = append(predicates, alert.StartedAtGTE(since))
  733. case "created_before":
  734. duration, err := types.ParseDuration(value[0])
  735. if err != nil {
  736. return nil, errors.Wrap(err, "while parsing duration")
  737. }
  738. since := time.Now().UTC().Add(-duration)
  739. if since.IsZero() {
  740. return nil, fmt.Errorf("Empty time now() - %s", since.String())
  741. }
  742. predicates = append(predicates, alert.CreatedAtLTE(since))
  743. case "until":
  744. duration, err := types.ParseDuration(value[0])
  745. if err != nil {
  746. return nil, errors.Wrap(err, "while parsing duration")
  747. }
  748. until := time.Now().UTC().Add(-duration)
  749. if until.IsZero() {
  750. return nil, fmt.Errorf("Empty time now() - %s", until.String())
  751. }
  752. predicates = append(predicates, alert.StartedAtLTE(until))
  753. case "decision_type":
  754. predicates = append(predicates, alert.HasDecisionsWith(decision.TypeEQ(value[0])))
  755. case "origin":
  756. predicates = append(predicates, alert.HasDecisionsWith(decision.OriginEQ(value[0])))
  757. case "include_capi": //allows to exclude one or more specific origins
  758. if value[0] == "false" {
  759. predicates = append(predicates, alert.HasDecisionsWith(
  760. decision.Or(decision.OriginEQ(types.CrowdSecOrigin),
  761. decision.OriginEQ(types.CscliOrigin),
  762. decision.OriginEQ(types.ConsoleOrigin),
  763. decision.OriginEQ(types.CscliImportOrigin))))
  764. } else if value[0] != "true" {
  765. log.Errorf("Invalid bool '%s' for include_capi", value[0])
  766. }
  767. case "has_active_decision":
  768. if hasActiveDecision, err = strconv.ParseBool(value[0]); err != nil {
  769. return nil, errors.Wrapf(ParseType, "'%s' is not a boolean: %s", value[0], err)
  770. }
  771. if hasActiveDecision {
  772. predicates = append(predicates, alert.HasDecisionsWith(decision.UntilGTE(time.Now().UTC())))
  773. } else {
  774. predicates = append(predicates, alert.Not(alert.HasDecisions()))
  775. }
  776. case "limit":
  777. continue
  778. case "sort":
  779. continue
  780. case "simulated":
  781. continue
  782. default:
  783. return nil, errors.Wrapf(InvalidFilter, "Filter parameter '%s' is unknown (=%s)", param, value[0])
  784. }
  785. }
  786. if ip_sz == 4 {
  787. if contains { /*decision contains {start_ip,end_ip}*/
  788. predicates = append(predicates, alert.And(
  789. alert.HasDecisionsWith(decision.StartIPLTE(start_ip)),
  790. alert.HasDecisionsWith(decision.EndIPGTE(end_ip)),
  791. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  792. ))
  793. } else { /*decision is contained within {start_ip,end_ip}*/
  794. predicates = append(predicates, alert.And(
  795. alert.HasDecisionsWith(decision.StartIPGTE(start_ip)),
  796. alert.HasDecisionsWith(decision.EndIPLTE(end_ip)),
  797. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  798. ))
  799. }
  800. } else if ip_sz == 16 {
  801. if contains { /*decision contains {start_ip,end_ip}*/
  802. predicates = append(predicates, alert.And(
  803. //matching addr size
  804. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  805. alert.Or(
  806. //decision.start_ip < query.start_ip
  807. alert.HasDecisionsWith(decision.StartIPLT(start_ip)),
  808. alert.And(
  809. //decision.start_ip == query.start_ip
  810. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  811. //decision.start_suffix <= query.start_suffix
  812. alert.HasDecisionsWith(decision.StartSuffixLTE(start_sfx)),
  813. )),
  814. alert.Or(
  815. //decision.end_ip > query.end_ip
  816. alert.HasDecisionsWith(decision.EndIPGT(end_ip)),
  817. alert.And(
  818. //decision.end_ip == query.end_ip
  819. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  820. //decision.end_suffix >= query.end_suffix
  821. alert.HasDecisionsWith(decision.EndSuffixGTE(end_sfx)),
  822. ),
  823. ),
  824. ))
  825. } else { /*decision is contained within {start_ip,end_ip}*/
  826. predicates = append(predicates, alert.And(
  827. //matching addr size
  828. alert.HasDecisionsWith(decision.IPSizeEQ(int64(ip_sz))),
  829. alert.Or(
  830. //decision.start_ip > query.start_ip
  831. alert.HasDecisionsWith(decision.StartIPGT(start_ip)),
  832. alert.And(
  833. //decision.start_ip == query.start_ip
  834. alert.HasDecisionsWith(decision.StartIPEQ(start_ip)),
  835. //decision.start_suffix >= query.start_suffix
  836. alert.HasDecisionsWith(decision.StartSuffixGTE(start_sfx)),
  837. )),
  838. alert.Or(
  839. //decision.end_ip < query.end_ip
  840. alert.HasDecisionsWith(decision.EndIPLT(end_ip)),
  841. alert.And(
  842. //decision.end_ip == query.end_ip
  843. alert.HasDecisionsWith(decision.EndIPEQ(end_ip)),
  844. //decision.end_suffix <= query.end_suffix
  845. alert.HasDecisionsWith(decision.EndSuffixLTE(end_sfx)),
  846. ),
  847. ),
  848. ))
  849. }
  850. } else if ip_sz != 0 {
  851. return nil, errors.Wrapf(InvalidFilter, "Unknown ip size %d", ip_sz)
  852. }
  853. return predicates, nil
  854. }
  855. func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]string) (*ent.AlertQuery, error) {
  856. preds, err := AlertPredicatesFromFilter(filter)
  857. if err != nil {
  858. return nil, err
  859. }
  860. return alerts.Where(preds...), nil
  861. }
  862. func (c *Client) AlertsCountPerScenario(filters map[string][]string) (map[string]int, error) {
  863. var res []struct {
  864. Scenario string
  865. Count int
  866. }
  867. ctx := context.Background()
  868. query := c.Ent.Alert.Query()
  869. query, err := BuildAlertRequestFromFilter(query, filters)
  870. if err != nil {
  871. return nil, errors.Wrap(err, "failed to build alert request")
  872. }
  873. err = query.GroupBy(alert.FieldScenario).Aggregate(ent.Count()).Scan(ctx, &res)
  874. if err != nil {
  875. return nil, errors.Wrap(err, "failed to count alerts per scenario")
  876. }
  877. counts := make(map[string]int)
  878. for _, r := range res {
  879. counts[r.Scenario] = r.Count
  880. }
  881. return counts, nil
  882. }
  883. func (c *Client) TotalAlerts() (int, error) {
  884. return c.Ent.Alert.Query().Count(c.CTX)
  885. }
  886. func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, error) {
  887. sort := "DESC" // we sort by desc by default
  888. if val, ok := filter["sort"]; ok {
  889. if val[0] != "ASC" && val[0] != "DESC" {
  890. c.Log.Errorf("invalid 'sort' parameter: %s", val)
  891. } else {
  892. sort = val[0]
  893. }
  894. }
  895. limit := defaultLimit
  896. if val, ok := filter["limit"]; ok {
  897. limitConv, err := strconv.Atoi(val[0])
  898. if err != nil {
  899. return []*ent.Alert{}, errors.Wrapf(QueryFail, "bad limit in parameters: %s", val)
  900. }
  901. limit = limitConv
  902. }
  903. offset := 0
  904. ret := make([]*ent.Alert, 0)
  905. for {
  906. alerts := c.Ent.Alert.Query()
  907. alerts, err := BuildAlertRequestFromFilter(alerts, filter)
  908. if err != nil {
  909. return []*ent.Alert{}, err
  910. }
  911. alerts = alerts.
  912. WithDecisions().
  913. WithEvents().
  914. WithMetas().
  915. WithOwner()
  916. if limit == 0 {
  917. limit, err = alerts.Count(c.CTX)
  918. if err != nil {
  919. return []*ent.Alert{}, fmt.Errorf("unable to count nb alerts: %s", err)
  920. }
  921. }
  922. if sort == "ASC" {
  923. alerts = alerts.Order(ent.Asc(alert.FieldCreatedAt), ent.Asc(alert.FieldID))
  924. } else {
  925. alerts = alerts.Order(ent.Desc(alert.FieldCreatedAt), ent.Desc(alert.FieldID))
  926. }
  927. result, err := alerts.Limit(paginationSize).Offset(offset).All(c.CTX)
  928. if err != nil {
  929. return []*ent.Alert{}, errors.Wrapf(QueryFail, "pagination size: %d, offset: %d: %s", paginationSize, offset, err)
  930. }
  931. if diff := limit - len(ret); diff < paginationSize {
  932. if len(result) < diff {
  933. ret = append(ret, result...)
  934. c.Log.Debugf("Pagination done, %d < %d", len(result), diff)
  935. break
  936. }
  937. ret = append(ret, result[0:diff]...)
  938. } else {
  939. ret = append(ret, result...)
  940. }
  941. if len(ret) == limit || len(ret) == 0 || len(ret) < paginationSize {
  942. c.Log.Debugf("Pagination done len(ret) = %d", len(ret))
  943. break
  944. }
  945. offset += paginationSize
  946. }
  947. return ret, nil
  948. }
  949. func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
  950. idList := make([]int, 0)
  951. for _, alert := range alertItems {
  952. idList = append(idList, alert.ID)
  953. }
  954. _, err := c.Ent.Event.Delete().
  955. Where(event.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
  956. if err != nil {
  957. c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
  958. return 0, errors.Wrapf(DeleteFail, "alert graph delete batch events")
  959. }
  960. _, err = c.Ent.Meta.Delete().
  961. Where(meta.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
  962. if err != nil {
  963. c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
  964. return 0, errors.Wrapf(DeleteFail, "alert graph delete batch meta")
  965. }
  966. _, err = c.Ent.Decision.Delete().
  967. Where(decision.HasOwnerWith(alert.IDIn(idList...))).Exec(c.CTX)
  968. if err != nil {
  969. c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
  970. return 0, errors.Wrapf(DeleteFail, "alert graph delete batch decisions")
  971. }
  972. deleted, err := c.Ent.Alert.Delete().
  973. Where(alert.IDIn(idList...)).Exec(c.CTX)
  974. if err != nil {
  975. c.Log.Warningf("DeleteAlertGraphBatch : %s", err)
  976. return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
  977. }
  978. c.Log.Debug("Done batch delete alerts")
  979. return deleted, nil
  980. }
  981. func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
  982. // delete the associated events
  983. _, err := c.Ent.Event.Delete().
  984. Where(event.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  985. if err != nil {
  986. c.Log.Warningf("DeleteAlertGraph : %s", err)
  987. return errors.Wrapf(DeleteFail, "event with alert ID '%d'", alertItem.ID)
  988. }
  989. // delete the associated meta
  990. _, err = c.Ent.Meta.Delete().
  991. Where(meta.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  992. if err != nil {
  993. c.Log.Warningf("DeleteAlertGraph : %s", err)
  994. return errors.Wrapf(DeleteFail, "meta with alert ID '%d'", alertItem.ID)
  995. }
  996. // delete the associated decisions
  997. _, err = c.Ent.Decision.Delete().
  998. Where(decision.HasOwnerWith(alert.IDEQ(alertItem.ID))).Exec(c.CTX)
  999. if err != nil {
  1000. c.Log.Warningf("DeleteAlertGraph : %s", err)
  1001. return errors.Wrapf(DeleteFail, "decision with alert ID '%d'", alertItem.ID)
  1002. }
  1003. // delete the alert
  1004. err = c.Ent.Alert.DeleteOne(alertItem).Exec(c.CTX)
  1005. if err != nil {
  1006. c.Log.Warningf("DeleteAlertGraph : %s", err)
  1007. return errors.Wrapf(DeleteFail, "alert with ID '%d'", alertItem.ID)
  1008. }
  1009. return nil
  1010. }
  1011. func (c *Client) DeleteAlertByID(id int) error {
  1012. alertItem, err := c.Ent.Alert.Query().Where(alert.IDEQ(id)).Only(c.CTX)
  1013. if err != nil {
  1014. return err
  1015. }
  1016. return c.DeleteAlertGraph(alertItem)
  1017. }
  1018. func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error) {
  1019. preds, err := AlertPredicatesFromFilter(filter)
  1020. if err != nil {
  1021. return 0, err
  1022. }
  1023. return c.Ent.Alert.Delete().Where(preds...).Exec(c.CTX)
  1024. }
  1025. func (c *Client) FlushOrphans() {
  1026. /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
  1027. /* We want to take care of orphaned events for which the parent alert/decision has been deleted */
  1028. events_count, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
  1029. if err != nil {
  1030. c.Log.Warningf("error while deleting orphan events : %s", err)
  1031. return
  1032. }
  1033. if events_count > 0 {
  1034. c.Log.Infof("%d deleted orphan events", events_count)
  1035. }
  1036. events_count, err = c.Ent.Decision.Delete().Where(
  1037. decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX)
  1038. if err != nil {
  1039. c.Log.Warningf("error while deleting orphan decisions : %s", err)
  1040. return
  1041. }
  1042. if events_count > 0 {
  1043. c.Log.Infof("%d deleted orphan decisions", events_count)
  1044. }
  1045. }
  1046. func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
  1047. log.Debug("starting FlushAgentsAndBouncers")
  1048. if bouncersCfg != nil {
  1049. if bouncersCfg.ApiDuration != nil {
  1050. log.Debug("trying to delete old bouncers from api")
  1051. deletionCount, err := c.Ent.Bouncer.Delete().Where(
  1052. bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.ApiDuration)),
  1053. ).Where(
  1054. bouncer.AuthTypeEQ(types.ApiKeyAuthType),
  1055. ).Exec(c.CTX)
  1056. if err != nil {
  1057. c.Log.Errorf("while auto-deleting expired bouncers (api key) : %s", err)
  1058. } else if deletionCount > 0 {
  1059. c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
  1060. }
  1061. }
  1062. if bouncersCfg.CertDuration != nil {
  1063. log.Debug("trying to delete old bouncers from cert")
  1064. deletionCount, err := c.Ent.Bouncer.Delete().Where(
  1065. bouncer.LastPullLTE(time.Now().UTC().Add(-*bouncersCfg.CertDuration)),
  1066. ).Where(
  1067. bouncer.AuthTypeEQ(types.TlsAuthType),
  1068. ).Exec(c.CTX)
  1069. if err != nil {
  1070. c.Log.Errorf("while auto-deleting expired bouncers (api key) : %s", err)
  1071. } else if deletionCount > 0 {
  1072. c.Log.Infof("deleted %d expired bouncers (api auth)", deletionCount)
  1073. }
  1074. }
  1075. }
  1076. if agentsCfg != nil {
  1077. if agentsCfg.CertDuration != nil {
  1078. log.Debug("trying to delete old agents from cert")
  1079. deletionCount, err := c.Ent.Machine.Delete().Where(
  1080. machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.CertDuration)),
  1081. ).Where(
  1082. machine.Not(machine.HasAlerts()),
  1083. ).Where(
  1084. machine.AuthTypeEQ(types.TlsAuthType),
  1085. ).Exec(c.CTX)
  1086. log.Debugf("deleted %d entries", deletionCount)
  1087. if err != nil {
  1088. c.Log.Errorf("while auto-deleting expired machine (cert) : %s", err)
  1089. } else if deletionCount > 0 {
  1090. c.Log.Infof("deleted %d expired machine (cert auth)", deletionCount)
  1091. }
  1092. }
  1093. if agentsCfg.LoginPasswordDuration != nil {
  1094. log.Debug("trying to delete old agents from password")
  1095. deletionCount, err := c.Ent.Machine.Delete().Where(
  1096. machine.LastHeartbeatLTE(time.Now().UTC().Add(-*agentsCfg.LoginPasswordDuration)),
  1097. ).Where(
  1098. machine.Not(machine.HasAlerts()),
  1099. ).Where(
  1100. machine.AuthTypeEQ(types.PasswordAuthType),
  1101. ).Exec(c.CTX)
  1102. log.Debugf("deleted %d entries", deletionCount)
  1103. if err != nil {
  1104. c.Log.Errorf("while auto-deleting expired machine (password) : %s", err)
  1105. } else if deletionCount > 0 {
  1106. c.Log.Infof("deleted %d expired machine (password auth)", deletionCount)
  1107. }
  1108. }
  1109. }
  1110. return nil
  1111. }
  1112. func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
  1113. var deletedByAge int
  1114. var deletedByNbItem int
  1115. var totalAlerts int
  1116. var err error
  1117. if !c.CanFlush {
  1118. c.Log.Debug("a list is being imported, flushing later")
  1119. return nil
  1120. }
  1121. c.Log.Debug("Flushing orphan alerts")
  1122. c.FlushOrphans()
  1123. c.Log.Debug("Done flushing orphan alerts")
  1124. totalAlerts, err = c.TotalAlerts()
  1125. if err != nil {
  1126. c.Log.Warningf("FlushAlerts (max items count) : %s", err)
  1127. return errors.Wrap(err, "unable to get alerts count")
  1128. }
  1129. c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
  1130. if MaxAge != "" {
  1131. filter := map[string][]string{
  1132. "created_before": {MaxAge},
  1133. }
  1134. nbDeleted, err := c.DeleteAlertWithFilter(filter)
  1135. if err != nil {
  1136. c.Log.Warningf("FlushAlerts (max age) : %s", err)
  1137. return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
  1138. }
  1139. c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
  1140. deletedByAge = nbDeleted
  1141. }
  1142. if MaxItems > 0 {
  1143. //We get the highest id for the alerts
  1144. //We subtract MaxItems to avoid deleting alerts that are not old enough
  1145. //This gives us the oldest alert that we want to keep
  1146. //We then delete all the alerts with an id lower than this one
  1147. //We can do this because the id is auto-increment, and the database won't reuse the same id twice
  1148. lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
  1149. "sort": {"DESC"},
  1150. "limit": {"1"},
  1151. })
  1152. c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
  1153. if err != nil {
  1154. c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
  1155. return errors.Wrap(err, "could not get last alert")
  1156. }
  1157. if len(lastAlert) != 0 {
  1158. maxid := lastAlert[0].ID - MaxItems
  1159. c.Log.Debugf("FlushAlerts (max id): %d", maxid)
  1160. if maxid > 0 {
  1161. //This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
  1162. deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
  1163. if err != nil {
  1164. c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
  1165. return errors.Wrap(err, "could not delete alerts")
  1166. }
  1167. }
  1168. }
  1169. }
  1170. if deletedByNbItem > 0 {
  1171. c.Log.Infof("flushed %d/%d alerts because max number of alerts has been reached (%d max)", deletedByNbItem, totalAlerts, MaxItems)
  1172. }
  1173. if deletedByAge > 0 {
  1174. c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", deletedByAge, totalAlerts, MaxAge)
  1175. }
  1176. return nil
  1177. }
  1178. func (c *Client) GetAlertByID(alertID int) (*ent.Alert, error) {
  1179. alert, err := c.Ent.Alert.Query().Where(alert.IDEQ(alertID)).WithDecisions().WithEvents().WithMetas().WithOwner().First(c.CTX)
  1180. if err != nil {
  1181. /*record not found, 404*/
  1182. if ent.IsNotFound(err) {
  1183. log.Warningf("GetAlertByID (not found): %s", err)
  1184. return &ent.Alert{}, ItemNotFound
  1185. }
  1186. c.Log.Warningf("GetAlertByID : %s", err)
  1187. return &ent.Alert{}, QueryFail
  1188. }
  1189. return alert, nil
  1190. }