apic.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/go-openapi/strfmt"
  14. "github.com/pkg/errors"
  15. log "github.com/sirupsen/logrus"
  16. "gopkg.in/tomb.v2"
  17. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  18. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  19. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  20. "github.com/crowdsecurity/crowdsec/pkg/database"
  21. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  22. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  23. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  24. "github.com/crowdsecurity/crowdsec/pkg/models"
  25. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  26. "github.com/crowdsecurity/crowdsec/pkg/types"
  27. )
  28. var (
  29. pullIntervalDefault = time.Hour * 2
  30. pullIntervalDelta = 5 * time.Minute
  31. pushIntervalDefault = time.Second * 10
  32. pushIntervalDelta = time.Second * 7
  33. metricsIntervalDefault = time.Minute * 30
  34. metricsIntervalDelta = time.Minute * 15
  35. )
  36. var SCOPE_CAPI_ALIAS_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
  37. type apic struct {
  38. // when changing the intervals in tests, always set *First too
  39. // or they can be negative
  40. pullInterval time.Duration
  41. pullIntervalFirst time.Duration
  42. pushInterval time.Duration
  43. pushIntervalFirst time.Duration
  44. metricsInterval time.Duration
  45. metricsIntervalFirst time.Duration
  46. dbClient *database.Client
  47. apiClient *apiclient.ApiClient
  48. AlertsAddChan chan []*models.Alert
  49. mu sync.Mutex
  50. pushTomb tomb.Tomb
  51. pullTomb tomb.Tomb
  52. metricsTomb tomb.Tomb
  53. startup bool
  54. credentials *csconfig.ApiCredentialsCfg
  55. scenarioList []string
  56. consoleConfig *csconfig.ConsoleConfig
  57. whitelists *csconfig.CapiWhitelist
  58. }
  59. // randomDuration returns a duration value between d-delta and d+delta
  60. func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  61. return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
  62. }
  63. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  64. scenarios := make([]string, 0)
  65. machines, err := a.dbClient.ListMachines()
  66. if err != nil {
  67. return nil, errors.Wrap(err, "while listing machines")
  68. }
  69. //merge all scenarios together
  70. for _, v := range machines {
  71. machineScenarios := strings.Split(v.Scenarios, ",")
  72. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  73. for _, sv := range machineScenarios {
  74. if !types.InSlice(sv, scenarios) && sv != "" {
  75. scenarios = append(scenarios, sv)
  76. }
  77. }
  78. }
  79. log.Debugf("Returning list of scenarios : %+v", scenarios)
  80. return scenarios, nil
  81. }
  82. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  83. apiDecisions := models.AddSignalsRequestItemDecisions{}
  84. for _, decision := range decisions {
  85. x := &models.AddSignalsRequestItemDecisionsItem{
  86. Duration: types.StrPtr(*decision.Duration),
  87. ID: new(int64),
  88. Origin: types.StrPtr(*decision.Origin),
  89. Scenario: types.StrPtr(*decision.Scenario),
  90. Scope: types.StrPtr(*decision.Scope),
  91. //Simulated: *decision.Simulated,
  92. Type: types.StrPtr(*decision.Type),
  93. Until: decision.Until,
  94. Value: types.StrPtr(*decision.Value),
  95. UUID: decision.UUID,
  96. }
  97. *x.ID = decision.ID
  98. if decision.Simulated != nil {
  99. x.Simulated = *decision.Simulated
  100. }
  101. apiDecisions = append(apiDecisions, x)
  102. }
  103. return apiDecisions
  104. }
  105. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  106. signal := &models.AddSignalsRequestItem{
  107. Message: alert.Message,
  108. Scenario: alert.Scenario,
  109. ScenarioHash: alert.ScenarioHash,
  110. ScenarioVersion: alert.ScenarioVersion,
  111. Source: &models.AddSignalsRequestItemSource{
  112. AsName: alert.Source.AsName,
  113. AsNumber: alert.Source.AsNumber,
  114. Cn: alert.Source.Cn,
  115. IP: alert.Source.IP,
  116. Latitude: alert.Source.Latitude,
  117. Longitude: alert.Source.Longitude,
  118. Range: alert.Source.Range,
  119. Scope: alert.Source.Scope,
  120. Value: alert.Source.Value,
  121. },
  122. StartAt: alert.StartAt,
  123. StopAt: alert.StopAt,
  124. CreatedAt: alert.CreatedAt,
  125. MachineID: alert.MachineID,
  126. ScenarioTrust: scenarioTrust,
  127. Decisions: decisionsToApiDecisions(alert.Decisions),
  128. UUID: alert.UUID,
  129. }
  130. if shareContext {
  131. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  132. for _, meta := range alert.Meta {
  133. contextItem := models.AddSignalsRequestItemContextItems0{
  134. Key: meta.Key,
  135. Value: meta.Value,
  136. }
  137. signal.Context = append(signal.Context, &contextItem)
  138. }
  139. }
  140. return signal
  141. }
  142. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
  143. var err error
  144. ret := &apic{
  145. AlertsAddChan: make(chan []*models.Alert),
  146. dbClient: dbClient,
  147. mu: sync.Mutex{},
  148. startup: true,
  149. credentials: config.Credentials,
  150. pullTomb: tomb.Tomb{},
  151. pushTomb: tomb.Tomb{},
  152. metricsTomb: tomb.Tomb{},
  153. scenarioList: make([]string, 0),
  154. consoleConfig: consoleConfig,
  155. pullInterval: pullIntervalDefault,
  156. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  157. pushInterval: pushIntervalDefault,
  158. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  159. metricsInterval: metricsIntervalDefault,
  160. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  161. whitelists: apicWhitelist,
  162. }
  163. password := strfmt.Password(config.Credentials.Password)
  164. apiURL, err := url.Parse(config.Credentials.URL)
  165. if err != nil {
  166. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  167. }
  168. papiURL, err := url.Parse(config.Credentials.PapiURL)
  169. if err != nil {
  170. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.PapiURL)
  171. }
  172. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  173. if err != nil {
  174. return nil, errors.Wrap(err, "while fetching scenarios from db")
  175. }
  176. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  177. MachineID: config.Credentials.Login,
  178. Password: password,
  179. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  180. URL: apiURL,
  181. PapiURL: papiURL,
  182. VersionPrefix: "v3",
  183. Scenarios: ret.scenarioList,
  184. UpdateScenario: ret.FetchScenariosListFromDB,
  185. })
  186. if err != nil {
  187. return nil, errors.Wrap(err, "while creating api client")
  188. }
  189. // The watcher will be authenticated by the RoundTripper the first time it will call CAPI
  190. // Explicit authentication will provoke an useless supplementary call to CAPI
  191. scenarios, err := ret.FetchScenariosListFromDB()
  192. if err != nil {
  193. return ret, errors.Wrapf(err, "get scenario in db: %s", err)
  194. }
  195. authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  196. MachineID: &config.Credentials.Login,
  197. Password: &password,
  198. Scenarios: scenarios,
  199. })
  200. if err != nil {
  201. return ret, errors.Wrapf(err, "authenticate watcher (%s)", config.Credentials.Login)
  202. }
  203. if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
  204. return ret, errors.Wrap(err, "unable to parse jwt expiration")
  205. }
  206. ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
  207. return ret, err
  208. }
  209. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  210. func (a *apic) Push() error {
  211. defer types.CatchPanic("lapi/pushToAPIC")
  212. var cache models.AddSignalsRequest
  213. ticker := time.NewTicker(a.pushIntervalFirst)
  214. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  215. for {
  216. select {
  217. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  218. a.pullTomb.Kill(nil)
  219. a.metricsTomb.Kill(nil)
  220. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  221. if len(cache) == 0 {
  222. return nil
  223. }
  224. go a.Send(&cache)
  225. return nil
  226. case <-ticker.C:
  227. ticker.Reset(a.pushInterval)
  228. if len(cache) > 0 {
  229. a.mu.Lock()
  230. cacheCopy := cache
  231. cache = make(models.AddSignalsRequest, 0)
  232. a.mu.Unlock()
  233. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  234. go a.Send(&cacheCopy)
  235. }
  236. case alerts := <-a.AlertsAddChan:
  237. var signals []*models.AddSignalsRequestItem
  238. for _, alert := range alerts {
  239. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  240. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  241. }
  242. }
  243. a.mu.Lock()
  244. cache = append(cache, signals...)
  245. a.mu.Unlock()
  246. }
  247. }
  248. }
  249. func getScenarioTrustOfAlert(alert *models.Alert) string {
  250. scenarioTrust := "certified"
  251. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  252. scenarioTrust = "custom"
  253. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  254. scenarioTrust = "tainted"
  255. }
  256. if len(alert.Decisions) > 0 {
  257. if *alert.Decisions[0].Origin == types.CscliOrigin {
  258. scenarioTrust = "manual"
  259. }
  260. }
  261. return scenarioTrust
  262. }
  263. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  264. if *alert.Simulated {
  265. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  266. return false
  267. }
  268. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  269. case "manual":
  270. if !*consoleConfig.ShareManualDecisions {
  271. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  272. return false
  273. }
  274. case "tainted":
  275. if !*consoleConfig.ShareTaintedScenarios {
  276. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  277. return false
  278. }
  279. case "custom":
  280. if !*consoleConfig.ShareCustomScenarios {
  281. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  282. return false
  283. }
  284. }
  285. return true
  286. }
  287. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  288. /*we do have a problem with this :
  289. The apic.Push background routine reads from alertToPush chan.
  290. This chan is filled by Controller.CreateAlert
  291. If the chan apic.Send hangs, the alertToPush chan will become full,
  292. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  293. So instead, we prefer to cancel write.
  294. I don't know enough about gin to tell how much of an issue it can be.
  295. */
  296. var cache []*models.AddSignalsRequestItem = *cacheOrig
  297. var send models.AddSignalsRequest
  298. bulkSize := 50
  299. pageStart := 0
  300. pageEnd := bulkSize
  301. for {
  302. if pageEnd >= len(cache) {
  303. send = cache[pageStart:]
  304. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  305. defer cancel()
  306. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  307. if err != nil {
  308. log.Errorf("sending signal to central API: %s", err)
  309. return
  310. }
  311. break
  312. }
  313. send = cache[pageStart:pageEnd]
  314. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  315. defer cancel()
  316. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  317. if err != nil {
  318. //we log it here as well, because the return value of func might be discarded
  319. log.Errorf("sending signal to central API: %s", err)
  320. }
  321. pageStart += bulkSize
  322. pageEnd += bulkSize
  323. }
  324. }
  325. func (a *apic) CAPIPullIsOld() (bool, error) {
  326. /*only pull community blocklist if it's older than 1h30 */
  327. alerts := a.dbClient.Ent.Alert.Query()
  328. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  329. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  330. count, err := alerts.Count(a.dbClient.CTX)
  331. if err != nil {
  332. return false, errors.Wrap(err, "while looking for CAPI alert")
  333. }
  334. if count > 0 {
  335. log.Printf("last CAPI pull is newer than 1h30, skip.")
  336. return false, nil
  337. }
  338. return true, nil
  339. }
  340. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  341. var filter map[string][]string
  342. var nbDeleted int
  343. for _, decision := range deletedDecisions {
  344. if strings.ToLower(*decision.Scope) == "ip" {
  345. filter = make(map[string][]string, 1)
  346. filter["value"] = []string{*decision.Value}
  347. } else {
  348. filter = make(map[string][]string, 3)
  349. filter["value"] = []string{*decision.Value}
  350. filter["type"] = []string{*decision.Type}
  351. filter["scopes"] = []string{*decision.Scope}
  352. }
  353. filter["origin"] = []string{*decision.Origin}
  354. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  355. if err != nil {
  356. return 0, errors.Wrap(err, "deleting decisions error")
  357. }
  358. dbCliDel, err := strconv.Atoi(dbCliRet)
  359. if err != nil {
  360. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  361. }
  362. updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
  363. nbDeleted += dbCliDel
  364. }
  365. return nbDeleted, nil
  366. }
  367. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
  368. var filter map[string][]string
  369. var nbDeleted int
  370. for _, decisions := range deletedDecisions {
  371. scope := decisions.Scope
  372. for _, decision := range decisions.Decisions {
  373. if strings.ToLower(*scope) == "ip" {
  374. filter = make(map[string][]string, 1)
  375. filter["value"] = []string{decision}
  376. } else {
  377. filter = make(map[string][]string, 2)
  378. filter["value"] = []string{decision}
  379. filter["scopes"] = []string{*scope}
  380. }
  381. filter["origin"] = []string{types.CAPIOrigin}
  382. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  383. if err != nil {
  384. return 0, errors.Wrap(err, "deleting decisions error")
  385. }
  386. dbCliDel, err := strconv.Atoi(dbCliRet)
  387. if err != nil {
  388. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  389. }
  390. updateCounterForDecision(delete_counters, types.StrPtr(types.CAPIOrigin), nil, dbCliDel)
  391. nbDeleted += dbCliDel
  392. }
  393. }
  394. return nbDeleted, nil
  395. }
  396. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  397. newAlerts := make([]*models.Alert, 0)
  398. for _, decision := range decisions {
  399. found := false
  400. for _, sub := range newAlerts {
  401. if sub.Source.Scope == nil {
  402. log.Warningf("nil scope in %+v", sub)
  403. continue
  404. }
  405. if *decision.Origin == types.CAPIOrigin {
  406. if *sub.Source.Scope == types.CAPIOrigin {
  407. found = true
  408. break
  409. }
  410. } else if *decision.Origin == types.ListOrigin {
  411. if *sub.Source.Scope == *decision.Origin {
  412. if sub.Scenario == nil {
  413. log.Warningf("nil scenario in %+v", sub)
  414. }
  415. if *sub.Scenario == *decision.Scenario {
  416. found = true
  417. break
  418. }
  419. }
  420. } else {
  421. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  422. }
  423. }
  424. if !found {
  425. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  426. newAlerts = append(newAlerts, createAlertForDecision(decision))
  427. }
  428. }
  429. return newAlerts
  430. }
  431. func createAlertForDecision(decision *models.Decision) *models.Alert {
  432. newAlert := &models.Alert{}
  433. newAlert.Source = &models.Source{}
  434. newAlert.Source.Scope = types.StrPtr("")
  435. if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
  436. newAlert.Scenario = types.StrPtr(types.CAPIOrigin)
  437. newAlert.Source.Scope = types.StrPtr(types.CAPIOrigin)
  438. } else if *decision.Origin == types.ListOrigin {
  439. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  440. newAlert.Source.Scope = types.StrPtr(types.ListOrigin)
  441. } else {
  442. log.Warningf("unknown origin %s", *decision.Origin)
  443. }
  444. newAlert.Message = types.StrPtr("")
  445. newAlert.Source.Value = types.StrPtr("")
  446. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  447. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  448. newAlert.Capacity = types.Int32Ptr(0)
  449. newAlert.Simulated = types.BoolPtr(false)
  450. newAlert.EventsCount = types.Int32Ptr(0)
  451. newAlert.Leakspeed = types.StrPtr("")
  452. newAlert.ScenarioHash = types.StrPtr("")
  453. newAlert.ScenarioVersion = types.StrPtr("")
  454. newAlert.MachineID = database.CapiMachineID
  455. return newAlert
  456. }
  457. // This function takes in list of parent alerts and decisions and then pairs them up.
  458. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  459. for _, decision := range decisions {
  460. //count and create separate alerts for each list
  461. updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
  462. /*CAPI might send lower case scopes, unify it.*/
  463. switch strings.ToLower(*decision.Scope) {
  464. case "ip":
  465. *decision.Scope = types.Ip
  466. case "range":
  467. *decision.Scope = types.Range
  468. }
  469. found := false
  470. //add the individual decisions to the right list
  471. for idx, alert := range alerts {
  472. if *decision.Origin == types.CAPIOrigin {
  473. if *alert.Source.Scope == types.CAPIOrigin {
  474. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  475. found = true
  476. break
  477. }
  478. } else if *decision.Origin == types.ListOrigin {
  479. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  480. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  481. found = true
  482. break
  483. }
  484. } else {
  485. log.Warningf("unknown origin %s", *decision.Origin)
  486. }
  487. }
  488. if !found {
  489. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  490. }
  491. }
  492. return alerts
  493. }
  494. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  495. // one alert for "community blocklist"
  496. // one alert per list we're subscribed to
  497. func (a *apic) PullTop() error {
  498. var err error
  499. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  500. return err
  501. } else if !lastPullIsOld {
  502. return nil
  503. }
  504. log.Infof("Starting community-blocklist update")
  505. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  506. if err != nil {
  507. return errors.Wrap(err, "get stream")
  508. }
  509. a.startup = false
  510. /*to count additions/deletions across lists*/
  511. log.Debugf("Received %d new decisions", len(data.New))
  512. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  513. if data.Links != nil {
  514. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  515. }
  516. add_counters, delete_counters := makeAddAndDeleteCounters()
  517. // process deleted decisions
  518. if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
  519. return err
  520. } else {
  521. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  522. }
  523. if len(data.New) == 0 {
  524. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  525. return nil
  526. }
  527. // create one alert for community blocklist using the first decision
  528. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  529. //apply APIC specific whitelists
  530. decisions = a.ApplyApicWhitelists(decisions)
  531. alert := createAlertForDecision(decisions[0])
  532. alertsFromCapi := []*models.Alert{alert}
  533. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  534. err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
  535. if err != nil {
  536. return errors.Wrap(err, "while saving alerts")
  537. }
  538. // update blocklists
  539. if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
  540. return errors.Wrap(err, "while updating blocklists")
  541. }
  542. return nil
  543. }
  544. func (a *apic) ApplyApicWhitelists(decisions []*models.Decision) []*models.Decision {
  545. if a.whitelists == nil {
  546. return decisions
  547. }
  548. //deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
  549. outIdx := 0
  550. for _, decision := range decisions {
  551. if decision.Value == nil {
  552. continue
  553. }
  554. skip := false
  555. ipval := net.ParseIP(*decision.Value)
  556. for _, cidr := range a.whitelists.Cidrs {
  557. if skip {
  558. break
  559. }
  560. if cidr.Contains(ipval) {
  561. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, cidr.String())
  562. skip = true
  563. }
  564. }
  565. for _, ip := range a.whitelists.Ips {
  566. if skip {
  567. break
  568. }
  569. if ip != nil && ip.Equal(ipval) {
  570. log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, ip.String())
  571. skip = true
  572. }
  573. }
  574. if !skip {
  575. decisions[outIdx] = decision
  576. outIdx++
  577. }
  578. }
  579. //shrink the list, those are deleted items
  580. decisions = decisions[:outIdx]
  581. return decisions
  582. }
  583. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
  584. for idx, alert := range alertsFromCapi {
  585. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  586. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  587. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  588. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  589. }
  590. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  591. if err != nil {
  592. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  593. }
  594. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  595. }
  596. return nil
  597. }
  598. func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
  599. // we should force pull if the blocklist decisions are about to expire or there's no decision in the db
  600. alertQuery := a.dbClient.Ent.Alert.Query()
  601. alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
  602. alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
  603. alertInstance, err := alertQuery.First(context.Background())
  604. if err != nil {
  605. if ent.IsNotFound(err) {
  606. log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
  607. return true, nil
  608. }
  609. return false, errors.Wrap(err, "while getting alert")
  610. }
  611. decisionQuery := a.dbClient.Ent.Decision.Query()
  612. decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
  613. firstDecision, err := decisionQuery.First(context.Background())
  614. if err != nil {
  615. if ent.IsNotFound(err) {
  616. log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
  617. return true, nil
  618. }
  619. return false, errors.Wrap(err, "while getting decision")
  620. }
  621. if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
  622. log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
  623. return true, nil
  624. }
  625. return false, nil
  626. }
  627. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
  628. if links == nil {
  629. return nil
  630. }
  631. if links.Blocklists == nil {
  632. return nil
  633. }
  634. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  635. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  636. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
  637. if err != nil {
  638. return errors.Wrap(err, "while creating default client")
  639. }
  640. for _, blocklist := range links.Blocklists {
  641. if blocklist.Scope == nil {
  642. log.Warningf("blocklist has no scope")
  643. continue
  644. }
  645. if blocklist.Duration == nil {
  646. log.Warningf("blocklist has no duration")
  647. continue
  648. }
  649. forcePull, err := a.ShouldForcePullBlocklist(blocklist)
  650. if err != nil {
  651. return errors.Wrapf(err, "while checking if we should force pull blocklist %s", *blocklist.Name)
  652. }
  653. blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
  654. var lastPullTimestamp *string
  655. if !forcePull {
  656. lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
  657. if err != nil {
  658. return errors.Wrapf(err, "while getting last pull timestamp for blocklist %s", *blocklist.Name)
  659. }
  660. }
  661. decisions, has_changed, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
  662. if err != nil {
  663. return errors.Wrapf(err, "while getting decisions from blocklist %s", *blocklist.Name)
  664. }
  665. if !has_changed {
  666. if lastPullTimestamp == nil {
  667. log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
  668. } else {
  669. log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
  670. }
  671. continue
  672. }
  673. err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
  674. if err != nil {
  675. return errors.Wrapf(err, "while setting last pull timestamp for blocklist %s", *blocklist.Name)
  676. }
  677. if len(decisions) == 0 {
  678. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  679. continue
  680. }
  681. //apply APIC specific whitelists
  682. decisions = a.ApplyApicWhitelists(decisions)
  683. alert := createAlertForDecision(decisions[0])
  684. alertsFromCapi := []*models.Alert{alert}
  685. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  686. err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
  687. if err != nil {
  688. return errors.Wrapf(err, "while saving alert from blocklist %s", *blocklist.Name)
  689. }
  690. }
  691. return nil
  692. }
  693. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  694. if *alert.Source.Scope == types.CAPIOrigin {
  695. *alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
  696. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
  697. } else if *alert.Source.Scope == types.ListOrigin {
  698. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  699. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
  700. }
  701. return alert
  702. }
  703. func (a *apic) Pull() error {
  704. defer types.CatchPanic("lapi/pullFromAPIC")
  705. toldOnce := false
  706. for {
  707. scenario, err := a.FetchScenariosListFromDB()
  708. if err != nil {
  709. log.Errorf("unable to fetch scenarios from db: %s", err)
  710. }
  711. if len(scenario) > 0 {
  712. break
  713. }
  714. if !toldOnce {
  715. log.Warning("scenario list is empty, will not pull yet")
  716. toldOnce = true
  717. }
  718. time.Sleep(1 * time.Second)
  719. }
  720. if err := a.PullTop(); err != nil {
  721. log.Errorf("capi pull top: %s", err)
  722. }
  723. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  724. ticker := time.NewTicker(a.pullIntervalFirst)
  725. for {
  726. select {
  727. case <-ticker.C:
  728. ticker.Reset(a.pullInterval)
  729. if err := a.PullTop(); err != nil {
  730. log.Errorf("capi pull top: %s", err)
  731. continue
  732. }
  733. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  734. a.metricsTomb.Kill(nil)
  735. a.pushTomb.Kill(nil)
  736. return nil
  737. }
  738. }
  739. }
  740. func (a *apic) GetMetrics() (*models.Metrics, error) {
  741. metric := &models.Metrics{
  742. ApilVersion: types.StrPtr(cwversion.VersionStr()),
  743. Machines: make([]*models.MetricsAgentInfo, 0),
  744. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  745. }
  746. machines, err := a.dbClient.ListMachines()
  747. if err != nil {
  748. return metric, err
  749. }
  750. bouncers, err := a.dbClient.ListBouncers()
  751. if err != nil {
  752. return metric, err
  753. }
  754. var lastpush string
  755. for _, machine := range machines {
  756. if machine.LastPush == nil {
  757. lastpush = time.Time{}.String()
  758. } else {
  759. lastpush = machine.LastPush.String()
  760. }
  761. m := &models.MetricsAgentInfo{
  762. Version: machine.Version,
  763. Name: machine.MachineId,
  764. LastUpdate: machine.UpdatedAt.String(),
  765. LastPush: lastpush,
  766. }
  767. metric.Machines = append(metric.Machines, m)
  768. }
  769. for _, bouncer := range bouncers {
  770. m := &models.MetricsBouncerInfo{
  771. Version: bouncer.Version,
  772. CustomName: bouncer.Name,
  773. Name: bouncer.Type,
  774. LastPull: bouncer.LastPull.String(),
  775. }
  776. metric.Bouncers = append(metric.Bouncers, m)
  777. }
  778. return metric, nil
  779. }
  780. func (a *apic) SendMetrics(stop chan (bool)) {
  781. defer types.CatchPanic("lapi/metricsToAPIC")
  782. ticker := time.NewTicker(a.metricsIntervalFirst)
  783. log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
  784. for {
  785. metrics, err := a.GetMetrics()
  786. if err != nil {
  787. log.Errorf("unable to get metrics (%s), will retry", err)
  788. }
  789. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  790. if err != nil {
  791. log.Errorf("capi metrics: failed: %s", err)
  792. } else {
  793. log.Infof("capi metrics: metrics sent successfully")
  794. }
  795. select {
  796. case <-stop:
  797. return
  798. case <-ticker.C:
  799. ticker.Reset(a.metricsInterval)
  800. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  801. a.pullTomb.Kill(nil)
  802. a.pushTomb.Kill(nil)
  803. return
  804. }
  805. }
  806. }
  807. func (a *apic) Shutdown() {
  808. a.pushTomb.Kill(nil)
  809. a.pullTomb.Kill(nil)
  810. a.metricsTomb.Kill(nil)
  811. }
  812. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  813. add_counters := make(map[string]map[string]int)
  814. add_counters[types.CAPIOrigin] = make(map[string]int)
  815. add_counters[types.ListOrigin] = make(map[string]int)
  816. delete_counters := make(map[string]map[string]int)
  817. delete_counters[types.CAPIOrigin] = make(map[string]int)
  818. delete_counters[types.ListOrigin] = make(map[string]int)
  819. return add_counters, delete_counters
  820. }
  821. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  822. if *origin == types.CAPIOrigin {
  823. counter[*origin]["all"] += totalDecisions
  824. } else if *origin == types.ListOrigin {
  825. counter[*origin][*scenario] += totalDecisions
  826. } else {
  827. log.Warningf("Unknown origin %s", *origin)
  828. }
  829. }