|
@@ -75,12 +75,14 @@ func randomDuration(d time.Duration, delta time.Duration) time.Duration {
|
|
|
if ret <= 0 {
|
|
|
return 1
|
|
|
}
|
|
|
+
|
|
|
return ret
|
|
|
}
|
|
|
|
|
|
func (a *apic) FetchScenariosListFromDB() ([]string, error) {
|
|
|
scenarios := make([]string, 0)
|
|
|
machines, err := a.dbClient.ListMachines()
|
|
|
+
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("while listing machines: %w", err)
|
|
|
}
|
|
@@ -88,18 +90,22 @@ func (a *apic) FetchScenariosListFromDB() ([]string, error) {
|
|
|
for _, v := range machines {
|
|
|
machineScenarios := strings.Split(v.Scenarios, ",")
|
|
|
log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
|
|
|
+
|
|
|
for _, sv := range machineScenarios {
|
|
|
if !slices.Contains(scenarios, sv) && sv != "" {
|
|
|
scenarios = append(scenarios, sv)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
log.Debugf("Returning list of scenarios : %+v", scenarios)
|
|
|
+
|
|
|
return scenarios, nil
|
|
|
}
|
|
|
|
|
|
func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
|
|
|
apiDecisions := models.AddSignalsRequestItemDecisions{}
|
|
|
+
|
|
|
for _, decision := range decisions {
|
|
|
x := &models.AddSignalsRequestItemDecisionsItem{
|
|
|
Duration: ptr.Of(*decision.Duration),
|
|
@@ -114,11 +120,14 @@ func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequ
|
|
|
UUID: decision.UUID,
|
|
|
}
|
|
|
*x.ID = decision.ID
|
|
|
+
|
|
|
if decision.Simulated != nil {
|
|
|
x.Simulated = *decision.Simulated
|
|
|
}
|
|
|
+
|
|
|
apiDecisions = append(apiDecisions, x)
|
|
|
}
|
|
|
+
|
|
|
return apiDecisions
|
|
|
}
|
|
|
|
|
@@ -149,6 +158,7 @@ func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool)
|
|
|
}
|
|
|
if shareContext {
|
|
|
signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
|
|
|
+
|
|
|
for _, meta := range alert.Meta {
|
|
|
contextItem := models.AddSignalsRequestItemContextItems0{
|
|
|
Key: meta.Key,
|
|
@@ -157,13 +167,14 @@ func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool)
|
|
|
signal.Context = append(signal.Context, &contextItem)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return signal
|
|
|
}
|
|
|
|
|
|
func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
|
|
|
var err error
|
|
|
- ret := &apic{
|
|
|
|
|
|
+ ret := &apic{
|
|
|
AlertsAddChan: make(chan []*models.Alert),
|
|
|
dbClient: dbClient,
|
|
|
mu: sync.Mutex{},
|
|
@@ -186,9 +197,11 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
|
|
|
|
|
|
password := strfmt.Password(config.Credentials.Password)
|
|
|
apiURL, err := url.Parse(config.Credentials.URL)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.URL, err)
|
|
|
}
|
|
|
+
|
|
|
papiURL, err := url.Parse(config.Credentials.PapiURL)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.PapiURL, err)
|
|
@@ -198,6 +211,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("while fetching scenarios from db: %w", err)
|
|
|
}
|
|
|
+
|
|
|
ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
|
|
|
MachineID: config.Credentials.Login,
|
|
|
Password: password,
|
|
@@ -228,7 +242,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
|
|
|
return ret, fmt.Errorf("authenticate watcher (%s): %w", config.Credentials.Login, err)
|
|
|
}
|
|
|
|
|
|
- if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
|
|
|
+ if err = ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
|
|
|
return ret, fmt.Errorf("unable to parse jwt expiration: %w", err)
|
|
|
}
|
|
|
|
|
@@ -242,6 +256,7 @@ func (a *apic) Push() error {
|
|
|
defer trace.CatchPanic("lapi/pushToAPIC")
|
|
|
|
|
|
var cache models.AddSignalsRequest
|
|
|
+
|
|
|
ticker := time.NewTicker(a.pushIntervalFirst)
|
|
|
|
|
|
log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
|
|
@@ -252,28 +267,35 @@ func (a *apic) Push() error {
|
|
|
a.pullTomb.Kill(nil)
|
|
|
a.metricsTomb.Kill(nil)
|
|
|
log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
|
|
|
+
|
|
|
if len(cache) == 0 {
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
go a.Send(&cache)
|
|
|
+
|
|
|
return nil
|
|
|
case <-ticker.C:
|
|
|
ticker.Reset(a.pushInterval)
|
|
|
+
|
|
|
if len(cache) > 0 {
|
|
|
a.mu.Lock()
|
|
|
cacheCopy := cache
|
|
|
cache = make(models.AddSignalsRequest, 0)
|
|
|
a.mu.Unlock()
|
|
|
log.Infof("Signal push: %d signals to push", len(cacheCopy))
|
|
|
+
|
|
|
go a.Send(&cacheCopy)
|
|
|
}
|
|
|
case alerts := <-a.AlertsAddChan:
|
|
|
var signals []*models.AddSignalsRequestItem
|
|
|
+
|
|
|
for _, alert := range alerts {
|
|
|
if ok := shouldShareAlert(alert, a.consoleConfig); ok {
|
|
|
signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
a.mu.Lock()
|
|
|
cache = append(cache, signals...)
|
|
|
a.mu.Unlock()
|
|
@@ -288,11 +310,13 @@ func getScenarioTrustOfAlert(alert *models.Alert) string {
|
|
|
} else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
|
|
|
scenarioTrust = "tainted"
|
|
|
}
|
|
|
+
|
|
|
if len(alert.Decisions) > 0 {
|
|
|
if *alert.Decisions[0].Origin == types.CscliOrigin {
|
|
|
scenarioTrust = "manual"
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return scenarioTrust
|
|
|
}
|
|
|
|
|
@@ -301,6 +325,7 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig
|
|
|
log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
|
|
|
return false
|
|
|
}
|
|
|
+
|
|
|
switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
|
|
|
case "manual":
|
|
|
if !*consoleConfig.ShareManualDecisions {
|
|
@@ -318,6 +343,7 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig
|
|
|
return false
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return true
|
|
|
}
|
|
|
|
|
@@ -333,34 +359,44 @@ func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
|
|
|
|
|
|
I don't know enough about gin to tell how much of an issue it can be.
|
|
|
*/
|
|
|
- var cache []*models.AddSignalsRequestItem = *cacheOrig
|
|
|
- var send models.AddSignalsRequest
|
|
|
+ var (
|
|
|
+ cache []*models.AddSignalsRequestItem = *cacheOrig
|
|
|
+ send models.AddSignalsRequest
|
|
|
+ )
|
|
|
|
|
|
bulkSize := 50
|
|
|
pageStart := 0
|
|
|
pageEnd := bulkSize
|
|
|
|
|
|
for {
|
|
|
-
|
|
|
if pageEnd >= len(cache) {
|
|
|
send = cache[pageStart:]
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+
|
|
|
defer cancel()
|
|
|
+
|
|
|
_, _, err := a.apiClient.Signal.Add(ctx, &send)
|
|
|
+
|
|
|
if err != nil {
|
|
|
log.Errorf("sending signal to central API: %s", err)
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
break
|
|
|
}
|
|
|
+
|
|
|
send = cache[pageStart:pageEnd]
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+
|
|
|
defer cancel()
|
|
|
+
|
|
|
_, _, err := a.apiClient.Signal.Add(ctx, &send)
|
|
|
+
|
|
|
if err != nil {
|
|
|
//we log it here as well, because the return value of func might be discarded
|
|
|
log.Errorf("sending signal to central API: %s", err)
|
|
|
}
|
|
|
+
|
|
|
pageStart += bulkSize
|
|
|
pageEnd += bulkSize
|
|
|
}
|
|
@@ -372,18 +408,22 @@ func (a *apic) CAPIPullIsOld() (bool, error) {
|
|
|
alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
|
|
|
alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
|
|
|
count, err := alerts.Count(a.dbClient.CTX)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return false, fmt.Errorf("while looking for CAPI alert: %w", err)
|
|
|
}
|
|
|
+
|
|
|
if count > 0 {
|
|
|
log.Printf("last CAPI pull is newer than 1h30, skip.")
|
|
|
return false, nil
|
|
|
}
|
|
|
+
|
|
|
return true, nil
|
|
|
}
|
|
|
|
|
|
-func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
|
|
|
+func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, deleteCounters map[string]map[string]int) (int, error) {
|
|
|
nbDeleted := 0
|
|
|
+
|
|
|
for _, decision := range deletedDecisions {
|
|
|
filter := map[string][]string{
|
|
|
"value": {*decision.Value},
|
|
@@ -398,20 +438,25 @@ func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delet
|
|
|
if err != nil {
|
|
|
return 0, fmt.Errorf("deleting decisions error: %w", err)
|
|
|
}
|
|
|
+
|
|
|
dbCliDel, err := strconv.Atoi(dbCliRet)
|
|
|
if err != nil {
|
|
|
return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
|
|
|
}
|
|
|
- updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
|
|
|
+
|
|
|
+ updateCounterForDecision(deleteCounters, decision.Origin, decision.Scenario, dbCliDel)
|
|
|
nbDeleted += dbCliDel
|
|
|
}
|
|
|
+
|
|
|
return nbDeleted, nil
|
|
|
}
|
|
|
|
|
|
-func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
|
|
|
+func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, deleteCounters map[string]map[string]int) (int, error) {
|
|
|
var nbDeleted int
|
|
|
+
|
|
|
for _, decisions := range deletedDecisions {
|
|
|
scope := decisions.Scope
|
|
|
+
|
|
|
for _, decision := range decisions.Decisions {
|
|
|
filter := map[string][]string{
|
|
|
"value": {decision},
|
|
@@ -425,26 +470,32 @@ func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisi
|
|
|
if err != nil {
|
|
|
return 0, fmt.Errorf("deleting decisions error: %w", err)
|
|
|
}
|
|
|
+
|
|
|
dbCliDel, err := strconv.Atoi(dbCliRet)
|
|
|
if err != nil {
|
|
|
return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
|
|
|
}
|
|
|
- updateCounterForDecision(delete_counters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
|
|
|
+
|
|
|
+ updateCounterForDecision(deleteCounters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
|
|
|
nbDeleted += dbCliDel
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return nbDeleted, nil
|
|
|
}
|
|
|
|
|
|
func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
|
|
|
newAlerts := make([]*models.Alert, 0)
|
|
|
+
|
|
|
for _, decision := range decisions {
|
|
|
found := false
|
|
|
+
|
|
|
for _, sub := range newAlerts {
|
|
|
if sub.Source.Scope == nil {
|
|
|
log.Warningf("nil scope in %+v", sub)
|
|
|
continue
|
|
|
}
|
|
|
+
|
|
|
if *decision.Origin == types.CAPIOrigin {
|
|
|
if *sub.Source.Scope == types.CAPIOrigin {
|
|
|
found = true
|
|
@@ -464,11 +515,13 @@ func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
|
|
|
log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
if !found {
|
|
|
log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
|
|
|
newAlerts = append(newAlerts, createAlertForDecision(decision))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return newAlerts
|
|
|
}
|
|
|
|
|
@@ -489,6 +542,7 @@ func createAlertForDecision(decision *models.Decision) *models.Alert {
|
|
|
// XXX: this or nil?
|
|
|
scenario = ""
|
|
|
scope = ""
|
|
|
+
|
|
|
log.Warningf("unknown origin %s", *decision.Origin)
|
|
|
}
|
|
|
|
|
@@ -512,10 +566,10 @@ func createAlertForDecision(decision *models.Decision) *models.Alert {
|
|
|
}
|
|
|
|
|
|
// This function takes in list of parent alerts and decisions and then pairs them up.
|
|
|
-func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
|
|
|
+func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, addCounters map[string]map[string]int) []*models.Alert {
|
|
|
for _, decision := range decisions {
|
|
|
//count and create separate alerts for each list
|
|
|
- updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
|
|
|
+ updateCounterForDecision(addCounters, decision.Origin, decision.Scenario, 1)
|
|
|
|
|
|
/*CAPI might send lower case scopes, unify it.*/
|
|
|
switch strings.ToLower(*decision.Scope) {
|
|
@@ -524,6 +578,7 @@ func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decisio
|
|
|
case "range":
|
|
|
*decision.Scope = types.Range
|
|
|
}
|
|
|
+
|
|
|
found := false
|
|
|
//add the individual decisions to the right list
|
|
|
for idx, alert := range alerts {
|
|
@@ -531,6 +586,7 @@ func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decisio
|
|
|
if *alert.Source.Scope == types.CAPIOrigin {
|
|
|
alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
|
|
|
found = true
|
|
|
+
|
|
|
break
|
|
|
}
|
|
|
} else if *decision.Origin == types.ListOrigin {
|
|
@@ -543,10 +599,12 @@ func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decisio
|
|
|
log.Warningf("unknown origin %s", *decision.Origin)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
if !found {
|
|
|
log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return alerts
|
|
|
}
|
|
|
|
|
@@ -581,18 +639,20 @@ func (a *apic) PullTop(forcePull bool) error {
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("get stream: %w", err)
|
|
|
}
|
|
|
+
|
|
|
a.startup = false
|
|
|
/*to count additions/deletions across lists*/
|
|
|
|
|
|
log.Debugf("Received %d new decisions", len(data.New))
|
|
|
log.Debugf("Received %d deleted decisions", len(data.Deleted))
|
|
|
+
|
|
|
if data.Links != nil {
|
|
|
log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
|
|
|
}
|
|
|
|
|
|
- add_counters, delete_counters := makeAddAndDeleteCounters()
|
|
|
+ addCounters, deleteCounters := makeAddAndDeleteCounters()
|
|
|
// process deleted decisions
|
|
|
- if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
|
|
|
+ if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, deleteCounters); err != nil {
|
|
|
return err
|
|
|
} else {
|
|
|
log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
|
|
@@ -610,28 +670,30 @@ func (a *apic) PullTop(forcePull bool) error {
|
|
|
|
|
|
alert := createAlertForDecision(decisions[0])
|
|
|
alertsFromCapi := []*models.Alert{alert}
|
|
|
- alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
|
|
|
+ alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)
|
|
|
|
|
|
- err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
|
|
|
+ err = a.SaveAlerts(alertsFromCapi, addCounters, deleteCounters)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while saving alerts: %w", err)
|
|
|
}
|
|
|
|
|
|
// update blocklists
|
|
|
- if err := a.UpdateBlocklists(data.Links, add_counters, forcePull); err != nil {
|
|
|
+ if err := a.UpdateBlocklists(data.Links, addCounters, forcePull); err != nil {
|
|
|
return fmt.Errorf("while updating blocklists: %w", err)
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// we receive a link to a blocklist, we pull the content of the blocklist and we create one alert
|
|
|
func (a *apic) PullBlocklist(blocklist *modelscapi.BlocklistLink, forcePull bool) error {
|
|
|
- add_counters, _ := makeAddAndDeleteCounters()
|
|
|
+ addCounters, _ := makeAddAndDeleteCounters()
|
|
|
if err := a.UpdateBlocklists(&modelscapi.GetDecisionsStreamResponseLinks{
|
|
|
Blocklists: []*modelscapi.BlocklistLink{blocklist},
|
|
|
- }, add_counters, forcePull); err != nil {
|
|
|
+ }, addCounters, forcePull); err != nil {
|
|
|
return fmt.Errorf("while pulling blocklist: %w", err)
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -641,17 +703,20 @@ func (a *apic) whitelistedBy(decision *models.Decision) string {
|
|
|
if decision.Value == nil {
|
|
|
return ""
|
|
|
}
|
|
|
+
|
|
|
ipval := net.ParseIP(*decision.Value)
|
|
|
for _, cidr := range a.whitelists.Cidrs {
|
|
|
if cidr.Contains(ipval) {
|
|
|
return cidr.String()
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
for _, ip := range a.whitelists.Ips {
|
|
|
if ip != nil && ip.Equal(ipval) {
|
|
|
return ip.String()
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return ""
|
|
|
}
|
|
|
|
|
@@ -661,12 +726,14 @@ func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decis
|
|
|
}
|
|
|
//deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
|
|
|
outIdx := 0
|
|
|
+
|
|
|
for _, decision := range decisions {
|
|
|
whitelister := a.whitelistedBy(decision)
|
|
|
if whitelister != "" {
|
|
|
log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, whitelister)
|
|
|
continue
|
|
|
}
|
|
|
+
|
|
|
decisions[outIdx] = decision
|
|
|
outIdx++
|
|
|
}
|
|
@@ -674,17 +741,20 @@ func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decis
|
|
|
return decisions[:outIdx]
|
|
|
}
|
|
|
|
|
|
-func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
|
|
|
+func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) error {
|
|
|
for _, alert := range alertsFromCapi {
|
|
|
- setAlertScenario(alert, add_counters, delete_counters)
|
|
|
+ setAlertScenario(alert, addCounters, deleteCounters)
|
|
|
log.Debugf("%s has %d decisions", *alert.Source.Scope, len(alert.Decisions))
|
|
|
+
|
|
|
if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
|
|
|
log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
|
|
|
}
|
|
|
+
|
|
|
alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alert)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while saving alert from %s: %w", *alert.Source.Scope, err)
|
|
|
}
|
|
|
+
|
|
|
log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alert.Source.Scope, inserted, deleted, alertID)
|
|
|
}
|
|
|
|
|
@@ -697,71 +767,91 @@ func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bo
|
|
|
alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
|
|
|
alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
|
|
|
alertInstance, err := alertQuery.First(context.Background())
|
|
|
+
|
|
|
if err != nil {
|
|
|
if ent.IsNotFound(err) {
|
|
|
log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
|
|
|
return true, nil
|
|
|
}
|
|
|
+
|
|
|
return false, fmt.Errorf("while getting alert: %w", err)
|
|
|
}
|
|
|
+
|
|
|
decisionQuery := a.dbClient.Ent.Decision.Query()
|
|
|
decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
|
|
|
firstDecision, err := decisionQuery.First(context.Background())
|
|
|
+
|
|
|
if err != nil {
|
|
|
if ent.IsNotFound(err) {
|
|
|
log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
|
|
|
return true, nil
|
|
|
}
|
|
|
+
|
|
|
return false, fmt.Errorf("while getting decision: %w", err)
|
|
|
}
|
|
|
+
|
|
|
if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
|
|
|
log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
|
|
|
return true, nil
|
|
|
}
|
|
|
+
|
|
|
return false, nil
|
|
|
}
|
|
|
|
|
|
-func (a *apic) updateBlocklist(client *apiclient.ApiClient, blocklist *modelscapi.BlocklistLink, add_counters map[string]map[string]int, forcePull bool) error {
|
|
|
+func (a *apic) updateBlocklist(client *apiclient.ApiClient, blocklist *modelscapi.BlocklistLink, addCounters map[string]map[string]int, forcePull bool) error {
|
|
|
if blocklist.Scope == nil {
|
|
|
log.Warningf("blocklist has no scope")
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
if blocklist.Duration == nil {
|
|
|
log.Warningf("blocklist has no duration")
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
if !forcePull {
|
|
|
_forcePull, err := a.ShouldForcePullBlocklist(blocklist)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while checking if we should force pull blocklist %s: %w", *blocklist.Name, err)
|
|
|
}
|
|
|
+
|
|
|
forcePull = _forcePull
|
|
|
}
|
|
|
+
|
|
|
blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
|
|
|
- var lastPullTimestamp *string
|
|
|
- var err error
|
|
|
+
|
|
|
+ var (
|
|
|
+ lastPullTimestamp *string
|
|
|
+ err error
|
|
|
+ )
|
|
|
+
|
|
|
if !forcePull {
|
|
|
lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while getting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
decisions, hasChanged, err := client.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while getting decisions from blocklist %s: %w", *blocklist.Name, err)
|
|
|
}
|
|
|
+
|
|
|
if !hasChanged {
|
|
|
if lastPullTimestamp == nil {
|
|
|
log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
|
|
|
} else {
|
|
|
log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while setting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
|
|
|
}
|
|
|
+
|
|
|
if len(decisions) == 0 {
|
|
|
log.Infof("blocklist %s has no decisions", *blocklist.Name)
|
|
|
return nil
|
|
@@ -770,19 +860,21 @@ func (a *apic) updateBlocklist(client *apiclient.ApiClient, blocklist *modelscap
|
|
|
decisions = a.ApplyApicWhitelists(decisions)
|
|
|
alert := createAlertForDecision(decisions[0])
|
|
|
alertsFromCapi := []*models.Alert{alert}
|
|
|
- alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
|
|
|
+ alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)
|
|
|
|
|
|
- err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
|
|
|
+ err = a.SaveAlerts(alertsFromCapi, addCounters, nil)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while saving alert from blocklist %s: %w", *blocklist.Name, err)
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int, forcePull bool) error {
|
|
|
+func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, addCounters map[string]map[string]int, forcePull bool) error {
|
|
|
if links == nil {
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
if links.Blocklists == nil {
|
|
|
return nil
|
|
|
}
|
|
@@ -792,21 +884,23 @@ func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLink
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("while creating default client: %w", err)
|
|
|
}
|
|
|
+
|
|
|
for _, blocklist := range links.Blocklists {
|
|
|
- if err := a.updateBlocklist(defaultClient, blocklist, add_counters, forcePull); err != nil {
|
|
|
+ if err := a.updateBlocklist(defaultClient, blocklist, addCounters, forcePull); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func setAlertScenario(alert *models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) {
|
|
|
+func setAlertScenario(alert *models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) {
|
|
|
if *alert.Source.Scope == types.CAPIOrigin {
|
|
|
*alert.Source.Scope = types.CommunityBlocklistPullSourceScope
|
|
|
- alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
|
|
|
+ alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", addCounters[types.CAPIOrigin]["all"], deleteCounters[types.CAPIOrigin]["all"]))
|
|
|
} else if *alert.Source.Scope == types.ListOrigin {
|
|
|
*alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
|
|
|
- alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
|
|
|
+ alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", addCounters[types.ListOrigin][*alert.Scenario], deleteCounters[types.ListOrigin][*alert.Scenario]))
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -814,20 +908,26 @@ func (a *apic) Pull() error {
|
|
|
defer trace.CatchPanic("lapi/pullFromAPIC")
|
|
|
|
|
|
toldOnce := false
|
|
|
+
|
|
|
for {
|
|
|
scenario, err := a.FetchScenariosListFromDB()
|
|
|
if err != nil {
|
|
|
log.Errorf("unable to fetch scenarios from db: %s", err)
|
|
|
}
|
|
|
+
|
|
|
if len(scenario) > 0 {
|
|
|
break
|
|
|
}
|
|
|
+
|
|
|
if !toldOnce {
|
|
|
log.Warning("scenario list is empty, will not pull yet")
|
|
|
+
|
|
|
toldOnce = true
|
|
|
}
|
|
|
+
|
|
|
time.Sleep(1 * time.Second)
|
|
|
}
|
|
|
+
|
|
|
if err := a.PullTop(false); err != nil {
|
|
|
log.Errorf("capi pull top: %s", err)
|
|
|
}
|
|
@@ -839,6 +939,7 @@ func (a *apic) Pull() error {
|
|
|
select {
|
|
|
case <-ticker.C:
|
|
|
ticker.Reset(a.pullInterval)
|
|
|
+
|
|
|
if err := a.PullTop(false); err != nil {
|
|
|
log.Errorf("capi pull top: %s", err)
|
|
|
continue
|
|
@@ -846,6 +947,7 @@ func (a *apic) Pull() error {
|
|
|
case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
|
|
|
a.metricsTomb.Kill(nil)
|
|
|
a.pushTomb.Kill(nil)
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
@@ -858,15 +960,15 @@ func (a *apic) Shutdown() {
|
|
|
}
|
|
|
|
|
|
func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
|
|
|
- add_counters := make(map[string]map[string]int)
|
|
|
- add_counters[types.CAPIOrigin] = make(map[string]int)
|
|
|
- add_counters[types.ListOrigin] = make(map[string]int)
|
|
|
+ addCounters := make(map[string]map[string]int)
|
|
|
+ addCounters[types.CAPIOrigin] = make(map[string]int)
|
|
|
+ addCounters[types.ListOrigin] = make(map[string]int)
|
|
|
|
|
|
- delete_counters := make(map[string]map[string]int)
|
|
|
- delete_counters[types.CAPIOrigin] = make(map[string]int)
|
|
|
- delete_counters[types.ListOrigin] = make(map[string]int)
|
|
|
+ deleteCounters := make(map[string]map[string]int)
|
|
|
+ deleteCounters[types.CAPIOrigin] = make(map[string]int)
|
|
|
+ deleteCounters[types.ListOrigin] = make(map[string]int)
|
|
|
|
|
|
- return add_counters, delete_counters
|
|
|
+ return addCounters, deleteCounters
|
|
|
}
|
|
|
|
|
|
func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
|