apic.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. package apiserver
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "slices"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/go-openapi/strfmt"
  16. log "github.com/sirupsen/logrus"
  17. "gopkg.in/tomb.v2"
  18. "github.com/crowdsecurity/go-cs-lib/ptr"
  19. "github.com/crowdsecurity/go-cs-lib/trace"
  20. "github.com/crowdsecurity/go-cs-lib/version"
  21. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  22. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  23. "github.com/crowdsecurity/crowdsec/pkg/database"
  24. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  25. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  26. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  27. "github.com/crowdsecurity/crowdsec/pkg/models"
  28. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  29. "github.com/crowdsecurity/crowdsec/pkg/types"
  30. )
  31. const (
  32. // delta values must be smaller than the interval
  33. pullIntervalDefault = time.Hour * 2
  34. pullIntervalDelta = 5 * time.Minute
  35. pushIntervalDefault = time.Second * 10
  36. pushIntervalDelta = time.Second * 7
  37. metricsIntervalDefault = time.Minute * 30
  38. metricsIntervalDelta = time.Minute * 15
  39. )
  40. type apic struct {
  41. // when changing the intervals in tests, always set *First too
  42. // or they can be negative
  43. pullInterval time.Duration
  44. pullIntervalFirst time.Duration
  45. pushInterval time.Duration
  46. pushIntervalFirst time.Duration
  47. metricsInterval time.Duration
  48. metricsIntervalFirst time.Duration
  49. dbClient *database.Client
  50. apiClient *apiclient.ApiClient
  51. AlertsAddChan chan []*models.Alert
  52. mu sync.Mutex
  53. pushTomb tomb.Tomb
  54. pullTomb tomb.Tomb
  55. metricsTomb tomb.Tomb
  56. startup bool
  57. credentials *csconfig.ApiCredentialsCfg
  58. scenarioList []string
  59. consoleConfig *csconfig.ConsoleConfig
  60. isPulling chan bool
  61. whitelists *csconfig.CapiWhitelist
  62. }
  63. // randomDuration returns a duration value between d-delta and d+delta
  64. func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  65. ret := d + time.Duration(rand.Int63n(int64(2*delta))) - delta
  66. // ticker interval must be > 0 (nanoseconds)
  67. if ret <= 0 {
  68. return 1
  69. }
  70. return ret
  71. }
  72. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  73. scenarios := make([]string, 0)
  74. machines, err := a.dbClient.ListMachines()
  75. if err != nil {
  76. return nil, fmt.Errorf("while listing machines: %w", err)
  77. }
  78. //merge all scenarios together
  79. for _, v := range machines {
  80. machineScenarios := strings.Split(v.Scenarios, ",")
  81. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  82. for _, sv := range machineScenarios {
  83. if !slices.Contains(scenarios, sv) && sv != "" {
  84. scenarios = append(scenarios, sv)
  85. }
  86. }
  87. }
  88. log.Debugf("Returning list of scenarios : %+v", scenarios)
  89. return scenarios, nil
  90. }
  91. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  92. apiDecisions := models.AddSignalsRequestItemDecisions{}
  93. for _, decision := range decisions {
  94. x := &models.AddSignalsRequestItemDecisionsItem{
  95. Duration: ptr.Of(*decision.Duration),
  96. ID: new(int64),
  97. Origin: ptr.Of(*decision.Origin),
  98. Scenario: ptr.Of(*decision.Scenario),
  99. Scope: ptr.Of(*decision.Scope),
  100. //Simulated: *decision.Simulated,
  101. Type: ptr.Of(*decision.Type),
  102. Until: decision.Until,
  103. Value: ptr.Of(*decision.Value),
  104. UUID: decision.UUID,
  105. }
  106. *x.ID = decision.ID
  107. if decision.Simulated != nil {
  108. x.Simulated = *decision.Simulated
  109. }
  110. apiDecisions = append(apiDecisions, x)
  111. }
  112. return apiDecisions
  113. }
  114. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  115. signal := &models.AddSignalsRequestItem{
  116. Message: alert.Message,
  117. Scenario: alert.Scenario,
  118. ScenarioHash: alert.ScenarioHash,
  119. ScenarioVersion: alert.ScenarioVersion,
  120. Source: &models.AddSignalsRequestItemSource{
  121. AsName: alert.Source.AsName,
  122. AsNumber: alert.Source.AsNumber,
  123. Cn: alert.Source.Cn,
  124. IP: alert.Source.IP,
  125. Latitude: alert.Source.Latitude,
  126. Longitude: alert.Source.Longitude,
  127. Range: alert.Source.Range,
  128. Scope: alert.Source.Scope,
  129. Value: alert.Source.Value,
  130. },
  131. StartAt: alert.StartAt,
  132. StopAt: alert.StopAt,
  133. CreatedAt: alert.CreatedAt,
  134. MachineID: alert.MachineID,
  135. ScenarioTrust: scenarioTrust,
  136. Decisions: decisionsToApiDecisions(alert.Decisions),
  137. UUID: alert.UUID,
  138. }
  139. if shareContext {
  140. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  141. for _, meta := range alert.Meta {
  142. contextItem := models.AddSignalsRequestItemContextItems0{
  143. Key: meta.Key,
  144. Value: meta.Value,
  145. }
  146. signal.Context = append(signal.Context, &contextItem)
  147. }
  148. }
  149. return signal
  150. }
  151. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
  152. var err error
  153. ret := &apic{
  154. AlertsAddChan: make(chan []*models.Alert),
  155. dbClient: dbClient,
  156. mu: sync.Mutex{},
  157. startup: true,
  158. credentials: config.Credentials,
  159. pullTomb: tomb.Tomb{},
  160. pushTomb: tomb.Tomb{},
  161. metricsTomb: tomb.Tomb{},
  162. scenarioList: make([]string, 0),
  163. consoleConfig: consoleConfig,
  164. pullInterval: pullIntervalDefault,
  165. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  166. pushInterval: pushIntervalDefault,
  167. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  168. metricsInterval: metricsIntervalDefault,
  169. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  170. isPulling: make(chan bool, 1),
  171. whitelists: apicWhitelist,
  172. }
  173. password := strfmt.Password(config.Credentials.Password)
  174. apiURL, err := url.Parse(config.Credentials.URL)
  175. if err != nil {
  176. return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.URL, err)
  177. }
  178. papiURL, err := url.Parse(config.Credentials.PapiURL)
  179. if err != nil {
  180. return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.PapiURL, err)
  181. }
  182. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  183. if err != nil {
  184. return nil, fmt.Errorf("while fetching scenarios from db: %w", err)
  185. }
  186. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  187. MachineID: config.Credentials.Login,
  188. Password: password,
  189. UserAgent: fmt.Sprintf("crowdsec/%s", version.String()),
  190. URL: apiURL,
  191. PapiURL: papiURL,
  192. VersionPrefix: "v3",
  193. Scenarios: ret.scenarioList,
  194. UpdateScenario: ret.FetchScenariosListFromDB,
  195. })
  196. if err != nil {
  197. return nil, fmt.Errorf("while creating api client: %w", err)
  198. }
  199. // The watcher will be authenticated by the RoundTripper the first time it will call CAPI
  200. // Explicit authentication will provoke a useless supplementary call to CAPI
  201. scenarios, err := ret.FetchScenariosListFromDB()
  202. if err != nil {
  203. return ret, fmt.Errorf("get scenario in db: %w", err)
  204. }
  205. authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  206. MachineID: &config.Credentials.Login,
  207. Password: &password,
  208. Scenarios: scenarios,
  209. })
  210. if err != nil {
  211. return ret, fmt.Errorf("authenticate watcher (%s): %w", config.Credentials.Login, err)
  212. }
  213. if err = ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
  214. return ret, fmt.Errorf("unable to parse jwt expiration: %w", err)
  215. }
  216. ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
  217. return ret, err
  218. }
  219. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  220. func (a *apic) Push() error {
  221. defer trace.CatchPanic("lapi/pushToAPIC")
  222. var cache models.AddSignalsRequest
  223. ticker := time.NewTicker(a.pushIntervalFirst)
  224. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  225. for {
  226. select {
  227. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  228. a.pullTomb.Kill(nil)
  229. a.metricsTomb.Kill(nil)
  230. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  231. if len(cache) == 0 {
  232. return nil
  233. }
  234. go a.Send(&cache)
  235. return nil
  236. case <-ticker.C:
  237. ticker.Reset(a.pushInterval)
  238. if len(cache) > 0 {
  239. a.mu.Lock()
  240. cacheCopy := cache
  241. cache = make(models.AddSignalsRequest, 0)
  242. a.mu.Unlock()
  243. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  244. go a.Send(&cacheCopy)
  245. }
  246. case alerts := <-a.AlertsAddChan:
  247. var signals []*models.AddSignalsRequestItem
  248. for _, alert := range alerts {
  249. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  250. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  251. }
  252. }
  253. a.mu.Lock()
  254. cache = append(cache, signals...)
  255. a.mu.Unlock()
  256. }
  257. }
  258. }
  259. func getScenarioTrustOfAlert(alert *models.Alert) string {
  260. scenarioTrust := "certified"
  261. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  262. scenarioTrust = "custom"
  263. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  264. scenarioTrust = "tainted"
  265. }
  266. if len(alert.Decisions) > 0 {
  267. if *alert.Decisions[0].Origin == types.CscliOrigin {
  268. scenarioTrust = "manual"
  269. }
  270. }
  271. return scenarioTrust
  272. }
  273. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  274. if *alert.Simulated {
  275. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  276. return false
  277. }
  278. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  279. case "manual":
  280. if !*consoleConfig.ShareManualDecisions {
  281. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  282. return false
  283. }
  284. case "tainted":
  285. if !*consoleConfig.ShareTaintedScenarios {
  286. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  287. return false
  288. }
  289. case "custom":
  290. if !*consoleConfig.ShareCustomScenarios {
  291. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  292. return false
  293. }
  294. }
  295. return true
  296. }
  297. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  298. /*we do have a problem with this :
  299. The apic.Push background routine reads from alertToPush chan.
  300. This chan is filled by Controller.CreateAlert
  301. If the chan apic.Send hangs, the alertToPush chan will become full,
  302. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  303. So instead, we prefer to cancel write.
  304. I don't know enough about gin to tell how much of an issue it can be.
  305. */
  306. var (
  307. cache []*models.AddSignalsRequestItem = *cacheOrig
  308. send models.AddSignalsRequest
  309. )
  310. bulkSize := 50
  311. pageStart := 0
  312. pageEnd := bulkSize
  313. for {
  314. if pageEnd >= len(cache) {
  315. send = cache[pageStart:]
  316. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  317. defer cancel()
  318. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  319. if err != nil {
  320. log.Errorf("sending signal to central API: %s", err)
  321. return
  322. }
  323. break
  324. }
  325. send = cache[pageStart:pageEnd]
  326. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  327. defer cancel()
  328. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  329. if err != nil {
  330. //we log it here as well, because the return value of func might be discarded
  331. log.Errorf("sending signal to central API: %s", err)
  332. }
  333. pageStart += bulkSize
  334. pageEnd += bulkSize
  335. }
  336. }
  337. func (a *apic) CAPIPullIsOld() (bool, error) {
  338. /*only pull community blocklist if it's older than 1h30 */
  339. alerts := a.dbClient.Ent.Alert.Query()
  340. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  341. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  342. count, err := alerts.Count(a.dbClient.CTX)
  343. if err != nil {
  344. return false, fmt.Errorf("while looking for CAPI alert: %w", err)
  345. }
  346. if count > 0 {
  347. log.Printf("last CAPI pull is newer than 1h30, skip.")
  348. return false, nil
  349. }
  350. return true, nil
  351. }
  352. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, deleteCounters map[string]map[string]int) (int, error) {
  353. nbDeleted := 0
  354. for _, decision := range deletedDecisions {
  355. filter := map[string][]string{
  356. "value": {*decision.Value},
  357. "origin": {*decision.Origin},
  358. }
  359. if strings.ToLower(*decision.Scope) != "ip" {
  360. filter["type"] = []string{*decision.Type}
  361. filter["scopes"] = []string{*decision.Scope}
  362. }
  363. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  364. if err != nil {
  365. return 0, fmt.Errorf("deleting decisions error: %w", err)
  366. }
  367. dbCliDel, err := strconv.Atoi(dbCliRet)
  368. if err != nil {
  369. return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
  370. }
  371. updateCounterForDecision(deleteCounters, decision.Origin, decision.Scenario, dbCliDel)
  372. nbDeleted += dbCliDel
  373. }
  374. return nbDeleted, nil
  375. }
  376. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, deleteCounters map[string]map[string]int) (int, error) {
  377. var nbDeleted int
  378. for _, decisions := range deletedDecisions {
  379. scope := decisions.Scope
  380. for _, decision := range decisions.Decisions {
  381. filter := map[string][]string{
  382. "value": {decision},
  383. "origin": {types.CAPIOrigin},
  384. }
  385. if strings.ToLower(*scope) != "ip" {
  386. filter["scopes"] = []string{*scope}
  387. }
  388. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  389. if err != nil {
  390. return 0, fmt.Errorf("deleting decisions error: %w", err)
  391. }
  392. dbCliDel, err := strconv.Atoi(dbCliRet)
  393. if err != nil {
  394. return 0, fmt.Errorf("converting db ret %d: %w", dbCliDel, err)
  395. }
  396. updateCounterForDecision(deleteCounters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
  397. nbDeleted += dbCliDel
  398. }
  399. }
  400. return nbDeleted, nil
  401. }
  402. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  403. newAlerts := make([]*models.Alert, 0)
  404. for _, decision := range decisions {
  405. found := false
  406. for _, sub := range newAlerts {
  407. if sub.Source.Scope == nil {
  408. log.Warningf("nil scope in %+v", sub)
  409. continue
  410. }
  411. if *decision.Origin == types.CAPIOrigin {
  412. if *sub.Source.Scope == types.CAPIOrigin {
  413. found = true
  414. break
  415. }
  416. } else if *decision.Origin == types.ListOrigin {
  417. if *sub.Source.Scope == *decision.Origin {
  418. if sub.Scenario == nil {
  419. log.Warningf("nil scenario in %+v", sub)
  420. }
  421. if *sub.Scenario == *decision.Scenario {
  422. found = true
  423. break
  424. }
  425. }
  426. } else {
  427. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  428. }
  429. }
  430. if !found {
  431. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  432. newAlerts = append(newAlerts, createAlertForDecision(decision))
  433. }
  434. }
  435. return newAlerts
  436. }
  437. func createAlertForDecision(decision *models.Decision) *models.Alert {
  438. var (
  439. scenario string
  440. scope string
  441. )
  442. switch *decision.Origin {
  443. case types.CAPIOrigin:
  444. scenario = types.CAPIOrigin
  445. scope = types.CAPIOrigin
  446. case types.ListOrigin:
  447. scenario = *decision.Scenario
  448. scope = types.ListOrigin
  449. default:
  450. scenario = ""
  451. scope = ""
  452. log.Warningf("unknown origin %s", *decision.Origin)
  453. }
  454. return &models.Alert{
  455. Source: &models.Source{
  456. Scope: ptr.Of(scope),
  457. Value: ptr.Of(""),
  458. },
  459. Scenario: ptr.Of(scenario),
  460. Message: ptr.Of(""),
  461. StartAt: ptr.Of(time.Now().UTC().Format(time.RFC3339)),
  462. StopAt: ptr.Of(time.Now().UTC().Format(time.RFC3339)),
  463. Capacity: ptr.Of(int32(0)),
  464. Simulated: ptr.Of(false),
  465. EventsCount: ptr.Of(int32(0)),
  466. Leakspeed: ptr.Of(""),
  467. ScenarioHash: ptr.Of(""),
  468. ScenarioVersion: ptr.Of(""),
  469. MachineID: database.CapiMachineID,
  470. }
  471. }
  472. // This function takes in list of parent alerts and decisions and then pairs them up.
  473. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, addCounters map[string]map[string]int) []*models.Alert {
  474. for _, decision := range decisions {
  475. //count and create separate alerts for each list
  476. updateCounterForDecision(addCounters, decision.Origin, decision.Scenario, 1)
  477. /*CAPI might send lower case scopes, unify it.*/
  478. switch strings.ToLower(*decision.Scope) {
  479. case "ip":
  480. *decision.Scope = types.Ip
  481. case "range":
  482. *decision.Scope = types.Range
  483. }
  484. found := false
  485. //add the individual decisions to the right list
  486. for idx, alert := range alerts {
  487. if *decision.Origin == types.CAPIOrigin {
  488. if *alert.Source.Scope == types.CAPIOrigin {
  489. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  490. found = true
  491. break
  492. }
  493. } else if *decision.Origin == types.ListOrigin {
  494. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  495. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  496. found = true
  497. break
  498. }
  499. } else {
  500. log.Warningf("unknown origin %s", *decision.Origin)
  501. }
  502. }
  503. if !found {
  504. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  505. }
  506. }
  507. return alerts
  508. }
  509. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  510. // one alert for "community blocklist"
  511. // one alert per list we're subscribed to
  512. func (a *apic) PullTop(forcePull bool) error {
  513. var err error
  514. //A mutex with TryLock would be a bit simpler
  515. //But go does not guarantee that TryLock will be able to acquire the lock even if it is available
  516. select {
  517. case a.isPulling <- true:
  518. defer func() {
  519. <-a.isPulling
  520. }()
  521. default:
  522. return errors.New("pull already in progress")
  523. }
  524. if !forcePull {
  525. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  526. return err
  527. } else if !lastPullIsOld {
  528. return nil
  529. }
  530. }
  531. log.Debug("Acquiring lock for pullCAPI")
  532. err = a.dbClient.AcquirePullCAPILock()
  533. if a.dbClient.IsLocked(err) {
  534. log.Info("PullCAPI is already running, skipping")
  535. return nil
  536. }
  537. log.Infof("Starting community-blocklist update")
  538. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  539. if err != nil {
  540. return fmt.Errorf("get stream: %w", err)
  541. }
  542. a.startup = false
  543. /*to count additions/deletions across lists*/
  544. log.Debugf("Received %d new decisions", len(data.New))
  545. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  546. if data.Links != nil {
  547. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  548. }
  549. addCounters, deleteCounters := makeAddAndDeleteCounters()
  550. // process deleted decisions
  551. nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, deleteCounters)
  552. if err != nil {
  553. return err
  554. }
  555. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  556. if len(data.New) == 0 {
  557. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  558. return nil
  559. }
  560. // create one alert for community blocklist using the first decision
  561. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  562. //apply APIC specific whitelists
  563. decisions = a.ApplyApicWhitelists(decisions)
  564. alert := createAlertForDecision(decisions[0])
  565. alertsFromCapi := []*models.Alert{alert}
  566. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)
  567. err = a.SaveAlerts(alertsFromCapi, addCounters, deleteCounters)
  568. if err != nil {
  569. return fmt.Errorf("while saving alerts: %w", err)
  570. }
  571. // update blocklists
  572. if err := a.UpdateBlocklists(data.Links, addCounters, forcePull); err != nil {
  573. return fmt.Errorf("while updating blocklists: %w", err)
  574. }
  575. log.Debug("Releasing lock for pullCAPI")
  576. if err := a.dbClient.ReleasePullCAPILock(); err != nil {
  577. return fmt.Errorf("while releasing lock: %w", err)
  578. }
  579. return nil
  580. }
  581. // we receive a link to a blocklist, we pull the content of the blocklist and we create one alert
  582. func (a *apic) PullBlocklist(blocklist *modelscapi.BlocklistLink, forcePull bool) error {
  583. addCounters, _ := makeAddAndDeleteCounters()
  584. if err := a.UpdateBlocklists(&modelscapi.GetDecisionsStreamResponseLinks{
  585. Blocklists: []*modelscapi.BlocklistLink{blocklist},
  586. }, addCounters, forcePull); err != nil {
  587. return fmt.Errorf("while pulling blocklist: %w", err)
  588. }
  589. return nil
  590. }
  591. // if decisions is whitelisted: return representation of the whitelist ip or cidr
  592. // if not whitelisted: empty string
  593. func (a *apic) whitelistedBy(decision *models.Decision) string {
  594. if decision.Value == nil {
  595. return ""
  596. }
  597. ipval := net.ParseIP(*decision.Value)
  598. for _, cidr := range a.whitelists.Cidrs {
  599. if cidr.Contains(ipval) {
  600. return cidr.String()
  601. }
  602. }
  603. for _, ip := range a.whitelists.Ips {
  604. if ip != nil && ip.Equal(ipval) {
  605. return ip.String()
  606. }
  607. }
  608. return ""
  609. }
  610. func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decision {
  611. if a.whitelists == nil || len(a.whitelists.Cidrs) == 0 && len(a.whitelists.Ips) == 0 {
  612. return decisions
  613. }
  614. //deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
  615. outIdx := 0
  616. for _, decision := range decisions {
  617. whitelister := a.whitelistedBy(decision)
  618. if whitelister != "" {
  619. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, whitelister)
  620. continue
  621. }
  622. decisions[outIdx] = decision
  623. outIdx++
  624. }
  625. //shrink the list, those are deleted items
  626. return decisions[:outIdx]
  627. }
  628. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) error {
  629. for _, alert := range alertsFromCapi {
  630. setAlertScenario(alert, addCounters, deleteCounters)
  631. log.Debugf("%s has %d decisions", *alert.Source.Scope, len(alert.Decisions))
  632. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  633. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  634. }
  635. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alert)
  636. if err != nil {
  637. return fmt.Errorf("while saving alert from %s: %w", *alert.Source.Scope, err)
  638. }
  639. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alert.Source.Scope, inserted, deleted, alertID)
  640. }
  641. return nil
  642. }
  643. func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
  644. // we should force pull if the blocklist decisions are about to expire or there's no decision in the db
  645. alertQuery := a.dbClient.Ent.Alert.Query()
  646. alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
  647. alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
  648. alertInstance, err := alertQuery.First(context.Background())
  649. if err != nil {
  650. if ent.IsNotFound(err) {
  651. log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
  652. return true, nil
  653. }
  654. return false, fmt.Errorf("while getting alert: %w", err)
  655. }
  656. decisionQuery := a.dbClient.Ent.Decision.Query()
  657. decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
  658. firstDecision, err := decisionQuery.First(context.Background())
  659. if err != nil {
  660. if ent.IsNotFound(err) {
  661. log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
  662. return true, nil
  663. }
  664. return false, fmt.Errorf("while getting decision: %w", err)
  665. }
  666. if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
  667. log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
  668. return true, nil
  669. }
  670. return false, nil
  671. }
  672. func (a *apic) updateBlocklist(client *apiclient.ApiClient, blocklist *modelscapi.BlocklistLink, addCounters map[string]map[string]int, forcePull bool) error {
  673. if blocklist.Scope == nil {
  674. log.Warningf("blocklist has no scope")
  675. return nil
  676. }
  677. if blocklist.Duration == nil {
  678. log.Warningf("blocklist has no duration")
  679. return nil
  680. }
  681. if !forcePull {
  682. _forcePull, err := a.ShouldForcePullBlocklist(blocklist)
  683. if err != nil {
  684. return fmt.Errorf("while checking if we should force pull blocklist %s: %w", *blocklist.Name, err)
  685. }
  686. forcePull = _forcePull
  687. }
  688. blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
  689. var (
  690. lastPullTimestamp *string
  691. err error
  692. )
  693. if !forcePull {
  694. lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
  695. if err != nil {
  696. return fmt.Errorf("while getting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
  697. }
  698. }
  699. decisions, hasChanged, err := client.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
  700. if err != nil {
  701. return fmt.Errorf("while getting decisions from blocklist %s: %w", *blocklist.Name, err)
  702. }
  703. if !hasChanged {
  704. if lastPullTimestamp == nil {
  705. log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
  706. } else {
  707. log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
  708. }
  709. return nil
  710. }
  711. err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
  712. if err != nil {
  713. return fmt.Errorf("while setting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
  714. }
  715. if len(decisions) == 0 {
  716. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  717. return nil
  718. }
  719. //apply APIC specific whitelists
  720. decisions = a.ApplyApicWhitelists(decisions)
  721. alert := createAlertForDecision(decisions[0])
  722. alertsFromCapi := []*models.Alert{alert}
  723. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)
  724. err = a.SaveAlerts(alertsFromCapi, addCounters, nil)
  725. if err != nil {
  726. return fmt.Errorf("while saving alert from blocklist %s: %w", *blocklist.Name, err)
  727. }
  728. return nil
  729. }
  730. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, addCounters map[string]map[string]int, forcePull bool) error {
  731. if links == nil {
  732. return nil
  733. }
  734. if links.Blocklists == nil {
  735. return nil
  736. }
  737. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  738. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  739. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
  740. if err != nil {
  741. return fmt.Errorf("while creating default client: %w", err)
  742. }
  743. for _, blocklist := range links.Blocklists {
  744. if err := a.updateBlocklist(defaultClient, blocklist, addCounters, forcePull); err != nil {
  745. return err
  746. }
  747. }
  748. return nil
  749. }
  750. func setAlertScenario(alert *models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) {
  751. if *alert.Source.Scope == types.CAPIOrigin {
  752. *alert.Source.Scope = types.CommunityBlocklistPullSourceScope
  753. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", addCounters[types.CAPIOrigin]["all"], deleteCounters[types.CAPIOrigin]["all"]))
  754. } else if *alert.Source.Scope == types.ListOrigin {
  755. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  756. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", addCounters[types.ListOrigin][*alert.Scenario], deleteCounters[types.ListOrigin][*alert.Scenario]))
  757. }
  758. }
  759. func (a *apic) Pull() error {
  760. defer trace.CatchPanic("lapi/pullFromAPIC")
  761. toldOnce := false
  762. for {
  763. scenario, err := a.FetchScenariosListFromDB()
  764. if err != nil {
  765. log.Errorf("unable to fetch scenarios from db: %s", err)
  766. }
  767. if len(scenario) > 0 {
  768. break
  769. }
  770. if !toldOnce {
  771. log.Warning("scenario list is empty, will not pull yet")
  772. toldOnce = true
  773. }
  774. time.Sleep(1 * time.Second)
  775. }
  776. if err := a.PullTop(false); err != nil {
  777. log.Errorf("capi pull top: %s", err)
  778. }
  779. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  780. ticker := time.NewTicker(a.pullIntervalFirst)
  781. for {
  782. select {
  783. case <-ticker.C:
  784. ticker.Reset(a.pullInterval)
  785. if err := a.PullTop(false); err != nil {
  786. log.Errorf("capi pull top: %s", err)
  787. continue
  788. }
  789. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  790. a.metricsTomb.Kill(nil)
  791. a.pushTomb.Kill(nil)
  792. return nil
  793. }
  794. }
  795. }
  796. func (a *apic) Shutdown() {
  797. a.pushTomb.Kill(nil)
  798. a.pullTomb.Kill(nil)
  799. a.metricsTomb.Kill(nil)
  800. }
  801. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  802. addCounters := make(map[string]map[string]int)
  803. addCounters[types.CAPIOrigin] = make(map[string]int)
  804. addCounters[types.ListOrigin] = make(map[string]int)
  805. deleteCounters := make(map[string]map[string]int)
  806. deleteCounters[types.CAPIOrigin] = make(map[string]int)
  807. deleteCounters[types.ListOrigin] = make(map[string]int)
  808. return addCounters, deleteCounters
  809. }
  810. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  811. if *origin == types.CAPIOrigin {
  812. counter[*origin]["all"] += totalDecisions
  813. } else if *origin == types.ListOrigin {
  814. counter[*origin][*scenario] += totalDecisions
  815. } else {
  816. log.Warningf("Unknown origin %s", *origin)
  817. }
  818. }