|
@@ -20,9 +20,10 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
const (
|
|
const (
|
|
- paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable'
|
|
|
|
- defaultLimit = 100 // default limit of element to returns when query alerts
|
|
|
|
- bulkSize = 50 // bulk size when create alerts
|
|
|
|
|
|
+ paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable'
|
|
|
|
+ defaultLimit = 100 // default limit of element to returns when query alerts
|
|
|
|
+ bulkSize = 50 // bulk size when create alerts
|
|
|
|
+ decisionBulkSize = 50
|
|
)
|
|
)
|
|
|
|
|
|
func formatAlertAsString(machineId string, alert *models.Alert) []string {
|
|
func formatAlertAsString(machineId string, alert *models.Alert) []string {
|
|
@@ -117,7 +118,6 @@ func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]str
|
|
/*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/
|
|
/*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/
|
|
func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
|
|
func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) {
|
|
|
|
|
|
- decisionBulkSize := 50
|
|
|
|
var err error
|
|
var err error
|
|
var deleted, inserted int
|
|
var deleted, inserted int
|
|
|
|
|
|
@@ -278,6 +278,23 @@ func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, in
|
|
return alertRef.ID, inserted, deleted, nil
|
|
return alertRef.ID, inserted, deleted, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision {
|
|
|
|
+ var ret [][]*ent.Decision
|
|
|
|
+ var chunk []*ent.Decision
|
|
|
|
+
|
|
|
|
+ for _, d := range decisions {
|
|
|
|
+ chunk = append(chunk, d)
|
|
|
|
+ if len(chunk) == chunkSize {
|
|
|
|
+ ret = append(ret, chunk)
|
|
|
|
+ chunk = nil
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if len(chunk) > 0 {
|
|
|
|
+ ret = append(ret, chunk)
|
|
|
|
+ }
|
|
|
|
+ return ret
|
|
|
|
+}
|
|
|
|
+
|
|
func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
|
|
func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) {
|
|
|
|
|
|
ret := []string{}
|
|
ret := []string{}
|
|
@@ -285,6 +302,7 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
|
|
|
|
c.Log.Debugf("writting %d items", len(alertList))
|
|
c.Log.Debugf("writting %d items", len(alertList))
|
|
bulk := make([]*ent.AlertCreate, 0, bulkSize)
|
|
bulk := make([]*ent.AlertCreate, 0, bulkSize)
|
|
|
|
+ alertDecisions := make([][]*ent.Decision, 0, bulkSize)
|
|
for i, alertItem := range alertList {
|
|
for i, alertItem := range alertList {
|
|
var decisions []*ent.Decision
|
|
var decisions []*ent.Decision
|
|
var metas []*ent.Meta
|
|
var metas []*ent.Meta
|
|
@@ -394,8 +412,10 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
|
|
c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err)
|
|
ts = time.Now()
|
|
ts = time.Now()
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ decisions = make([]*ent.Decision, 0)
|
|
if len(alertItem.Decisions) > 0 {
|
|
if len(alertItem.Decisions) > 0 {
|
|
- decisionBulk := make([]*ent.DecisionCreate, len(alertItem.Decisions))
|
|
|
|
|
|
+ decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize)
|
|
for i, decisionItem := range alertItem.Decisions {
|
|
for i, decisionItem := range alertItem.Decisions {
|
|
var start_ip, start_sfx, end_ip, end_sfx int64
|
|
var start_ip, start_sfx, end_ip, end_sfx int64
|
|
var sz int
|
|
var sz int
|
|
@@ -412,7 +432,8 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
|
|
return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- decisionBulk[i] = c.Ent.Decision.Create().
|
|
|
|
|
|
+
|
|
|
|
+ decisionCreate := c.Ent.Decision.Create().
|
|
SetUntil(ts.Add(duration)).
|
|
SetUntil(ts.Add(duration)).
|
|
SetScenario(*decisionItem.Scenario).
|
|
SetScenario(*decisionItem.Scenario).
|
|
SetType(*decisionItem.Type).
|
|
SetType(*decisionItem.Type).
|
|
@@ -425,12 +446,27 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
SetScope(*decisionItem.Scope).
|
|
SetScope(*decisionItem.Scope).
|
|
SetOrigin(*decisionItem.Origin).
|
|
SetOrigin(*decisionItem.Origin).
|
|
SetSimulated(*alertItem.Simulated)
|
|
SetSimulated(*alertItem.Simulated)
|
|
|
|
+
|
|
|
|
+ decisionBulk = append(decisionBulk, decisionCreate)
|
|
|
|
+ if len(decisionBulk) == decisionBulkSize {
|
|
|
|
+ decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ decisions = append(decisions, decisionsCreateRet...)
|
|
|
|
+ if len(alertItem.Decisions)-i <= decisionBulkSize {
|
|
|
|
+ decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i))
|
|
|
|
+ } else {
|
|
|
|
+ decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- decisions, err = c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
|
|
|
|
|
|
+ decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX)
|
|
if err != nil {
|
|
if err != nil {
|
|
return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
|
|
return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err)
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
+ decisions = append(decisions, decisionsCreateRet...)
|
|
}
|
|
}
|
|
|
|
|
|
alertB := c.Ent.Alert.
|
|
alertB := c.Ent.Alert.
|
|
@@ -454,7 +490,6 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
SetSimulated(*alertItem.Simulated).
|
|
SetSimulated(*alertItem.Simulated).
|
|
SetScenarioVersion(*alertItem.ScenarioVersion).
|
|
SetScenarioVersion(*alertItem.ScenarioVersion).
|
|
SetScenarioHash(*alertItem.ScenarioHash).
|
|
SetScenarioHash(*alertItem.ScenarioHash).
|
|
- AddDecisions(decisions...).
|
|
|
|
AddEvents(events...).
|
|
AddEvents(events...).
|
|
AddMetas(metas...)
|
|
AddMetas(metas...)
|
|
|
|
|
|
@@ -462,20 +497,31 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
alertB.SetOwner(owner)
|
|
alertB.SetOwner(owner)
|
|
}
|
|
}
|
|
bulk = append(bulk, alertB)
|
|
bulk = append(bulk, alertB)
|
|
|
|
+ alertDecisions = append(alertDecisions, decisions)
|
|
|
|
|
|
if len(bulk) == bulkSize {
|
|
if len(bulk) == bulkSize {
|
|
alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
|
|
alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
|
|
if err != nil {
|
|
if err != nil {
|
|
return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
|
|
return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
|
|
}
|
|
}
|
|
- for _, alert := range alerts {
|
|
|
|
- ret = append(ret, strconv.Itoa(alert.ID))
|
|
|
|
|
|
+ for _, a := range alerts {
|
|
|
|
+ ret = append(ret, strconv.Itoa(a.ID))
|
|
|
|
+ for _, d := range alertDecisions {
|
|
|
|
+ decisionsChunk := chunkDecisions(d, bulkSize)
|
|
|
|
+ for _, d2 := range decisionsChunk {
|
|
|
|
+ _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
if len(alertList)-i <= bulkSize {
|
|
if len(alertList)-i <= bulkSize {
|
|
bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
|
|
bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i))
|
|
|
|
+ alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i))
|
|
} else {
|
|
} else {
|
|
bulk = make([]*ent.AlertCreate, 0, bulkSize)
|
|
bulk = make([]*ent.AlertCreate, 0, bulkSize)
|
|
|
|
+ alertDecisions = make([][]*ent.Decision, 0, bulkSize)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -485,8 +531,17 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
|
|
return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
|
|
return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
|
|
}
|
|
}
|
|
|
|
|
|
- for _, alert := range alerts {
|
|
|
|
- ret = append(ret, strconv.Itoa(alert.ID))
|
|
|
|
|
|
+ for _, a := range alerts {
|
|
|
|
+ ret = append(ret, strconv.Itoa(a.ID))
|
|
|
|
+ for _, d := range alertDecisions {
|
|
|
|
+ decisionsChunk := chunkDecisions(d, bulkSize)
|
|
|
|
+ for _, d2 := range decisionsChunk {
|
|
|
|
+ _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error())
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
return ret, nil
|
|
return ret, nil
|
|
@@ -812,7 +867,9 @@ func (c *Client) FlushOrphans() {
|
|
c.Log.Infof("%d deleted orphan events", events_count)
|
|
c.Log.Infof("%d deleted orphan events", events_count)
|
|
}
|
|
}
|
|
|
|
|
|
- events_count, err = c.Ent.Decision.Delete().Where(decision.Not(decision.HasOwner())).Exec(c.CTX)
|
|
|
|
|
|
+ events_count, err = c.Ent.Decision.Delete().Where(
|
|
|
|
+ decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now())).Exec(c.CTX)
|
|
|
|
+
|
|
if err != nil {
|
|
if err != nil {
|
|
c.Log.Warningf("error while deleting orphan decisions : %s", err)
|
|
c.Log.Warningf("error while deleting orphan decisions : %s", err)
|
|
return
|
|
return
|
|
@@ -828,6 +885,11 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
|
|
var totalAlerts int
|
|
var totalAlerts int
|
|
var err error
|
|
var err error
|
|
|
|
|
|
|
|
+ if !c.CanFlush {
|
|
|
|
+ c.Log.Debug("a list is being imported, flushing later")
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
c.Log.Debug("Flushing orphan alerts")
|
|
c.Log.Debug("Flushing orphan alerts")
|
|
c.FlushOrphans()
|
|
c.FlushOrphans()
|
|
c.Log.Debug("Done flushing orphan alerts")
|
|
c.Log.Debug("Done flushing orphan alerts")
|