apic.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/go-openapi/strfmt"
  14. "github.com/pkg/errors"
  15. log "github.com/sirupsen/logrus"
  16. "golang.org/x/exp/slices"
  17. "gopkg.in/tomb.v2"
  18. "github.com/crowdsecurity/go-cs-lib/pkg/ptr"
  19. "github.com/crowdsecurity/go-cs-lib/pkg/trace"
  20. "github.com/crowdsecurity/go-cs-lib/pkg/version"
  21. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  22. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  23. "github.com/crowdsecurity/crowdsec/pkg/database"
  24. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  25. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  26. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  27. "github.com/crowdsecurity/crowdsec/pkg/models"
  28. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  29. "github.com/crowdsecurity/crowdsec/pkg/types"
  30. )
// Default intervals for the routines talking to the Central API, together with
// the delta used to randomize the delay before the first tick of each routine
// (see the randomDuration calls in NewAPIC).
const (
	// delta values must be smaller than the interval
	pullIntervalDefault    = time.Hour * 2
	pullIntervalDelta      = 5 * time.Minute
	pushIntervalDefault    = time.Second * 10
	pushIntervalDelta      = time.Second * 7
	metricsIntervalDefault = time.Minute * 30
	metricsIntervalDelta   = time.Minute * 15
)

// SCOPE_CAPI_ALIAS_ALIAS is the source scope written by setAlertScenario on
// alerts whose scope is types.CAPIOrigin.
var SCOPE_CAPI_ALIAS_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
// apic holds the state shared by the routines that exchange data with the
// CrowdSec Central API: pushing signals, pulling decisions/blocklists and
// sending metrics.
type apic struct {
	// when changing the intervals in tests, always set *First too
	// or they can be negative
	pullInterval         time.Duration // period of the pull ticker after its first tick
	pullIntervalFirst    time.Duration // delay before the first pull tick (randomized in NewAPIC)
	pushInterval         time.Duration // period of the push ticker after its first tick
	pushIntervalFirst    time.Duration // delay before the first push tick (randomized in NewAPIC)
	metricsInterval      time.Duration // metrics period; not used in the code visible here
	metricsIntervalFirst time.Duration // randomized in NewAPIC; not used in the code visible here
	dbClient             *database.Client
	apiClient            *apiclient.ApiClient
	AlertsAddChan        chan []*models.Alert // alerts received here are converted to signals by Push
	mu                   sync.Mutex           // taken by Push around accesses to its signal cache
	pushTomb             tomb.Tomb            // Push stops when this tomb is dying
	pullTomb             tomb.Tomb            // Pull stops when this tomb is dying
	metricsTomb          tomb.Tomb            // killed together with the two others
	startup              bool                 // passed as Startup to GetStreamV3; cleared after the first successful fetch
	credentials          *csconfig.ApiCredentialsCfg
	scenarioList         []string // scenarios read from the database in NewAPIC
	consoleConfig        *csconfig.ConsoleConfig
	isPulling            chan bool // 1-slot channel used by PullTop as a non-blocking "pull in progress" flag
	whitelists           *csconfig.CapiWhitelist // when nil, ApplyApicWhitelists returns its input unchanged
}
  64. // randomDuration returns a duration value between d-delta and d+delta
  65. func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  66. ret := d + time.Duration(rand.Int63n(int64(2*delta))) - delta
  67. // ticker interval must be > 0 (nanoseconds)
  68. if ret <= 0 {
  69. return 1
  70. }
  71. return ret
  72. }
  73. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  74. scenarios := make([]string, 0)
  75. machines, err := a.dbClient.ListMachines()
  76. if err != nil {
  77. return nil, fmt.Errorf("while listing machines: %w", err)
  78. }
  79. //merge all scenarios together
  80. for _, v := range machines {
  81. machineScenarios := strings.Split(v.Scenarios, ",")
  82. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  83. for _, sv := range machineScenarios {
  84. if !slices.Contains(scenarios, sv) && sv != "" {
  85. scenarios = append(scenarios, sv)
  86. }
  87. }
  88. }
  89. log.Debugf("Returning list of scenarios : %+v", scenarios)
  90. return scenarios, nil
  91. }
  92. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  93. apiDecisions := models.AddSignalsRequestItemDecisions{}
  94. for _, decision := range decisions {
  95. x := &models.AddSignalsRequestItemDecisionsItem{
  96. Duration: ptr.Of(*decision.Duration),
  97. ID: new(int64),
  98. Origin: ptr.Of(*decision.Origin),
  99. Scenario: ptr.Of(*decision.Scenario),
  100. Scope: ptr.Of(*decision.Scope),
  101. //Simulated: *decision.Simulated,
  102. Type: ptr.Of(*decision.Type),
  103. Until: decision.Until,
  104. Value: ptr.Of(*decision.Value),
  105. UUID: decision.UUID,
  106. }
  107. *x.ID = decision.ID
  108. if decision.Simulated != nil {
  109. x.Simulated = *decision.Simulated
  110. }
  111. apiDecisions = append(apiDecisions, x)
  112. }
  113. return apiDecisions
  114. }
  115. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  116. signal := &models.AddSignalsRequestItem{
  117. Message: alert.Message,
  118. Scenario: alert.Scenario,
  119. ScenarioHash: alert.ScenarioHash,
  120. ScenarioVersion: alert.ScenarioVersion,
  121. Source: &models.AddSignalsRequestItemSource{
  122. AsName: alert.Source.AsName,
  123. AsNumber: alert.Source.AsNumber,
  124. Cn: alert.Source.Cn,
  125. IP: alert.Source.IP,
  126. Latitude: alert.Source.Latitude,
  127. Longitude: alert.Source.Longitude,
  128. Range: alert.Source.Range,
  129. Scope: alert.Source.Scope,
  130. Value: alert.Source.Value,
  131. },
  132. StartAt: alert.StartAt,
  133. StopAt: alert.StopAt,
  134. CreatedAt: alert.CreatedAt,
  135. MachineID: alert.MachineID,
  136. ScenarioTrust: scenarioTrust,
  137. Decisions: decisionsToApiDecisions(alert.Decisions),
  138. UUID: alert.UUID,
  139. }
  140. if shareContext {
  141. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  142. for _, meta := range alert.Meta {
  143. contextItem := models.AddSignalsRequestItemContextItems0{
  144. Key: meta.Key,
  145. Value: meta.Value,
  146. }
  147. signal.Context = append(signal.Context, &contextItem)
  148. }
  149. }
  150. return signal
  151. }
  152. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
  153. var err error
  154. ret := &apic{
  155. AlertsAddChan: make(chan []*models.Alert),
  156. dbClient: dbClient,
  157. mu: sync.Mutex{},
  158. startup: true,
  159. credentials: config.Credentials,
  160. pullTomb: tomb.Tomb{},
  161. pushTomb: tomb.Tomb{},
  162. metricsTomb: tomb.Tomb{},
  163. scenarioList: make([]string, 0),
  164. consoleConfig: consoleConfig,
  165. pullInterval: pullIntervalDefault,
  166. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  167. pushInterval: pushIntervalDefault,
  168. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  169. metricsInterval: metricsIntervalDefault,
  170. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  171. isPulling: make(chan bool, 1),
  172. whitelists: apicWhitelist,
  173. }
  174. password := strfmt.Password(config.Credentials.Password)
  175. apiURL, err := url.Parse(config.Credentials.URL)
  176. if err != nil {
  177. return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.URL, err)
  178. }
  179. papiURL, err := url.Parse(config.Credentials.PapiURL)
  180. if err != nil {
  181. return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.PapiURL, err)
  182. }
  183. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  184. if err != nil {
  185. return nil, fmt.Errorf("while fetching scenarios from db: %w", err)
  186. }
  187. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  188. MachineID: config.Credentials.Login,
  189. Password: password,
  190. UserAgent: fmt.Sprintf("crowdsec/%s", version.String()),
  191. URL: apiURL,
  192. PapiURL: papiURL,
  193. VersionPrefix: "v3",
  194. Scenarios: ret.scenarioList,
  195. UpdateScenario: ret.FetchScenariosListFromDB,
  196. })
  197. if err != nil {
  198. return nil, fmt.Errorf("while creating api client: %w", err)
  199. }
  200. // The watcher will be authenticated by the RoundTripper the first time it will call CAPI
  201. // Explicit authentication will provoke an useless supplementary call to CAPI
  202. scenarios, err := ret.FetchScenariosListFromDB()
  203. if err != nil {
  204. return ret, fmt.Errorf("get scenario in db: %w", err)
  205. }
  206. authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  207. MachineID: &config.Credentials.Login,
  208. Password: &password,
  209. Scenarios: scenarios,
  210. })
  211. if err != nil {
  212. return ret, fmt.Errorf("authenticate watcher (%s): %w", config.Credentials.Login, err)
  213. }
  214. if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
  215. return ret, fmt.Errorf("unable to parse jwt expiration: %w", err)
  216. }
  217. ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
  218. return ret, err
  219. }
  220. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  221. func (a *apic) Push() error {
  222. defer trace.CatchPanic("lapi/pushToAPIC")
  223. var cache models.AddSignalsRequest
  224. ticker := time.NewTicker(a.pushIntervalFirst)
  225. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  226. for {
  227. select {
  228. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  229. a.pullTomb.Kill(nil)
  230. a.metricsTomb.Kill(nil)
  231. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  232. if len(cache) == 0 {
  233. return nil
  234. }
  235. go a.Send(&cache)
  236. return nil
  237. case <-ticker.C:
  238. ticker.Reset(a.pushInterval)
  239. if len(cache) > 0 {
  240. a.mu.Lock()
  241. cacheCopy := cache
  242. cache = make(models.AddSignalsRequest, 0)
  243. a.mu.Unlock()
  244. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  245. go a.Send(&cacheCopy)
  246. }
  247. case alerts := <-a.AlertsAddChan:
  248. var signals []*models.AddSignalsRequestItem
  249. for _, alert := range alerts {
  250. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  251. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  252. }
  253. }
  254. a.mu.Lock()
  255. cache = append(cache, signals...)
  256. a.mu.Unlock()
  257. }
  258. }
  259. }
  260. func getScenarioTrustOfAlert(alert *models.Alert) string {
  261. scenarioTrust := "certified"
  262. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  263. scenarioTrust = "custom"
  264. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  265. scenarioTrust = "tainted"
  266. }
  267. if len(alert.Decisions) > 0 {
  268. if *alert.Decisions[0].Origin == types.CscliOrigin {
  269. scenarioTrust = "manual"
  270. }
  271. }
  272. return scenarioTrust
  273. }
  274. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  275. if *alert.Simulated {
  276. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  277. return false
  278. }
  279. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  280. case "manual":
  281. if !*consoleConfig.ShareManualDecisions {
  282. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  283. return false
  284. }
  285. case "tainted":
  286. if !*consoleConfig.ShareTaintedScenarios {
  287. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  288. return false
  289. }
  290. case "custom":
  291. if !*consoleConfig.ShareCustomScenarios {
  292. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  293. return false
  294. }
  295. }
  296. return true
  297. }
  298. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  299. /*we do have a problem with this :
  300. The apic.Push background routine reads from alertToPush chan.
  301. This chan is filled by Controller.CreateAlert
  302. If the chan apic.Send hangs, the alertToPush chan will become full,
  303. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  304. So instead, we prefer to cancel write.
  305. I don't know enough about gin to tell how much of an issue it can be.
  306. */
  307. var cache []*models.AddSignalsRequestItem = *cacheOrig
  308. var send models.AddSignalsRequest
  309. bulkSize := 50
  310. pageStart := 0
  311. pageEnd := bulkSize
  312. for {
  313. if pageEnd >= len(cache) {
  314. send = cache[pageStart:]
  315. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  316. defer cancel()
  317. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  318. if err != nil {
  319. log.Errorf("sending signal to central API: %s", err)
  320. return
  321. }
  322. break
  323. }
  324. send = cache[pageStart:pageEnd]
  325. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  326. defer cancel()
  327. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  328. if err != nil {
  329. //we log it here as well, because the return value of func might be discarded
  330. log.Errorf("sending signal to central API: %s", err)
  331. }
  332. pageStart += bulkSize
  333. pageEnd += bulkSize
  334. }
  335. }
  336. func (a *apic) CAPIPullIsOld() (bool, error) {
  337. /*only pull community blocklist if it's older than 1h30 */
  338. alerts := a.dbClient.Ent.Alert.Query()
  339. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  340. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  341. count, err := alerts.Count(a.dbClient.CTX)
  342. if err != nil {
  343. return false, fmt.Errorf("while looking for CAPI alert: %w", err)
  344. }
  345. if count > 0 {
  346. log.Printf("last CAPI pull is newer than 1h30, skip.")
  347. return false, nil
  348. }
  349. return true, nil
  350. }
  351. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  352. var filter map[string][]string
  353. var nbDeleted int
  354. for _, decision := range deletedDecisions {
  355. if strings.ToLower(*decision.Scope) == "ip" {
  356. filter = make(map[string][]string, 1)
  357. filter["value"] = []string{*decision.Value}
  358. } else {
  359. filter = make(map[string][]string, 3)
  360. filter["value"] = []string{*decision.Value}
  361. filter["type"] = []string{*decision.Type}
  362. filter["scopes"] = []string{*decision.Scope}
  363. }
  364. filter["origin"] = []string{*decision.Origin}
  365. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  366. if err != nil {
  367. return 0, fmt.Errorf("deleting decisions error: %w", err)
  368. }
  369. dbCliDel, err := strconv.Atoi(dbCliRet)
  370. if err != nil {
  371. return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
  372. }
  373. updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
  374. nbDeleted += dbCliDel
  375. }
  376. return nbDeleted, nil
  377. }
  378. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
  379. var filter map[string][]string
  380. var nbDeleted int
  381. for _, decisions := range deletedDecisions {
  382. scope := decisions.Scope
  383. for _, decision := range decisions.Decisions {
  384. if strings.ToLower(*scope) == "ip" {
  385. filter = make(map[string][]string, 1)
  386. filter["value"] = []string{decision}
  387. } else {
  388. filter = make(map[string][]string, 2)
  389. filter["value"] = []string{decision}
  390. filter["scopes"] = []string{*scope}
  391. }
  392. filter["origin"] = []string{types.CAPIOrigin}
  393. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  394. if err != nil {
  395. return 0, fmt.Errorf("deleting decisions error: %w", err)
  396. }
  397. dbCliDel, err := strconv.Atoi(dbCliRet)
  398. if err != nil {
  399. return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
  400. }
  401. updateCounterForDecision(delete_counters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
  402. nbDeleted += dbCliDel
  403. }
  404. }
  405. return nbDeleted, nil
  406. }
  407. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  408. newAlerts := make([]*models.Alert, 0)
  409. for _, decision := range decisions {
  410. found := false
  411. for _, sub := range newAlerts {
  412. if sub.Source.Scope == nil {
  413. log.Warningf("nil scope in %+v", sub)
  414. continue
  415. }
  416. if *decision.Origin == types.CAPIOrigin {
  417. if *sub.Source.Scope == types.CAPIOrigin {
  418. found = true
  419. break
  420. }
  421. } else if *decision.Origin == types.ListOrigin {
  422. if *sub.Source.Scope == *decision.Origin {
  423. if sub.Scenario == nil {
  424. log.Warningf("nil scenario in %+v", sub)
  425. }
  426. if *sub.Scenario == *decision.Scenario {
  427. found = true
  428. break
  429. }
  430. }
  431. } else {
  432. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  433. }
  434. }
  435. if !found {
  436. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  437. newAlerts = append(newAlerts, createAlertForDecision(decision))
  438. }
  439. }
  440. return newAlerts
  441. }
  442. func createAlertForDecision(decision *models.Decision) *models.Alert {
  443. newAlert := &models.Alert{}
  444. newAlert.Source = &models.Source{}
  445. newAlert.Source.Scope = ptr.Of("")
  446. if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
  447. newAlert.Scenario = ptr.Of(types.CAPIOrigin)
  448. newAlert.Source.Scope = ptr.Of(types.CAPIOrigin)
  449. } else if *decision.Origin == types.ListOrigin {
  450. newAlert.Scenario = ptr.Of(*decision.Scenario)
  451. newAlert.Source.Scope = ptr.Of(types.ListOrigin)
  452. } else {
  453. log.Warningf("unknown origin %s", *decision.Origin)
  454. }
  455. newAlert.Message = ptr.Of("")
  456. newAlert.Source.Value = ptr.Of("")
  457. newAlert.StartAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
  458. newAlert.StopAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
  459. newAlert.Capacity = ptr.Of(int32(0))
  460. newAlert.Simulated = ptr.Of(false)
  461. newAlert.EventsCount = ptr.Of(int32(0))
  462. newAlert.Leakspeed = ptr.Of("")
  463. newAlert.ScenarioHash = ptr.Of("")
  464. newAlert.ScenarioVersion = ptr.Of("")
  465. newAlert.MachineID = database.CapiMachineID
  466. return newAlert
  467. }
  468. // This function takes in list of parent alerts and decisions and then pairs them up.
  469. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  470. for _, decision := range decisions {
  471. //count and create separate alerts for each list
  472. updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
  473. /*CAPI might send lower case scopes, unify it.*/
  474. switch strings.ToLower(*decision.Scope) {
  475. case "ip":
  476. *decision.Scope = types.Ip
  477. case "range":
  478. *decision.Scope = types.Range
  479. }
  480. found := false
  481. //add the individual decisions to the right list
  482. for idx, alert := range alerts {
  483. if *decision.Origin == types.CAPIOrigin {
  484. if *alert.Source.Scope == types.CAPIOrigin {
  485. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  486. found = true
  487. break
  488. }
  489. } else if *decision.Origin == types.ListOrigin {
  490. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  491. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  492. found = true
  493. break
  494. }
  495. } else {
  496. log.Warningf("unknown origin %s", *decision.Origin)
  497. }
  498. }
  499. if !found {
  500. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  501. }
  502. }
  503. return alerts
  504. }
  505. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  506. // one alert for "community blocklist"
  507. // one alert per list we're subscribed to
  508. func (a *apic) PullTop(forcePull bool) error {
  509. var err error
  510. //A mutex with TryLock would be a bit simpler
  511. //But go does not guarantee that TryLock will be able to acquire the lock even if it is available
  512. select {
  513. case a.isPulling <- true:
  514. defer func() {
  515. <-a.isPulling
  516. }()
  517. default:
  518. return errors.New("pull already in progress")
  519. }
  520. if !forcePull {
  521. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  522. return err
  523. } else if !lastPullIsOld {
  524. return nil
  525. }
  526. }
  527. log.Debug("Acquiring lock for pullCAPI")
  528. err = a.dbClient.AcquirePullCAPILock()
  529. if a.dbClient.IsLocked(err) {
  530. log.Info("PullCAPI is already running, skipping")
  531. return nil
  532. }
  533. log.Infof("Starting community-blocklist update")
  534. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  535. if err != nil {
  536. return fmt.Errorf("get stream: %w", err)
  537. }
  538. a.startup = false
  539. /*to count additions/deletions across lists*/
  540. log.Debugf("Received %d new decisions", len(data.New))
  541. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  542. if data.Links != nil {
  543. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  544. }
  545. add_counters, delete_counters := makeAddAndDeleteCounters()
  546. // process deleted decisions
  547. if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
  548. return err
  549. } else {
  550. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  551. }
  552. if len(data.New) == 0 {
  553. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  554. return nil
  555. }
  556. // create one alert for community blocklist using the first decision
  557. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  558. //apply APIC specific whitelists
  559. decisions = a.ApplyApicWhitelists(decisions)
  560. alert := createAlertForDecision(decisions[0])
  561. alertsFromCapi := []*models.Alert{alert}
  562. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  563. err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
  564. if err != nil {
  565. return fmt.Errorf("while saving alerts: %w", err)
  566. }
  567. // update blocklists
  568. if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
  569. return fmt.Errorf("while updating blocklists: %w", err)
  570. }
  571. log.Debug("Releasing lock for pullCAPI")
  572. if err := a.dbClient.ReleasePullCAPILock(); err != nil {
  573. return fmt.Errorf("while releasing lock: %w", err)
  574. }
  575. return nil
  576. }
  577. func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decision {
  578. if a.whitelists == nil {
  579. return decisions
  580. }
  581. //deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
  582. outIdx := 0
  583. for _, decision := range decisions {
  584. if decision.Value == nil {
  585. continue
  586. }
  587. skip := false
  588. ipval := net.ParseIP(*decision.Value)
  589. for _, cidr := range a.whitelists.Cidrs {
  590. if skip {
  591. break
  592. }
  593. if cidr.Contains(ipval) {
  594. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, cidr.String())
  595. skip = true
  596. }
  597. }
  598. for _, ip := range a.whitelists.Ips {
  599. if skip {
  600. break
  601. }
  602. if ip != nil && ip.Equal(ipval) {
  603. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, ip.String())
  604. skip = true
  605. }
  606. }
  607. if !skip {
  608. decisions[outIdx] = decision
  609. outIdx++
  610. }
  611. }
  612. //shrink the list, those are deleted items
  613. decisions = decisions[:outIdx]
  614. return decisions
  615. }
  616. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
  617. for idx, alert := range alertsFromCapi {
  618. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  619. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  620. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  621. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  622. }
  623. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  624. if err != nil {
  625. return fmt.Errorf("while saving alert from %s: %w", *alertsFromCapi[idx].Source.Scope, err)
  626. }
  627. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  628. }
  629. return nil
  630. }
  631. func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
  632. // we should force pull if the blocklist decisions are about to expire or there's no decision in the db
  633. alertQuery := a.dbClient.Ent.Alert.Query()
  634. alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
  635. alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
  636. alertInstance, err := alertQuery.First(context.Background())
  637. if err != nil {
  638. if ent.IsNotFound(err) {
  639. log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
  640. return true, nil
  641. }
  642. return false, fmt.Errorf("while getting alert: %w", err)
  643. }
  644. decisionQuery := a.dbClient.Ent.Decision.Query()
  645. decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
  646. firstDecision, err := decisionQuery.First(context.Background())
  647. if err != nil {
  648. if ent.IsNotFound(err) {
  649. log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
  650. return true, nil
  651. }
  652. return false, fmt.Errorf("while getting decision: %w", err)
  653. }
  654. if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
  655. log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
  656. return true, nil
  657. }
  658. return false, nil
  659. }
  660. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
  661. if links == nil {
  662. return nil
  663. }
  664. if links.Blocklists == nil {
  665. return nil
  666. }
  667. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  668. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  669. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
  670. if err != nil {
  671. return fmt.Errorf("while creating default client: %w", err)
  672. }
  673. for _, blocklist := range links.Blocklists {
  674. if blocklist.Scope == nil {
  675. log.Warningf("blocklist has no scope")
  676. continue
  677. }
  678. if blocklist.Duration == nil {
  679. log.Warningf("blocklist has no duration")
  680. continue
  681. }
  682. forcePull, err := a.ShouldForcePullBlocklist(blocklist)
  683. if err != nil {
  684. return fmt.Errorf("while checking if we should force pull blocklist %s: %w", *blocklist.Name, err)
  685. }
  686. blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
  687. var lastPullTimestamp *string
  688. if !forcePull {
  689. lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
  690. if err != nil {
  691. return fmt.Errorf("while getting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
  692. }
  693. }
  694. decisions, has_changed, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
  695. if err != nil {
  696. return fmt.Errorf("while getting decisions from blocklist %s: %w", *blocklist.Name, err)
  697. }
  698. if !has_changed {
  699. if lastPullTimestamp == nil {
  700. log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
  701. } else {
  702. log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
  703. }
  704. continue
  705. }
  706. err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
  707. if err != nil {
  708. return fmt.Errorf("while setting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
  709. }
  710. if len(decisions) == 0 {
  711. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  712. continue
  713. }
  714. //apply APIC specific whitelists
  715. decisions = a.ApplyApicWhitelists(decisions)
  716. alert := createAlertForDecision(decisions[0])
  717. alertsFromCapi := []*models.Alert{alert}
  718. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  719. err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
  720. if err != nil {
  721. return fmt.Errorf("while saving alert from blocklist %s: %w", *blocklist.Name, err)
  722. }
  723. }
  724. return nil
  725. }
  726. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  727. if *alert.Source.Scope == types.CAPIOrigin {
  728. *alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
  729. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
  730. } else if *alert.Source.Scope == types.ListOrigin {
  731. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  732. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
  733. }
  734. return alert
  735. }
  736. func (a *apic) Pull() error {
  737. defer trace.CatchPanic("lapi/pullFromAPIC")
  738. toldOnce := false
  739. for {
  740. scenario, err := a.FetchScenariosListFromDB()
  741. if err != nil {
  742. log.Errorf("unable to fetch scenarios from db: %s", err)
  743. }
  744. if len(scenario) > 0 {
  745. break
  746. }
  747. if !toldOnce {
  748. log.Warning("scenario list is empty, will not pull yet")
  749. toldOnce = true
  750. }
  751. time.Sleep(1 * time.Second)
  752. }
  753. if err := a.PullTop(false); err != nil {
  754. log.Errorf("capi pull top: %s", err)
  755. }
  756. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  757. ticker := time.NewTicker(a.pullIntervalFirst)
  758. for {
  759. select {
  760. case <-ticker.C:
  761. ticker.Reset(a.pullInterval)
  762. if err := a.PullTop(false); err != nil {
  763. log.Errorf("capi pull top: %s", err)
  764. continue
  765. }
  766. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  767. a.metricsTomb.Kill(nil)
  768. a.pushTomb.Kill(nil)
  769. return nil
  770. }
  771. }
  772. }
  773. func (a *apic) Shutdown() {
  774. a.pushTomb.Kill(nil)
  775. a.pullTomb.Kill(nil)
  776. a.metricsTomb.Kill(nil)
  777. }
  778. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  779. add_counters := make(map[string]map[string]int)
  780. add_counters[types.CAPIOrigin] = make(map[string]int)
  781. add_counters[types.ListOrigin] = make(map[string]int)
  782. delete_counters := make(map[string]map[string]int)
  783. delete_counters[types.CAPIOrigin] = make(map[string]int)
  784. delete_counters[types.ListOrigin] = make(map[string]int)
  785. return add_counters, delete_counters
  786. }
  787. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  788. if *origin == types.CAPIOrigin {
  789. counter[*origin]["all"] += totalDecisions
  790. } else if *origin == types.ListOrigin {
  791. counter[*origin][*scenario] += totalDecisions
  792. } else {
  793. log.Warningf("Unknown origin %s", *origin)
  794. }
  795. }