package apiserver

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-openapi/strfmt"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"golang.org/x/exp/slices"
	"gopkg.in/tomb.v2"

	"github.com/crowdsecurity/go-cs-lib/pkg/ptr"
	"github.com/crowdsecurity/go-cs-lib/pkg/trace"
	"github.com/crowdsecurity/go-cs-lib/pkg/version"

	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
	"github.com/crowdsecurity/crowdsec/pkg/database"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
	"github.com/crowdsecurity/crowdsec/pkg/models"
	"github.com/crowdsecurity/crowdsec/pkg/modelscapi"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)

var (
	pullIntervalDefault    = time.Hour * 2
	pullIntervalDelta      = 5 * time.Minute
	pushIntervalDefault    = time.Second * 10
	pushIntervalDelta      = time.Second * 7
	metricsIntervalDefault = time.Minute * 30
	metricsIntervalDelta   = time.Minute * 15
)

var SCOPE_CAPI_ALIAS_ALIAS string = "crowdsecurity/community-blocklist" // we don't use "CAPI" directly, to make it less confusing for the user

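// apic handles the interactions with the Central API (CAPI): it pushes signals,
// pulls the community blocklist and subscribed blocklists, and sends usage metrics.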
type apic struct {
	// when changing the intervals in tests, always set *First too
	// or they can be negative
	pullInterval         time.Duration
	pullIntervalFirst    time.Duration
	pushInterval         time.Duration
	pushIntervalFirst    time.Duration
	metricsInterval      time.Duration
	metricsIntervalFirst time.Duration
	dbClient             *database.Client
	apiClient            *apiclient.ApiClient
	AlertsAddChan        chan []*models.Alert
	mu                   sync.Mutex
	pushTomb             tomb.Tomb
	pullTomb             tomb.Tomb
	metricsTomb          tomb.Tomb
	startup              bool
	credentials          *csconfig.ApiCredentialsCfg
	scenarioList         []string
	consoleConfig        *csconfig.ConsoleConfig
	isPulling            chan bool
	whitelists           *csconfig.CapiWhitelist
}

// randomDuration returns a duration value between d-delta and d+delta
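// e.g. randomDuration(2*time.Hour, 5*time.Minute) yields a value between 1h55m and 2h05m.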
func randomDuration(d time.Duration, delta time.Duration) time.Duration {
	return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
}

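// FetchScenariosListFromDB returns the deduplicated list of scenarios installed across all registered machines.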
func (a *apic) FetchScenariosListFromDB() ([]string, error) {
	scenarios := make([]string, 0)
	machines, err := a.dbClient.ListMachines()
	if err != nil {
		return nil, errors.Wrap(err, "while listing machines")
	}
	//merge all scenarios together
	for _, v := range machines {
		machineScenarios := strings.Split(v.Scenarios, ",")
		log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
		for _, sv := range machineScenarios {
			if !slices.Contains(scenarios, sv) && sv != "" {
				scenarios = append(scenarios, sv)
			}
		}
	}
	log.Debugf("Returning list of scenarios : %+v", scenarios)
	return scenarios, nil
}

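// decisionsToApiDecisions converts local decisions to the format expected by the CAPI signals endpoint.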
func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
	apiDecisions := models.AddSignalsRequestItemDecisions{}
	for _, decision := range decisions {
		x := &models.AddSignalsRequestItemDecisionsItem{
			Duration: ptr.Of(*decision.Duration),
			ID:       new(int64),
			Origin:   ptr.Of(*decision.Origin),
			Scenario: ptr.Of(*decision.Scenario),
			Scope:    ptr.Of(*decision.Scope),
			//Simulated: *decision.Simulated,
			Type:  ptr.Of(*decision.Type),
			Until: decision.Until,
			Value: ptr.Of(*decision.Value),
			UUID:  decision.UUID,
		}
		*x.ID = decision.ID
		if decision.Simulated != nil {
			x.Simulated = *decision.Simulated
		}
		apiDecisions = append(apiDecisions, x)
	}
	return apiDecisions
}

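// alertToSignal builds the signal sent to CAPI from a local alert; the alert context (metadata) is only included when shareContext is true.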
func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
	signal := &models.AddSignalsRequestItem{
		Message:         alert.Message,
		Scenario:        alert.Scenario,
		ScenarioHash:    alert.ScenarioHash,
		ScenarioVersion: alert.ScenarioVersion,
		Source: &models.AddSignalsRequestItemSource{
			AsName:    alert.Source.AsName,
			AsNumber:  alert.Source.AsNumber,
			Cn:        alert.Source.Cn,
			IP:        alert.Source.IP,
			Latitude:  alert.Source.Latitude,
			Longitude: alert.Source.Longitude,
			Range:     alert.Source.Range,
			Scope:     alert.Source.Scope,
			Value:     alert.Source.Value,
		},
		StartAt:       alert.StartAt,
		StopAt:        alert.StopAt,
		CreatedAt:     alert.CreatedAt,
		MachineID:     alert.MachineID,
		ScenarioTrust: scenarioTrust,
		Decisions:     decisionsToApiDecisions(alert.Decisions),
		UUID:          alert.UUID,
	}
	if shareContext {
		signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
		for _, meta := range alert.Meta {
			contextItem := models.AddSignalsRequestItemContextItems0{
				Key:   meta.Key,
				Value: meta.Value,
			}
			signal.Context = append(signal.Context, &contextItem)
		}
	}
	return signal
}

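// NewAPIC builds the CAPI client: it parses the credentials, loads the scenario list
// from the database and performs an initial watcher authentication against CAPI.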
func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
	var err error
	ret := &apic{
		AlertsAddChan:        make(chan []*models.Alert),
		dbClient:             dbClient,
		mu:                   sync.Mutex{},
		startup:              true,
		credentials:          config.Credentials,
		pullTomb:             tomb.Tomb{},
		pushTomb:             tomb.Tomb{},
		metricsTomb:          tomb.Tomb{},
		scenarioList:         make([]string, 0),
		consoleConfig:        consoleConfig,
		pullInterval:         pullIntervalDefault,
		pullIntervalFirst:    randomDuration(pullIntervalDefault, pullIntervalDelta),
		pushInterval:         pushIntervalDefault,
		pushIntervalFirst:    randomDuration(pushIntervalDefault, pushIntervalDelta),
		metricsInterval:      metricsIntervalDefault,
		metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
		isPulling:            make(chan bool, 1),
		whitelists:           apicWhitelist,
	}

	password := strfmt.Password(config.Credentials.Password)
	apiURL, err := url.Parse(config.Credentials.URL)
	if err != nil {
		return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
	}
	papiURL, err := url.Parse(config.Credentials.PapiURL)
	if err != nil {
		return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.PapiURL)
	}
	ret.scenarioList, err = ret.FetchScenariosListFromDB()
	if err != nil {
		return nil, errors.Wrap(err, "while fetching scenarios from db")
	}
	ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
		MachineID:      config.Credentials.Login,
		Password:       password,
		UserAgent:      fmt.Sprintf("crowdsec/%s", version.String()),
		URL:            apiURL,
		PapiURL:        papiURL,
		VersionPrefix:  "v3",
		Scenarios:      ret.scenarioList,
		UpdateScenario: ret.FetchScenariosListFromDB,
	})
	if err != nil {
		return nil, errors.Wrap(err, "while creating api client")
	}

	// The watcher will be authenticated by the RoundTripper the first time it calls CAPI.
	// An explicit authentication would trigger a useless extra call to CAPI.
	scenarios, err := ret.FetchScenariosListFromDB()
	if err != nil {
		return ret, errors.Wrap(err, "get scenarios from db")
	}
	authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
		MachineID: &config.Credentials.Login,
		Password:  &password,
		Scenarios: scenarios,
	})
	if err != nil {
		return ret, errors.Wrapf(err, "authenticate watcher (%s)", config.Credentials.Login)
	}
	if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
		return ret, errors.Wrap(err, "unable to parse jwt expiration")
	}
	ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token

	return ret, err
}

// Push keeps all shareable alerts in a cache and pushes them to CAPI every pushInterval.
func (a *apic) Push() error {
	defer trace.CatchPanic("lapi/pushToAPIC")

	var cache models.AddSignalsRequest
	ticker := time.NewTicker(a.pushIntervalFirst)
	log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)

	for {
		select {
		case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
			a.pullTomb.Kill(nil)
			a.metricsTomb.Kill(nil)
			log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
			if len(cache) == 0 {
				return nil
			}
			go a.Send(&cache)
			return nil
		case <-ticker.C:
			ticker.Reset(a.pushInterval)
			if len(cache) > 0 {
				a.mu.Lock()
				cacheCopy := cache
				cache = make(models.AddSignalsRequest, 0)
				a.mu.Unlock()
				log.Infof("Signal push: %d signals to push", len(cacheCopy))
				go a.Send(&cacheCopy)
			}
		case alerts := <-a.AlertsAddChan:
			var signals []*models.AddSignalsRequestItem
			for _, alert := range alerts {
				if ok := shouldShareAlert(alert, a.consoleConfig); ok {
					signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
				}
			}
			a.mu.Lock()
			cache = append(cache, signals...)
			a.mu.Unlock()
		}
	}
}

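// getScenarioTrustOfAlert rates an alert as "certified", "custom", "tainted" or "manual",
// based on its scenario hash/version and the origin of its first decision.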
func getScenarioTrustOfAlert(alert *models.Alert) string {
	scenarioTrust := "certified"
	if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
		scenarioTrust = "custom"
	} else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
		scenarioTrust = "tainted"
	}
	if len(alert.Decisions) > 0 {
		if *alert.Decisions[0].Origin == types.CscliOrigin {
			scenarioTrust = "manual"
		}
	}
	return scenarioTrust
}

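// shouldShareAlert tells whether an alert can be sent to CAPI, according to the console sharing options.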
func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
	if *alert.Simulated {
		log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
		return false
	}
	switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
	case "manual":
		if !*consoleConfig.ShareManualDecisions {
			log.Debugf("manual decision generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	case "tainted":
		if !*consoleConfig.ShareTaintedScenarios {
			log.Debugf("tainted scenario generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	case "custom":
		if !*consoleConfig.ShareCustomScenarios {
			log.Debugf("custom scenario generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	}
	return true
}

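// Send pushes signals to CAPI in batches of 50 with a 5s timeout per call; it is meant to be
// run asynchronously (see the comment below) so a slow CAPI cannot block the API workers.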
func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
	/*we do have a problem with this:
	The apic.Push background routine reads from the alertToPush chan.
	This chan is filled by Controller.CreateAlert.
	If apic.Send hangs, the alertToPush chan will become full,
	which means that Controller.CreateAlert is going to hang, blocking API worker(s).
	So instead, we prefer to cancel the write.
	I don't know enough about gin to tell how much of an issue it can be.
	*/
	var cache []*models.AddSignalsRequestItem = *cacheOrig
	var send models.AddSignalsRequest
	bulkSize := 50
	pageStart := 0
	pageEnd := bulkSize
	for {
		if pageEnd >= len(cache) {
			send = cache[pageStart:]
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			_, _, err := a.apiClient.Signal.Add(ctx, &send)
			if err != nil {
				log.Errorf("sending signal to central API: %s", err)
				return
			}
			break
		}
		send = cache[pageStart:pageEnd]
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_, _, err := a.apiClient.Signal.Add(ctx, &send)
		if err != nil {
			//we log it here as well, because the return value of func might be discarded
			log.Errorf("sending signal to central API: %s", err)
		}
		pageStart += bulkSize
		pageEnd += bulkSize
	}
}

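// CAPIPullIsOld returns true if the last community-blocklist pull is older than 1h30, i.e. a new pull is due.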
func (a *apic) CAPIPullIsOld() (bool, error) {
	/*only pull community blocklist if it's older than 1h30 */
	alerts := a.dbClient.Ent.Alert.Query()
	alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
	alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
	count, err := alerts.Count(a.dbClient.CTX)
	if err != nil {
		return false, errors.Wrap(err, "while looking for CAPI alert")
	}
	if count > 0 {
		log.Printf("last CAPI pull is newer than 1h30, skip.")
		return false, nil
	}
	return true, nil
}

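// HandleDeletedDecisions soft-deletes the local decisions matching the deletions received from CAPI
// and returns how many were removed.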
func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
	var filter map[string][]string
	var nbDeleted int
	for _, decision := range deletedDecisions {
		if strings.ToLower(*decision.Scope) == "ip" {
			filter = make(map[string][]string, 1)
			filter["value"] = []string{*decision.Value}
		} else {
			filter = make(map[string][]string, 3)
			filter["value"] = []string{*decision.Value}
			filter["type"] = []string{*decision.Type}
			filter["scopes"] = []string{*decision.Scope}
		}
		filter["origin"] = []string{*decision.Origin}

		dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
		if err != nil {
			return 0, errors.Wrap(err, "deleting decisions error")
		}
		dbCliDel, err := strconv.Atoi(dbCliRet)
		if err != nil {
			return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
		}
		updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
		nbDeleted += dbCliDel
	}
	return nbDeleted, nil
}

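// HandleDeletedDecisionsV3 does the same for the v3 (grouped by scope) deletion format of the decisions stream.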
func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
	var filter map[string][]string
	var nbDeleted int
	for _, decisions := range deletedDecisions {
		scope := decisions.Scope
		for _, decision := range decisions.Decisions {
			if strings.ToLower(*scope) == "ip" {
				filter = make(map[string][]string, 1)
				filter["value"] = []string{decision}
			} else {
				filter = make(map[string][]string, 2)
				filter["value"] = []string{decision}
				filter["scopes"] = []string{*scope}
			}
			filter["origin"] = []string{types.CAPIOrigin}

			dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
			if err != nil {
				return 0, errors.Wrap(err, "deleting decisions error")
			}
			dbCliDel, err := strconv.Atoi(dbCliRet)
			if err != nil {
				return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
			}
			updateCounterForDecision(delete_counters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
			nbDeleted += dbCliDel
		}
	}
	return nbDeleted, nil
}

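// createAlertsForDecisions creates the parent alerts: a single one for community-blocklist decisions,
// and one per scenario for decisions coming from third-party lists.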
func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
	newAlerts := make([]*models.Alert, 0)
	for _, decision := range decisions {
		found := false
		for _, sub := range newAlerts {
			if sub.Source.Scope == nil {
				log.Warningf("nil scope in %+v", sub)
				continue
			}
			if *decision.Origin == types.CAPIOrigin {
				if *sub.Source.Scope == types.CAPIOrigin {
					found = true
					break
				}
			} else if *decision.Origin == types.ListOrigin {
				if *sub.Source.Scope == *decision.Origin {
					if sub.Scenario == nil {
						log.Warningf("nil scenario in %+v", sub)
					}
					if *sub.Scenario == *decision.Scenario {
						found = true
						break
					}
				}
			} else {
				log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
			}
		}
		if !found {
			log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
			newAlerts = append(newAlerts, createAlertForDecision(decision))
		}
	}
	return newAlerts
}

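// createAlertForDecision builds an empty parent alert for a decision, scoped to either the community blocklist or the originating list.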
func createAlertForDecision(decision *models.Decision) *models.Alert {
	newAlert := &models.Alert{}
	newAlert.Source = &models.Source{}
	newAlert.Source.Scope = ptr.Of("")
	if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
		newAlert.Scenario = ptr.Of(types.CAPIOrigin)
		newAlert.Source.Scope = ptr.Of(types.CAPIOrigin)
	} else if *decision.Origin == types.ListOrigin {
		newAlert.Scenario = ptr.Of(*decision.Scenario)
		newAlert.Source.Scope = ptr.Of(types.ListOrigin)
	} else {
		log.Warningf("unknown origin %s", *decision.Origin)
	}
	newAlert.Message = ptr.Of("")
	newAlert.Source.Value = ptr.Of("")
	newAlert.StartAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
	newAlert.StopAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
	newAlert.Capacity = ptr.Of(int32(0))
	newAlert.Simulated = ptr.Of(false)
	newAlert.EventsCount = ptr.Of(int32(0))
	newAlert.Leakspeed = ptr.Of("")
	newAlert.ScenarioHash = ptr.Of("")
	newAlert.ScenarioVersion = ptr.Of("")
	newAlert.MachineID = database.CapiMachineID
	return newAlert
}

// fillAlertsWithDecisions takes a list of parent alerts and a list of decisions and pairs them up.
func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
	for _, decision := range decisions {
		//count and create separate alerts for each list
		updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)

		/*CAPI might send lower case scopes, unify it.*/
		switch strings.ToLower(*decision.Scope) {
		case "ip":
			*decision.Scope = types.Ip
		case "range":
			*decision.Scope = types.Range
		}
		found := false
		//add the individual decisions to the right list
		for idx, alert := range alerts {
			if *decision.Origin == types.CAPIOrigin {
				if *alert.Source.Scope == types.CAPIOrigin {
					alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
					found = true
					break
				}
			} else if *decision.Origin == types.ListOrigin {
				if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
					alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
					found = true
					break
				}
			} else {
				log.Warningf("unknown origin %s", *decision.Origin)
			}
		}
		if !found {
			log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
		}
	}
	return alerts
}

// PullTop fetches the decisions stream (new and deleted decisions, plus blocklist links) and turns it into alerts:
// one alert for the community blocklist,
// one alert per blocklist we're subscribed to.
func (a *apic) PullTop(forcePull bool) error {
	var err error

	//A mutex with TryLock would be a bit simpler,
	//but Go does not guarantee that TryLock will be able to acquire the lock even if it is available
	select {
	case a.isPulling <- true:
		defer func() {
			<-a.isPulling
		}()
	default:
		return errors.New("pull already in progress")
	}

	if !forcePull {
		if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
			return err
		} else if !lastPullIsOld {
			return nil
		}
	}

	log.Infof("Starting community-blocklist update")

	data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
	if err != nil {
		return errors.Wrap(err, "get stream")
	}
	a.startup = false

	log.Debugf("Received %d new decisions", len(data.New))
	log.Debugf("Received %d deleted decisions", len(data.Deleted))
	if data.Links != nil {
		log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
	}

	/*to count additions/deletions across lists*/
	add_counters, delete_counters := makeAddAndDeleteCounters()

	// process deleted decisions
	if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
		return err
	} else {
		log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
	}
	if len(data.New) == 0 {
		log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
		return nil
	}

	// create one alert for community blocklist using the first decision
	decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
	//apply APIC specific whitelists
	decisions = a.ApplyApicWhitelists(decisions)

	alert := createAlertForDecision(decisions[0])
	alertsFromCapi := []*models.Alert{alert}
	alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)

	err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
	if err != nil {
		return errors.Wrap(err, "while saving alerts")
	}

	// update blocklists
	if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
		return errors.Wrap(err, "while updating blocklists")
	}
	return nil
}

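// ApplyApicWhitelists drops the decisions whose value matches one of the CAPI whitelisted IPs or CIDRs, shrinking the slice in place.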
func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decision {
	if a.whitelists == nil {
		return decisions
	}
	//deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
	outIdx := 0
	for _, decision := range decisions {
		if decision.Value == nil {
			continue
		}
		skip := false
		ipval := net.ParseIP(*decision.Value)
		for _, cidr := range a.whitelists.Cidrs {
			if skip {
				break
			}
			if cidr.Contains(ipval) {
				log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, cidr.String())
				skip = true
			}
		}
		for _, ip := range a.whitelists.Ips {
			if skip {
				break
			}
			if ip != nil && ip.Equal(ipval) {
				log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, ip.String())
				skip = true
			}
		}
		if !skip {
			decisions[outIdx] = decision
			outIdx++
		}
	}
	//shrink the list, those are deleted items
	decisions = decisions[:outIdx]
	return decisions
}

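// SaveAlerts rewrites each alert's scope/scenario from the counters, then stores the alert and its decisions via UpdateCommunityBlocklist.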
func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
	for idx, alert := range alertsFromCapi {
		alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
		log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
		if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
			log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
		}
		alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
		if err != nil {
			return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
		}
		log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
	}
	return nil
}

func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
	// we should force pull if the blocklist decisions are about to expire or there's no decision in the db
	alertQuery := a.dbClient.Ent.Alert.Query()
	alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
	alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
	alertInstance, err := alertQuery.First(context.Background())
	if err != nil {
		if ent.IsNotFound(err) {
			log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
			return true, nil
		}
		return false, errors.Wrap(err, "while getting alert")
	}
	decisionQuery := a.dbClient.Ent.Decision.Query()
	decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
	firstDecision, err := decisionQuery.First(context.Background())
	if err != nil {
		if ent.IsNotFound(err) {
			log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
			return true, nil
		}
		return false, errors.Wrap(err, "while getting decision")
	}
	if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
		log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
		return true, nil
	}
	return false, nil
}

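// UpdateBlocklists pulls each third-party blocklist we are subscribed to (when it changed or must be force-pulled)
// and stores its decisions as alerts.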
func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
	if links == nil {
		return nil
	}
	if links.Blocklists == nil {
		return nil
	}
	// we must use a different http client than apiClient's: its transport is a jwtTransport, which is incompatible with the signed URLs used by the blocklists
	// we can use the same baseUrl as the urls are absolute and the parse will take care of it
	defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
	if err != nil {
		return errors.Wrap(err, "while creating default client")
	}
	for _, blocklist := range links.Blocklists {
		if blocklist.Scope == nil {
			log.Warningf("blocklist has no scope")
			continue
		}
		if blocklist.Duration == nil {
			log.Warningf("blocklist has no duration")
			continue
		}
		forcePull, err := a.ShouldForcePullBlocklist(blocklist)
		if err != nil {
			return errors.Wrapf(err, "while checking if we should force pull blocklist %s", *blocklist.Name)
		}
		blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
		var lastPullTimestamp *string
		if !forcePull {
			lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
			if err != nil {
				return errors.Wrapf(err, "while getting last pull timestamp for blocklist %s", *blocklist.Name)
			}
		}
		decisions, has_changed, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
		if err != nil {
			return errors.Wrapf(err, "while getting decisions from blocklist %s", *blocklist.Name)
		}
		if !has_changed {
			if lastPullTimestamp == nil {
				log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
			} else {
				log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
			}
			continue
		}
		err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
		if err != nil {
			return errors.Wrapf(err, "while setting last pull timestamp for blocklist %s", *blocklist.Name)
		}
		if len(decisions) == 0 {
			log.Infof("blocklist %s has no decisions", *blocklist.Name)
			continue
		}
		//apply APIC specific whitelists
		decisions = a.ApplyApicWhitelists(decisions)

		alert := createAlertForDecision(decisions[0])
		alertsFromCapi := []*models.Alert{alert}
		alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)

		err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
		if err != nil {
			return errors.Wrapf(err, "while saving alert from blocklist %s", *blocklist.Name)
		}
	}
	return nil
}

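// setAlertScenario rewrites the alert scope and scenario into their user-facing form,
// e.g. "crowdsecurity/community-blocklist" with an "update : +X/-Y IPs" summary.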
func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
	if *alert.Source.Scope == types.CAPIOrigin {
		*alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
		alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
	} else if *alert.Source.Scope == types.ListOrigin {
		*alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
		alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
	}
	return alert
}

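// Pull waits until at least one scenario is registered, then pulls the community blocklist
// immediately and again on every tick of pullInterval.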
func (a *apic) Pull() error {
	defer trace.CatchPanic("lapi/pullFromAPIC")

	toldOnce := false
	for {
		scenario, err := a.FetchScenariosListFromDB()
		if err != nil {
			log.Errorf("unable to fetch scenarios from db: %s", err)
		}
		if len(scenario) > 0 {
			break
		}
		if !toldOnce {
			log.Warning("scenario list is empty, will not pull yet")
			toldOnce = true
		}
		time.Sleep(1 * time.Second)
	}
	if err := a.PullTop(false); err != nil {
		log.Errorf("capi pull top: %s", err)
	}

	log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
	ticker := time.NewTicker(a.pullIntervalFirst)

	for {
		select {
		case <-ticker.C:
			ticker.Reset(a.pullInterval)
			if err := a.PullTop(false); err != nil {
				log.Errorf("capi pull top: %s", err)
				continue
			}
		case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
			a.metricsTomb.Kill(nil)
			a.pushTomb.Kill(nil)
			return nil
		}
	}
}

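// GetMetrics builds the metrics payload sent to CAPI: LAPI version, registered machines and bouncers with their last activity.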
func (a *apic) GetMetrics() (*models.Metrics, error) {
	metric := &models.Metrics{
		ApilVersion: ptr.Of(version.String()),
		Machines:    make([]*models.MetricsAgentInfo, 0),
		Bouncers:    make([]*models.MetricsBouncerInfo, 0),
	}
	machines, err := a.dbClient.ListMachines()
	if err != nil {
		return metric, err
	}
	bouncers, err := a.dbClient.ListBouncers()
	if err != nil {
		return metric, err
	}
	var lastpush string
	for _, machine := range machines {
		if machine.LastPush == nil {
			lastpush = time.Time{}.String()
		} else {
			lastpush = machine.LastPush.String()
		}
		m := &models.MetricsAgentInfo{
			Version:    machine.Version,
			Name:       machine.MachineId,
			LastUpdate: machine.UpdatedAt.String(),
			LastPush:   lastpush,
		}
		metric.Machines = append(metric.Machines, m)
	}
	for _, bouncer := range bouncers {
		m := &models.MetricsBouncerInfo{
			Version:    bouncer.Version,
			CustomName: bouncer.Name,
			Name:       bouncer.Type,
			LastPull:   bouncer.LastPull.String(),
		}
		metric.Bouncers = append(metric.Bouncers, m)
	}
	return metric, nil
}

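// SendMetrics sends metrics to CAPI at startup and then on every tick of metricsInterval, until stopped.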
func (a *apic) SendMetrics(stop chan bool) {
	defer trace.CatchPanic("lapi/metricsToAPIC")

	ticker := time.NewTicker(a.metricsIntervalFirst)
	log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)

	for {
		metrics, err := a.GetMetrics()
		if err != nil {
			log.Errorf("unable to get metrics (%s), will retry", err)
		}
		_, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
		if err != nil {
			log.Errorf("capi metrics: failed: %s", err)
		} else {
			log.Infof("capi metrics: metrics sent successfully")
		}
		select {
		case <-stop:
			return
		case <-ticker.C:
			ticker.Reset(a.metricsInterval)
		case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
			a.pullTomb.Kill(nil)
			a.pushTomb.Kill(nil)
			return
		}
	}
}

func (a *apic) Shutdown() {
	a.pushTomb.Kill(nil)
	a.pullTomb.Kill(nil)
	a.metricsTomb.Kill(nil)
}

func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
	add_counters := make(map[string]map[string]int)
	add_counters[types.CAPIOrigin] = make(map[string]int)
	add_counters[types.ListOrigin] = make(map[string]int)
	delete_counters := make(map[string]map[string]int)
	delete_counters[types.CAPIOrigin] = make(map[string]int)
	delete_counters[types.ListOrigin] = make(map[string]int)
	return add_counters, delete_counters
}

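// updateCounterForDecision increments the add/delete counter for the decision's origin:
// community-blocklist decisions are counted under "all", list decisions per scenario.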
func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
	if *origin == types.CAPIOrigin {
		counter[*origin]["all"] += totalDecisions
	} else if *origin == types.ListOrigin {
		counter[*origin][*scenario] += totalDecisions
	} else {
		log.Warningf("Unknown origin %s", *origin)
	}
}