apic.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/go-openapi/strfmt"
  14. "github.com/pkg/errors"
  15. log "github.com/sirupsen/logrus"
  16. "golang.org/x/exp/slices"
  17. "gopkg.in/tomb.v2"
  18. "github.com/crowdsecurity/go-cs-lib/pkg/ptr"
  19. "github.com/crowdsecurity/go-cs-lib/pkg/trace"
  20. "github.com/crowdsecurity/go-cs-lib/pkg/version"
  21. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  22. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  23. "github.com/crowdsecurity/crowdsec/pkg/database"
  24. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  25. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  26. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  27. "github.com/crowdsecurity/crowdsec/pkg/models"
  28. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  29. "github.com/crowdsecurity/crowdsec/pkg/types"
  30. )
  // Default period of each Central API background routine, together with the
  // +/- jitter that NewAPIC applies (through randomDuration) to the delay
  // before the first run of that routine.
  var (
  	pullIntervalDefault    = 2 * time.Hour
  	pullIntervalDelta      = time.Minute * 5
  	pushIntervalDefault    = 10 * time.Second
  	pushIntervalDelta      = 7 * time.Second
  	metricsIntervalDefault = 30 * time.Minute
  	metricsIntervalDelta   = 15 * time.Minute
  )
  // SCOPE_CAPI_ALIAS_ALIAS is the source scope written on community-blocklist
  // alerts in place of the raw CAPI origin: "CAPI" is not used directly, to
  // make it less confusing for the user.
  var SCOPE_CAPI_ALIAS_ALIAS = "crowdsecurity/community-blocklist"
  40. type apic struct {
  41. // when changing the intervals in tests, always set *First too
  42. // or they can be negative
  43. pullInterval time.Duration
  44. pullIntervalFirst time.Duration
  45. pushInterval time.Duration
  46. pushIntervalFirst time.Duration
  47. metricsInterval time.Duration
  48. metricsIntervalFirst time.Duration
  49. dbClient *database.Client
  50. apiClient *apiclient.ApiClient
  51. AlertsAddChan chan []*models.Alert
  52. mu sync.Mutex
  53. pushTomb tomb.Tomb
  54. pullTomb tomb.Tomb
  55. metricsTomb tomb.Tomb
  56. startup bool
  57. credentials *csconfig.ApiCredentialsCfg
  58. scenarioList []string
  59. consoleConfig *csconfig.ConsoleConfig
  60. isPulling chan bool
  61. whitelists *csconfig.CapiWhitelist
  62. }
  // randomDuration picks a pseudo-random duration around d, at most delta away
  // from it in either direction.
  func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  	// rand.Float64 yields a value in [0, 1), so jitter lies in [-1, 1).
  	jitter := 2.0*rand.Float64() - 1.0
  	return time.Duration(float64(d) + float64(delta)*jitter)
  }
  67. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  68. scenarios := make([]string, 0)
  69. machines, err := a.dbClient.ListMachines()
  70. if err != nil {
  71. return nil, errors.Wrap(err, "while listing machines")
  72. }
  73. //merge all scenarios together
  74. for _, v := range machines {
  75. machineScenarios := strings.Split(v.Scenarios, ",")
  76. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  77. for _, sv := range machineScenarios {
  78. if !slices.Contains(scenarios, sv) && sv != "" {
  79. scenarios = append(scenarios, sv)
  80. }
  81. }
  82. }
  83. log.Debugf("Returning list of scenarios : %+v", scenarios)
  84. return scenarios, nil
  85. }
  86. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  87. apiDecisions := models.AddSignalsRequestItemDecisions{}
  88. for _, decision := range decisions {
  89. x := &models.AddSignalsRequestItemDecisionsItem{
  90. Duration: ptr.Of(*decision.Duration),
  91. ID: new(int64),
  92. Origin: ptr.Of(*decision.Origin),
  93. Scenario: ptr.Of(*decision.Scenario),
  94. Scope: ptr.Of(*decision.Scope),
  95. //Simulated: *decision.Simulated,
  96. Type: ptr.Of(*decision.Type),
  97. Until: decision.Until,
  98. Value: ptr.Of(*decision.Value),
  99. UUID: decision.UUID,
  100. }
  101. *x.ID = decision.ID
  102. if decision.Simulated != nil {
  103. x.Simulated = *decision.Simulated
  104. }
  105. apiDecisions = append(apiDecisions, x)
  106. }
  107. return apiDecisions
  108. }
  109. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  110. signal := &models.AddSignalsRequestItem{
  111. Message: alert.Message,
  112. Scenario: alert.Scenario,
  113. ScenarioHash: alert.ScenarioHash,
  114. ScenarioVersion: alert.ScenarioVersion,
  115. Source: &models.AddSignalsRequestItemSource{
  116. AsName: alert.Source.AsName,
  117. AsNumber: alert.Source.AsNumber,
  118. Cn: alert.Source.Cn,
  119. IP: alert.Source.IP,
  120. Latitude: alert.Source.Latitude,
  121. Longitude: alert.Source.Longitude,
  122. Range: alert.Source.Range,
  123. Scope: alert.Source.Scope,
  124. Value: alert.Source.Value,
  125. },
  126. StartAt: alert.StartAt,
  127. StopAt: alert.StopAt,
  128. CreatedAt: alert.CreatedAt,
  129. MachineID: alert.MachineID,
  130. ScenarioTrust: scenarioTrust,
  131. Decisions: decisionsToApiDecisions(alert.Decisions),
  132. UUID: alert.UUID,
  133. }
  134. if shareContext {
  135. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  136. for _, meta := range alert.Meta {
  137. contextItem := models.AddSignalsRequestItemContextItems0{
  138. Key: meta.Key,
  139. Value: meta.Value,
  140. }
  141. signal.Context = append(signal.Context, &contextItem)
  142. }
  143. }
  144. return signal
  145. }
  146. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
  147. var err error
  148. ret := &apic{
  149. AlertsAddChan: make(chan []*models.Alert),
  150. dbClient: dbClient,
  151. mu: sync.Mutex{},
  152. startup: true,
  153. credentials: config.Credentials,
  154. pullTomb: tomb.Tomb{},
  155. pushTomb: tomb.Tomb{},
  156. metricsTomb: tomb.Tomb{},
  157. scenarioList: make([]string, 0),
  158. consoleConfig: consoleConfig,
  159. pullInterval: pullIntervalDefault,
  160. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  161. pushInterval: pushIntervalDefault,
  162. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  163. metricsInterval: metricsIntervalDefault,
  164. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  165. isPulling: make(chan bool, 1),
  166. whitelists: apicWhitelist,
  167. }
  168. password := strfmt.Password(config.Credentials.Password)
  169. apiURL, err := url.Parse(config.Credentials.URL)
  170. if err != nil {
  171. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  172. }
  173. papiURL, err := url.Parse(config.Credentials.PapiURL)
  174. if err != nil {
  175. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.PapiURL)
  176. }
  177. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  178. if err != nil {
  179. return nil, errors.Wrap(err, "while fetching scenarios from db")
  180. }
  181. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  182. MachineID: config.Credentials.Login,
  183. Password: password,
  184. UserAgent: fmt.Sprintf("crowdsec/%s", version.String()),
  185. URL: apiURL,
  186. PapiURL: papiURL,
  187. VersionPrefix: "v3",
  188. Scenarios: ret.scenarioList,
  189. UpdateScenario: ret.FetchScenariosListFromDB,
  190. })
  191. if err != nil {
  192. return nil, errors.Wrap(err, "while creating api client")
  193. }
  194. // The watcher will be authenticated by the RoundTripper the first time it will call CAPI
  195. // Explicit authentication will provoke an useless supplementary call to CAPI
  196. scenarios, err := ret.FetchScenariosListFromDB()
  197. if err != nil {
  198. return ret, errors.Wrapf(err, "get scenario in db: %s", err)
  199. }
  200. authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  201. MachineID: &config.Credentials.Login,
  202. Password: &password,
  203. Scenarios: scenarios,
  204. })
  205. if err != nil {
  206. return ret, errors.Wrapf(err, "authenticate watcher (%s)", config.Credentials.Login)
  207. }
  208. if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
  209. return ret, errors.Wrap(err, "unable to parse jwt expiration")
  210. }
  211. ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
  212. return ret, err
  213. }
  214. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  215. func (a *apic) Push() error {
  216. defer trace.CatchPanic("lapi/pushToAPIC")
  217. var cache models.AddSignalsRequest
  218. ticker := time.NewTicker(a.pushIntervalFirst)
  219. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  220. for {
  221. select {
  222. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  223. a.pullTomb.Kill(nil)
  224. a.metricsTomb.Kill(nil)
  225. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  226. if len(cache) == 0 {
  227. return nil
  228. }
  229. go a.Send(&cache)
  230. return nil
  231. case <-ticker.C:
  232. ticker.Reset(a.pushInterval)
  233. if len(cache) > 0 {
  234. a.mu.Lock()
  235. cacheCopy := cache
  236. cache = make(models.AddSignalsRequest, 0)
  237. a.mu.Unlock()
  238. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  239. go a.Send(&cacheCopy)
  240. }
  241. case alerts := <-a.AlertsAddChan:
  242. var signals []*models.AddSignalsRequestItem
  243. for _, alert := range alerts {
  244. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  245. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  246. }
  247. }
  248. a.mu.Lock()
  249. cache = append(cache, signals...)
  250. a.mu.Unlock()
  251. }
  252. }
  253. }
  254. func getScenarioTrustOfAlert(alert *models.Alert) string {
  255. scenarioTrust := "certified"
  256. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  257. scenarioTrust = "custom"
  258. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  259. scenarioTrust = "tainted"
  260. }
  261. if len(alert.Decisions) > 0 {
  262. if *alert.Decisions[0].Origin == types.CscliOrigin {
  263. scenarioTrust = "manual"
  264. }
  265. }
  266. return scenarioTrust
  267. }
  268. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  269. if *alert.Simulated {
  270. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  271. return false
  272. }
  273. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  274. case "manual":
  275. if !*consoleConfig.ShareManualDecisions {
  276. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  277. return false
  278. }
  279. case "tainted":
  280. if !*consoleConfig.ShareTaintedScenarios {
  281. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  282. return false
  283. }
  284. case "custom":
  285. if !*consoleConfig.ShareCustomScenarios {
  286. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  287. return false
  288. }
  289. }
  290. return true
  291. }
  292. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  293. /*we do have a problem with this :
  294. The apic.Push background routine reads from alertToPush chan.
  295. This chan is filled by Controller.CreateAlert
  296. If the chan apic.Send hangs, the alertToPush chan will become full,
  297. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  298. So instead, we prefer to cancel write.
  299. I don't know enough about gin to tell how much of an issue it can be.
  300. */
  301. var cache []*models.AddSignalsRequestItem = *cacheOrig
  302. var send models.AddSignalsRequest
  303. bulkSize := 50
  304. pageStart := 0
  305. pageEnd := bulkSize
  306. for {
  307. if pageEnd >= len(cache) {
  308. send = cache[pageStart:]
  309. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  310. defer cancel()
  311. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  312. if err != nil {
  313. log.Errorf("sending signal to central API: %s", err)
  314. return
  315. }
  316. break
  317. }
  318. send = cache[pageStart:pageEnd]
  319. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  320. defer cancel()
  321. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  322. if err != nil {
  323. //we log it here as well, because the return value of func might be discarded
  324. log.Errorf("sending signal to central API: %s", err)
  325. }
  326. pageStart += bulkSize
  327. pageEnd += bulkSize
  328. }
  329. }
  330. func (a *apic) CAPIPullIsOld() (bool, error) {
  331. /*only pull community blocklist if it's older than 1h30 */
  332. alerts := a.dbClient.Ent.Alert.Query()
  333. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  334. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  335. count, err := alerts.Count(a.dbClient.CTX)
  336. if err != nil {
  337. return false, errors.Wrap(err, "while looking for CAPI alert")
  338. }
  339. if count > 0 {
  340. log.Printf("last CAPI pull is newer than 1h30, skip.")
  341. return false, nil
  342. }
  343. return true, nil
  344. }
  345. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  346. var filter map[string][]string
  347. var nbDeleted int
  348. for _, decision := range deletedDecisions {
  349. if strings.ToLower(*decision.Scope) == "ip" {
  350. filter = make(map[string][]string, 1)
  351. filter["value"] = []string{*decision.Value}
  352. } else {
  353. filter = make(map[string][]string, 3)
  354. filter["value"] = []string{*decision.Value}
  355. filter["type"] = []string{*decision.Type}
  356. filter["scopes"] = []string{*decision.Scope}
  357. }
  358. filter["origin"] = []string{*decision.Origin}
  359. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  360. if err != nil {
  361. return 0, errors.Wrap(err, "deleting decisions error")
  362. }
  363. dbCliDel, err := strconv.Atoi(dbCliRet)
  364. if err != nil {
  365. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  366. }
  367. updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
  368. nbDeleted += dbCliDel
  369. }
  370. return nbDeleted, nil
  371. }
  372. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
  373. var filter map[string][]string
  374. var nbDeleted int
  375. for _, decisions := range deletedDecisions {
  376. scope := decisions.Scope
  377. for _, decision := range decisions.Decisions {
  378. if strings.ToLower(*scope) == "ip" {
  379. filter = make(map[string][]string, 1)
  380. filter["value"] = []string{decision}
  381. } else {
  382. filter = make(map[string][]string, 2)
  383. filter["value"] = []string{decision}
  384. filter["scopes"] = []string{*scope}
  385. }
  386. filter["origin"] = []string{types.CAPIOrigin}
  387. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  388. if err != nil {
  389. return 0, errors.Wrap(err, "deleting decisions error")
  390. }
  391. dbCliDel, err := strconv.Atoi(dbCliRet)
  392. if err != nil {
  393. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  394. }
  395. updateCounterForDecision(delete_counters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
  396. nbDeleted += dbCliDel
  397. }
  398. }
  399. return nbDeleted, nil
  400. }
  401. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  402. newAlerts := make([]*models.Alert, 0)
  403. for _, decision := range decisions {
  404. found := false
  405. for _, sub := range newAlerts {
  406. if sub.Source.Scope == nil {
  407. log.Warningf("nil scope in %+v", sub)
  408. continue
  409. }
  410. if *decision.Origin == types.CAPIOrigin {
  411. if *sub.Source.Scope == types.CAPIOrigin {
  412. found = true
  413. break
  414. }
  415. } else if *decision.Origin == types.ListOrigin {
  416. if *sub.Source.Scope == *decision.Origin {
  417. if sub.Scenario == nil {
  418. log.Warningf("nil scenario in %+v", sub)
  419. }
  420. if *sub.Scenario == *decision.Scenario {
  421. found = true
  422. break
  423. }
  424. }
  425. } else {
  426. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  427. }
  428. }
  429. if !found {
  430. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  431. newAlerts = append(newAlerts, createAlertForDecision(decision))
  432. }
  433. }
  434. return newAlerts
  435. }
  436. func createAlertForDecision(decision *models.Decision) *models.Alert {
  437. newAlert := &models.Alert{}
  438. newAlert.Source = &models.Source{}
  439. newAlert.Source.Scope = ptr.Of("")
  440. if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
  441. newAlert.Scenario = ptr.Of(types.CAPIOrigin)
  442. newAlert.Source.Scope = ptr.Of(types.CAPIOrigin)
  443. } else if *decision.Origin == types.ListOrigin {
  444. newAlert.Scenario = ptr.Of(*decision.Scenario)
  445. newAlert.Source.Scope = ptr.Of(types.ListOrigin)
  446. } else {
  447. log.Warningf("unknown origin %s", *decision.Origin)
  448. }
  449. newAlert.Message = ptr.Of("")
  450. newAlert.Source.Value = ptr.Of("")
  451. newAlert.StartAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
  452. newAlert.StopAt = ptr.Of(time.Now().UTC().Format(time.RFC3339))
  453. newAlert.Capacity = ptr.Of(int32(0))
  454. newAlert.Simulated = ptr.Of(false)
  455. newAlert.EventsCount = ptr.Of(int32(0))
  456. newAlert.Leakspeed = ptr.Of("")
  457. newAlert.ScenarioHash = ptr.Of("")
  458. newAlert.ScenarioVersion = ptr.Of("")
  459. newAlert.MachineID = database.CapiMachineID
  460. return newAlert
  461. }
  462. // This function takes in list of parent alerts and decisions and then pairs them up.
  463. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  464. for _, decision := range decisions {
  465. //count and create separate alerts for each list
  466. updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
  467. /*CAPI might send lower case scopes, unify it.*/
  468. switch strings.ToLower(*decision.Scope) {
  469. case "ip":
  470. *decision.Scope = types.Ip
  471. case "range":
  472. *decision.Scope = types.Range
  473. }
  474. found := false
  475. //add the individual decisions to the right list
  476. for idx, alert := range alerts {
  477. if *decision.Origin == types.CAPIOrigin {
  478. if *alert.Source.Scope == types.CAPIOrigin {
  479. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  480. found = true
  481. break
  482. }
  483. } else if *decision.Origin == types.ListOrigin {
  484. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  485. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  486. found = true
  487. break
  488. }
  489. } else {
  490. log.Warningf("unknown origin %s", *decision.Origin)
  491. }
  492. }
  493. if !found {
  494. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  495. }
  496. }
  497. return alerts
  498. }
  499. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  500. // one alert for "community blocklist"
  501. // one alert per list we're subscribed to
  502. func (a *apic) PullTop(forcePull bool) error {
  503. var err error
  504. //A mutex with TryLock would be a bit simpler
  505. //But go does not guarantee that TryLock will be able to acquire the lock even if it is available
  506. select {
  507. case a.isPulling <- true:
  508. defer func() {
  509. <-a.isPulling
  510. }()
  511. default:
  512. return errors.New("pull already in progress")
  513. }
  514. if !forcePull {
  515. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  516. return err
  517. } else if !lastPullIsOld {
  518. return nil
  519. }
  520. }
  521. log.Infof("Starting community-blocklist update")
  522. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  523. if err != nil {
  524. return errors.Wrap(err, "get stream")
  525. }
  526. a.startup = false
  527. /*to count additions/deletions across lists*/
  528. log.Debugf("Received %d new decisions", len(data.New))
  529. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  530. if data.Links != nil {
  531. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  532. }
  533. add_counters, delete_counters := makeAddAndDeleteCounters()
  534. // process deleted decisions
  535. if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
  536. return err
  537. } else {
  538. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  539. }
  540. if len(data.New) == 0 {
  541. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  542. return nil
  543. }
  544. // create one alert for community blocklist using the first decision
  545. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  546. //apply APIC specific whitelists
  547. decisions = a.ApplyApicWhitelists(decisions)
  548. alert := createAlertForDecision(decisions[0])
  549. alertsFromCapi := []*models.Alert{alert}
  550. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  551. err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
  552. if err != nil {
  553. return errors.Wrap(err, "while saving alerts")
  554. }
  555. // update blocklists
  556. if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
  557. return errors.Wrap(err, "while updating blocklists")
  558. }
  559. return nil
  560. }
  561. func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decision {
  562. if a.whitelists == nil {
  563. return decisions
  564. }
  565. //deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
  566. outIdx := 0
  567. for _, decision := range decisions {
  568. if decision.Value == nil {
  569. continue
  570. }
  571. skip := false
  572. ipval := net.ParseIP(*decision.Value)
  573. for _, cidr := range a.whitelists.Cidrs {
  574. if skip {
  575. break
  576. }
  577. if cidr.Contains(ipval) {
  578. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, cidr.String())
  579. skip = true
  580. }
  581. }
  582. for _, ip := range a.whitelists.Ips {
  583. if skip {
  584. break
  585. }
  586. if ip != nil && ip.Equal(ipval) {
  587. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, ip.String())
  588. skip = true
  589. }
  590. }
  591. if !skip {
  592. decisions[outIdx] = decision
  593. outIdx++
  594. }
  595. }
  596. //shrink the list, those are deleted items
  597. decisions = decisions[:outIdx]
  598. return decisions
  599. }
  600. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
  601. for idx, alert := range alertsFromCapi {
  602. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  603. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  604. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  605. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  606. }
  607. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  608. if err != nil {
  609. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  610. }
  611. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  612. }
  613. return nil
  614. }
  615. func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
  616. // we should force pull if the blocklist decisions are about to expire or there's no decision in the db
  617. alertQuery := a.dbClient.Ent.Alert.Query()
  618. alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
  619. alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
  620. alertInstance, err := alertQuery.First(context.Background())
  621. if err != nil {
  622. if ent.IsNotFound(err) {
  623. log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
  624. return true, nil
  625. }
  626. return false, errors.Wrap(err, "while getting alert")
  627. }
  628. decisionQuery := a.dbClient.Ent.Decision.Query()
  629. decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
  630. firstDecision, err := decisionQuery.First(context.Background())
  631. if err != nil {
  632. if ent.IsNotFound(err) {
  633. log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
  634. return true, nil
  635. }
  636. return false, errors.Wrap(err, "while getting decision")
  637. }
  638. if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
  639. log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
  640. return true, nil
  641. }
  642. return false, nil
  643. }
  644. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
  645. if links == nil {
  646. return nil
  647. }
  648. if links.Blocklists == nil {
  649. return nil
  650. }
  651. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  652. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  653. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
  654. if err != nil {
  655. return errors.Wrap(err, "while creating default client")
  656. }
  657. for _, blocklist := range links.Blocklists {
  658. if blocklist.Scope == nil {
  659. log.Warningf("blocklist has no scope")
  660. continue
  661. }
  662. if blocklist.Duration == nil {
  663. log.Warningf("blocklist has no duration")
  664. continue
  665. }
  666. forcePull, err := a.ShouldForcePullBlocklist(blocklist)
  667. if err != nil {
  668. return errors.Wrapf(err, "while checking if we should force pull blocklist %s", *blocklist.Name)
  669. }
  670. blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
  671. var lastPullTimestamp *string
  672. if !forcePull {
  673. lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
  674. if err != nil {
  675. return errors.Wrapf(err, "while getting last pull timestamp for blocklist %s", *blocklist.Name)
  676. }
  677. }
  678. decisions, has_changed, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
  679. if err != nil {
  680. return errors.Wrapf(err, "while getting decisions from blocklist %s", *blocklist.Name)
  681. }
  682. if !has_changed {
  683. if lastPullTimestamp == nil {
  684. log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
  685. } else {
  686. log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
  687. }
  688. continue
  689. }
  690. err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
  691. if err != nil {
  692. return errors.Wrapf(err, "while setting last pull timestamp for blocklist %s", *blocklist.Name)
  693. }
  694. if len(decisions) == 0 {
  695. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  696. continue
  697. }
  698. //apply APIC specific whitelists
  699. decisions = a.ApplyApicWhitelists(decisions)
  700. alert := createAlertForDecision(decisions[0])
  701. alertsFromCapi := []*models.Alert{alert}
  702. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  703. err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
  704. if err != nil {
  705. return errors.Wrapf(err, "while saving alert from blocklist %s", *blocklist.Name)
  706. }
  707. }
  708. return nil
  709. }
  710. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  711. if *alert.Source.Scope == types.CAPIOrigin {
  712. *alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
  713. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
  714. } else if *alert.Source.Scope == types.ListOrigin {
  715. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  716. alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
  717. }
  718. return alert
  719. }
  720. func (a *apic) Pull() error {
  721. defer trace.CatchPanic("lapi/pullFromAPIC")
  722. toldOnce := false
  723. for {
  724. scenario, err := a.FetchScenariosListFromDB()
  725. if err != nil {
  726. log.Errorf("unable to fetch scenarios from db: %s", err)
  727. }
  728. if len(scenario) > 0 {
  729. break
  730. }
  731. if !toldOnce {
  732. log.Warning("scenario list is empty, will not pull yet")
  733. toldOnce = true
  734. }
  735. time.Sleep(1 * time.Second)
  736. }
  737. if err := a.PullTop(false); err != nil {
  738. log.Errorf("capi pull top: %s", err)
  739. }
  740. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  741. ticker := time.NewTicker(a.pullIntervalFirst)
  742. for {
  743. select {
  744. case <-ticker.C:
  745. ticker.Reset(a.pullInterval)
  746. if err := a.PullTop(false); err != nil {
  747. log.Errorf("capi pull top: %s", err)
  748. continue
  749. }
  750. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  751. a.metricsTomb.Kill(nil)
  752. a.pushTomb.Kill(nil)
  753. return nil
  754. }
  755. }
  756. }
  757. func (a *apic) GetMetrics() (*models.Metrics, error) {
  758. metric := &models.Metrics{
  759. ApilVersion: ptr.Of(version.String()),
  760. Machines: make([]*models.MetricsAgentInfo, 0),
  761. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  762. }
  763. machines, err := a.dbClient.ListMachines()
  764. if err != nil {
  765. return metric, err
  766. }
  767. bouncers, err := a.dbClient.ListBouncers()
  768. if err != nil {
  769. return metric, err
  770. }
  771. var lastpush string
  772. for _, machine := range machines {
  773. if machine.LastPush == nil {
  774. lastpush = time.Time{}.String()
  775. } else {
  776. lastpush = machine.LastPush.String()
  777. }
  778. m := &models.MetricsAgentInfo{
  779. Version: machine.Version,
  780. Name: machine.MachineId,
  781. LastUpdate: machine.UpdatedAt.String(),
  782. LastPush: lastpush,
  783. }
  784. metric.Machines = append(metric.Machines, m)
  785. }
  786. for _, bouncer := range bouncers {
  787. m := &models.MetricsBouncerInfo{
  788. Version: bouncer.Version,
  789. CustomName: bouncer.Name,
  790. Name: bouncer.Type,
  791. LastPull: bouncer.LastPull.String(),
  792. }
  793. metric.Bouncers = append(metric.Bouncers, m)
  794. }
  795. return metric, nil
  796. }
  797. func (a *apic) SendMetrics(stop chan (bool)) {
  798. defer trace.CatchPanic("lapi/metricsToAPIC")
  799. ticker := time.NewTicker(a.metricsIntervalFirst)
  800. log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
  801. for {
  802. metrics, err := a.GetMetrics()
  803. if err != nil {
  804. log.Errorf("unable to get metrics (%s), will retry", err)
  805. }
  806. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  807. if err != nil {
  808. log.Errorf("capi metrics: failed: %s", err)
  809. } else {
  810. log.Infof("capi metrics: metrics sent successfully")
  811. }
  812. select {
  813. case <-stop:
  814. return
  815. case <-ticker.C:
  816. ticker.Reset(a.metricsInterval)
  817. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  818. a.pullTomb.Kill(nil)
  819. a.pushTomb.Kill(nil)
  820. return
  821. }
  822. }
  823. }
  824. func (a *apic) Shutdown() {
  825. a.pushTomb.Kill(nil)
  826. a.pullTomb.Kill(nil)
  827. a.metricsTomb.Kill(nil)
  828. }
  829. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  830. add_counters := make(map[string]map[string]int)
  831. add_counters[types.CAPIOrigin] = make(map[string]int)
  832. add_counters[types.ListOrigin] = make(map[string]int)
  833. delete_counters := make(map[string]map[string]int)
  834. delete_counters[types.CAPIOrigin] = make(map[string]int)
  835. delete_counters[types.ListOrigin] = make(map[string]int)
  836. return add_counters, delete_counters
  837. }
  838. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  839. if *origin == types.CAPIOrigin {
  840. counter[*origin]["all"] += totalDecisions
  841. } else if *origin == types.ListOrigin {
  842. counter[*origin][*scenario] += totalDecisions
  843. } else {
  844. log.Warningf("Unknown origin %s", *origin)
  845. }
  846. }