apic.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/go-openapi/strfmt"
  13. "github.com/pkg/errors"
  14. log "github.com/sirupsen/logrus"
  15. "gopkg.in/tomb.v2"
  16. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  17. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  18. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  19. "github.com/crowdsecurity/crowdsec/pkg/database"
  20. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  21. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  22. "github.com/crowdsecurity/crowdsec/pkg/models"
  23. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  24. "github.com/crowdsecurity/crowdsec/pkg/types"
  25. )
  26. var (
  27. pullIntervalDefault = time.Hour * 2
  28. pullIntervalDelta = 5 * time.Minute
  29. pushIntervalDefault = time.Second * 10
  30. pushIntervalDelta = time.Second * 7
  31. metricsIntervalDefault = time.Minute * 30
  32. metricsIntervalDelta = time.Minute * 15
  33. )
  34. var SCOPE_CAPI_ALIAS_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
  35. type apic struct {
  36. // when changing the intervals in tests, always set *First too
  37. // or they can be negative
  38. pullInterval time.Duration
  39. pullIntervalFirst time.Duration
  40. pushInterval time.Duration
  41. pushIntervalFirst time.Duration
  42. metricsInterval time.Duration
  43. metricsIntervalFirst time.Duration
  44. dbClient *database.Client
  45. apiClient *apiclient.ApiClient
  46. AlertsAddChan chan []*models.Alert
  47. mu sync.Mutex
  48. pushTomb tomb.Tomb
  49. pullTomb tomb.Tomb
  50. metricsTomb tomb.Tomb
  51. startup bool
  52. credentials *csconfig.ApiCredentialsCfg
  53. scenarioList []string
  54. consoleConfig *csconfig.ConsoleConfig
  55. }
  56. // randomDuration returns a duration value between d-delta and d+delta
  57. func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  58. return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
  59. }
  60. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  61. scenarios := make([]string, 0)
  62. machines, err := a.dbClient.ListMachines()
  63. if err != nil {
  64. return nil, errors.Wrap(err, "while listing machines")
  65. }
  66. //merge all scenarios together
  67. for _, v := range machines {
  68. machineScenarios := strings.Split(v.Scenarios, ",")
  69. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  70. for _, sv := range machineScenarios {
  71. if !types.InSlice(sv, scenarios) && sv != "" {
  72. scenarios = append(scenarios, sv)
  73. }
  74. }
  75. }
  76. log.Debugf("Returning list of scenarios : %+v", scenarios)
  77. return scenarios, nil
  78. }
  79. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  80. apiDecisions := models.AddSignalsRequestItemDecisions{}
  81. for _, decision := range decisions {
  82. x := &models.AddSignalsRequestItemDecisionsItem{
  83. Duration: types.StrPtr(*decision.Duration),
  84. ID: new(int64),
  85. Origin: types.StrPtr(*decision.Origin),
  86. Scenario: types.StrPtr(*decision.Scenario),
  87. Scope: types.StrPtr(*decision.Scope),
  88. //Simulated: *decision.Simulated,
  89. Type: types.StrPtr(*decision.Type),
  90. Until: decision.Until,
  91. Value: types.StrPtr(*decision.Value),
  92. UUID: decision.UUID,
  93. }
  94. *x.ID = decision.ID
  95. if decision.Simulated != nil {
  96. x.Simulated = *decision.Simulated
  97. }
  98. apiDecisions = append(apiDecisions, x)
  99. }
  100. return apiDecisions
  101. }
  102. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  103. signal := &models.AddSignalsRequestItem{
  104. Message: alert.Message,
  105. Scenario: alert.Scenario,
  106. ScenarioHash: alert.ScenarioHash,
  107. ScenarioVersion: alert.ScenarioVersion,
  108. Source: &models.AddSignalsRequestItemSource{
  109. AsName: alert.Source.AsName,
  110. AsNumber: alert.Source.AsNumber,
  111. Cn: alert.Source.Cn,
  112. IP: alert.Source.IP,
  113. Latitude: alert.Source.Latitude,
  114. Longitude: alert.Source.Longitude,
  115. Range: alert.Source.Range,
  116. Scope: alert.Source.Scope,
  117. Value: alert.Source.Value,
  118. },
  119. StartAt: alert.StartAt,
  120. StopAt: alert.StopAt,
  121. CreatedAt: alert.CreatedAt,
  122. MachineID: alert.MachineID,
  123. ScenarioTrust: scenarioTrust,
  124. Decisions: decisionsToApiDecisions(alert.Decisions),
  125. UUID: alert.UUID,
  126. }
  127. if shareContext {
  128. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  129. for _, meta := range alert.Meta {
  130. contextItem := models.AddSignalsRequestItemContextItems0{
  131. Key: meta.Key,
  132. Value: meta.Value,
  133. }
  134. signal.Context = append(signal.Context, &contextItem)
  135. }
  136. }
  137. return signal
  138. }
  139. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  140. var err error
  141. ret := &apic{
  142. AlertsAddChan: make(chan []*models.Alert),
  143. dbClient: dbClient,
  144. mu: sync.Mutex{},
  145. startup: true,
  146. credentials: config.Credentials,
  147. pullTomb: tomb.Tomb{},
  148. pushTomb: tomb.Tomb{},
  149. metricsTomb: tomb.Tomb{},
  150. scenarioList: make([]string, 0),
  151. consoleConfig: consoleConfig,
  152. pullInterval: pullIntervalDefault,
  153. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  154. pushInterval: pushIntervalDefault,
  155. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  156. metricsInterval: metricsIntervalDefault,
  157. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  158. }
  159. password := strfmt.Password(config.Credentials.Password)
  160. apiURL, err := url.Parse(config.Credentials.URL)
  161. if err != nil {
  162. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  163. }
  164. papiURL, err := url.Parse(config.Credentials.PapiURL)
  165. if err != nil {
  166. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.PapiURL)
  167. }
  168. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  169. if err != nil {
  170. return nil, errors.Wrap(err, "while fetching scenarios from db")
  171. }
  172. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  173. MachineID: config.Credentials.Login,
  174. Password: password,
  175. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  176. URL: apiURL,
  177. PapiURL: papiURL,
  178. VersionPrefix: "v3",
  179. Scenarios: ret.scenarioList,
  180. UpdateScenario: ret.FetchScenariosListFromDB,
  181. })
  182. if err != nil {
  183. return nil, errors.Wrap(err, "while creating api client")
  184. }
  185. scenarios, err := ret.FetchScenariosListFromDB()
  186. if err != nil {
  187. return ret, errors.Wrapf(err, "get scenario in db: %s", err)
  188. }
  189. if _, err = ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  190. MachineID: &config.Credentials.Login,
  191. Password: &password,
  192. Scenarios: scenarios,
  193. }); err != nil {
  194. return ret, errors.Wrapf(err, "authenticate watcher (%s)", config.Credentials.Login)
  195. }
  196. return ret, err
  197. }
  198. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  199. func (a *apic) Push() error {
  200. defer types.CatchPanic("lapi/pushToAPIC")
  201. var cache models.AddSignalsRequest
  202. ticker := time.NewTicker(a.pushIntervalFirst)
  203. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  204. for {
  205. select {
  206. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  207. a.pullTomb.Kill(nil)
  208. a.metricsTomb.Kill(nil)
  209. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  210. if len(cache) == 0 {
  211. return nil
  212. }
  213. go a.Send(&cache)
  214. return nil
  215. case <-ticker.C:
  216. ticker.Reset(a.pushInterval)
  217. if len(cache) > 0 {
  218. a.mu.Lock()
  219. cacheCopy := cache
  220. cache = make(models.AddSignalsRequest, 0)
  221. a.mu.Unlock()
  222. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  223. go a.Send(&cacheCopy)
  224. }
  225. case alerts := <-a.AlertsAddChan:
  226. var signals []*models.AddSignalsRequestItem
  227. for _, alert := range alerts {
  228. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  229. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  230. }
  231. }
  232. a.mu.Lock()
  233. cache = append(cache, signals...)
  234. a.mu.Unlock()
  235. }
  236. }
  237. }
  238. func getScenarioTrustOfAlert(alert *models.Alert) string {
  239. scenarioTrust := "certified"
  240. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  241. scenarioTrust = "custom"
  242. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  243. scenarioTrust = "tainted"
  244. }
  245. if len(alert.Decisions) > 0 {
  246. if *alert.Decisions[0].Origin == types.CscliOrigin {
  247. scenarioTrust = "manual"
  248. }
  249. }
  250. return scenarioTrust
  251. }
  252. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  253. if *alert.Simulated {
  254. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  255. return false
  256. }
  257. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  258. case "manual":
  259. if !*consoleConfig.ShareManualDecisions {
  260. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  261. return false
  262. }
  263. case "tainted":
  264. if !*consoleConfig.ShareTaintedScenarios {
  265. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  266. return false
  267. }
  268. case "custom":
  269. if !*consoleConfig.ShareCustomScenarios {
  270. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  271. return false
  272. }
  273. }
  274. return true
  275. }
  276. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  277. /*we do have a problem with this :
  278. The apic.Push background routine reads from alertToPush chan.
  279. This chan is filled by Controller.CreateAlert
  280. If the chan apic.Send hangs, the alertToPush chan will become full,
  281. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  282. So instead, we prefer to cancel write.
  283. I don't know enough about gin to tell how much of an issue it can be.
  284. */
  285. var cache []*models.AddSignalsRequestItem = *cacheOrig
  286. var send models.AddSignalsRequest
  287. bulkSize := 50
  288. pageStart := 0
  289. pageEnd := bulkSize
  290. for {
  291. if pageEnd >= len(cache) {
  292. send = cache[pageStart:]
  293. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  294. defer cancel()
  295. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  296. if err != nil {
  297. log.Errorf("sending signal to central API: %s", err)
  298. return
  299. }
  300. break
  301. }
  302. send = cache[pageStart:pageEnd]
  303. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  304. defer cancel()
  305. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  306. if err != nil {
  307. //we log it here as well, because the return value of func might be discarded
  308. log.Errorf("sending signal to central API: %s", err)
  309. }
  310. pageStart += bulkSize
  311. pageEnd += bulkSize
  312. }
  313. }
  314. func (a *apic) CAPIPullIsOld() (bool, error) {
  315. /*only pull community blocklist if it's older than 1h30 */
  316. alerts := a.dbClient.Ent.Alert.Query()
  317. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  318. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  319. count, err := alerts.Count(a.dbClient.CTX)
  320. if err != nil {
  321. return false, errors.Wrap(err, "while looking for CAPI alert")
  322. }
  323. if count > 0 {
  324. log.Printf("last CAPI pull is newer than 1h30, skip.")
  325. return false, nil
  326. }
  327. return true, nil
  328. }
  329. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  330. var filter map[string][]string
  331. var nbDeleted int
  332. for _, decision := range deletedDecisions {
  333. if strings.ToLower(*decision.Scope) == "ip" {
  334. filter = make(map[string][]string, 1)
  335. filter["value"] = []string{*decision.Value}
  336. } else {
  337. filter = make(map[string][]string, 3)
  338. filter["value"] = []string{*decision.Value}
  339. filter["type"] = []string{*decision.Type}
  340. filter["scopes"] = []string{*decision.Scope}
  341. }
  342. filter["origin"] = []string{*decision.Origin}
  343. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  344. if err != nil {
  345. return 0, errors.Wrap(err, "deleting decisions error")
  346. }
  347. dbCliDel, err := strconv.Atoi(dbCliRet)
  348. if err != nil {
  349. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  350. }
  351. updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
  352. nbDeleted += dbCliDel
  353. }
  354. return nbDeleted, nil
  355. }
  356. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
  357. var filter map[string][]string
  358. var nbDeleted int
  359. for _, decisions := range deletedDecisions {
  360. scope := decisions.Scope
  361. for _, decision := range decisions.Decisions {
  362. if strings.ToLower(*scope) == "ip" {
  363. filter = make(map[string][]string, 1)
  364. filter["value"] = []string{decision}
  365. } else {
  366. filter = make(map[string][]string, 2)
  367. filter["value"] = []string{decision}
  368. filter["scopes"] = []string{*scope}
  369. }
  370. filter["origin"] = []string{types.CAPIOrigin}
  371. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  372. if err != nil {
  373. return 0, errors.Wrap(err, "deleting decisions error")
  374. }
  375. dbCliDel, err := strconv.Atoi(dbCliRet)
  376. if err != nil {
  377. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  378. }
  379. updateCounterForDecision(delete_counters, types.StrPtr(types.CAPIOrigin), nil, dbCliDel)
  380. nbDeleted += dbCliDel
  381. }
  382. }
  383. return nbDeleted, nil
  384. }
  385. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  386. newAlerts := make([]*models.Alert, 0)
  387. for _, decision := range decisions {
  388. found := false
  389. for _, sub := range newAlerts {
  390. if sub.Source.Scope == nil {
  391. log.Warningf("nil scope in %+v", sub)
  392. continue
  393. }
  394. if *decision.Origin == types.CAPIOrigin {
  395. if *sub.Source.Scope == types.CAPIOrigin {
  396. found = true
  397. break
  398. }
  399. } else if *decision.Origin == types.ListOrigin {
  400. if *sub.Source.Scope == *decision.Origin {
  401. if sub.Scenario == nil {
  402. log.Warningf("nil scenario in %+v", sub)
  403. }
  404. if *sub.Scenario == *decision.Scenario {
  405. found = true
  406. break
  407. }
  408. }
  409. } else {
  410. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  411. }
  412. }
  413. if !found {
  414. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  415. newAlerts = append(newAlerts, createAlertForDecision(decision))
  416. }
  417. }
  418. return newAlerts
  419. }
  420. func createAlertForDecision(decision *models.Decision) *models.Alert {
  421. newAlert := &models.Alert{}
  422. newAlert.Source = &models.Source{}
  423. newAlert.Source.Scope = types.StrPtr("")
  424. if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
  425. newAlert.Scenario = types.StrPtr(types.CAPIOrigin)
  426. newAlert.Source.Scope = types.StrPtr(types.CAPIOrigin)
  427. } else if *decision.Origin == types.ListOrigin {
  428. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  429. newAlert.Source.Scope = types.StrPtr(types.ListOrigin)
  430. } else {
  431. log.Warningf("unknown origin %s", *decision.Origin)
  432. }
  433. newAlert.Message = types.StrPtr("")
  434. newAlert.Source.Value = types.StrPtr("")
  435. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  436. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  437. newAlert.Capacity = types.Int32Ptr(0)
  438. newAlert.Simulated = types.BoolPtr(false)
  439. newAlert.EventsCount = types.Int32Ptr(0)
  440. newAlert.Leakspeed = types.StrPtr("")
  441. newAlert.ScenarioHash = types.StrPtr("")
  442. newAlert.ScenarioVersion = types.StrPtr("")
  443. newAlert.MachineID = database.CapiMachineID
  444. return newAlert
  445. }
  446. // This function takes in list of parent alerts and decisions and then pairs them up.
  447. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  448. for _, decision := range decisions {
  449. //count and create separate alerts for each list
  450. updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
  451. /*CAPI might send lower case scopes, unify it.*/
  452. switch strings.ToLower(*decision.Scope) {
  453. case "ip":
  454. *decision.Scope = types.Ip
  455. case "range":
  456. *decision.Scope = types.Range
  457. }
  458. found := false
  459. //add the individual decisions to the right list
  460. for idx, alert := range alerts {
  461. if *decision.Origin == types.CAPIOrigin {
  462. if *alert.Source.Scope == types.CAPIOrigin {
  463. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  464. found = true
  465. break
  466. }
  467. } else if *decision.Origin == types.ListOrigin {
  468. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  469. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  470. found = true
  471. break
  472. }
  473. } else {
  474. log.Warningf("unknown origin %s", *decision.Origin)
  475. }
  476. }
  477. if !found {
  478. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  479. }
  480. }
  481. return alerts
  482. }
  483. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  484. // one alert for "community blocklist"
  485. // one alert per list we're subscribed to
  486. func (a *apic) PullTop() error {
  487. var err error
  488. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  489. return err
  490. } else if !lastPullIsOld {
  491. return nil
  492. }
  493. log.Infof("Starting community-blocklist update")
  494. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  495. if err != nil {
  496. return errors.Wrap(err, "get stream")
  497. }
  498. a.startup = false
  499. /*to count additions/deletions across lists*/
  500. log.Debugf("Received %d new decisions", len(data.New))
  501. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  502. if data.Links != nil {
  503. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  504. }
  505. add_counters, delete_counters := makeAddAndDeleteCounters()
  506. // process deleted decisions
  507. if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
  508. return err
  509. } else {
  510. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  511. }
  512. if len(data.New) == 0 {
  513. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  514. return nil
  515. }
  516. // create one alert for community blocklist using the first decision
  517. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  518. alert := createAlertForDecision(decisions[0])
  519. alertsFromCapi := []*models.Alert{alert}
  520. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  521. err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
  522. if err != nil {
  523. return errors.Wrap(err, "while saving alerts")
  524. }
  525. // update blocklists
  526. if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
  527. return errors.Wrap(err, "while updating blocklists")
  528. }
  529. return nil
  530. }
  531. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
  532. for idx, alert := range alertsFromCapi {
  533. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  534. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  535. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  536. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  537. }
  538. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  539. if err != nil {
  540. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  541. }
  542. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  543. }
  544. return nil
  545. }
  546. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
  547. if links == nil {
  548. return nil
  549. }
  550. if links.Blocklists == nil {
  551. return nil
  552. }
  553. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  554. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  555. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", &http.Client{})
  556. if err != nil {
  557. return errors.Wrap(err, "while creating default client")
  558. }
  559. for _, blocklist := range links.Blocklists {
  560. if blocklist.Scope == nil {
  561. log.Warningf("blocklist has no scope")
  562. continue
  563. }
  564. if blocklist.Duration == nil {
  565. log.Warningf("blocklist has no duration")
  566. continue
  567. }
  568. decisions, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist)
  569. if err != nil {
  570. return errors.Wrapf(err, "while getting decisions from blocklist %s", *blocklist.Name)
  571. }
  572. if len(decisions) == 0 {
  573. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  574. continue
  575. }
  576. alert := createAlertForDecision(decisions[0])
  577. alertsFromCapi := []*models.Alert{alert}
  578. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  579. err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
  580. if err != nil {
  581. return errors.Wrapf(err, "while saving alert from blocklist %s", *blocklist.Name)
  582. }
  583. }
  584. return nil
  585. }
  586. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  587. if *alert.Source.Scope == types.CAPIOrigin {
  588. *alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
  589. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
  590. } else if *alert.Source.Scope == types.ListOrigin {
  591. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  592. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
  593. }
  594. return alert
  595. }
  596. func (a *apic) Pull() error {
  597. defer types.CatchPanic("lapi/pullFromAPIC")
  598. toldOnce := false
  599. for {
  600. scenario, err := a.FetchScenariosListFromDB()
  601. if err != nil {
  602. log.Errorf("unable to fetch scenarios from db: %s", err)
  603. }
  604. if len(scenario) > 0 {
  605. break
  606. }
  607. if !toldOnce {
  608. log.Warning("scenario list is empty, will not pull yet")
  609. toldOnce = true
  610. }
  611. time.Sleep(1 * time.Second)
  612. }
  613. if err := a.PullTop(); err != nil {
  614. log.Errorf("capi pull top: %s", err)
  615. }
  616. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  617. ticker := time.NewTicker(a.pullIntervalFirst)
  618. for {
  619. select {
  620. case <-ticker.C:
  621. ticker.Reset(a.pullInterval)
  622. if err := a.PullTop(); err != nil {
  623. log.Errorf("capi pull top: %s", err)
  624. continue
  625. }
  626. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  627. a.metricsTomb.Kill(nil)
  628. a.pushTomb.Kill(nil)
  629. return nil
  630. }
  631. }
  632. }
  633. func (a *apic) GetMetrics() (*models.Metrics, error) {
  634. metric := &models.Metrics{
  635. ApilVersion: types.StrPtr(cwversion.VersionStr()),
  636. Machines: make([]*models.MetricsAgentInfo, 0),
  637. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  638. }
  639. machines, err := a.dbClient.ListMachines()
  640. if err != nil {
  641. return metric, err
  642. }
  643. bouncers, err := a.dbClient.ListBouncers()
  644. if err != nil {
  645. return metric, err
  646. }
  647. var lastpush string
  648. for _, machine := range machines {
  649. if machine.LastPush == nil {
  650. lastpush = time.Time{}.String()
  651. } else {
  652. lastpush = machine.LastPush.String()
  653. }
  654. m := &models.MetricsAgentInfo{
  655. Version: machine.Version,
  656. Name: machine.MachineId,
  657. LastUpdate: machine.UpdatedAt.String(),
  658. LastPush: lastpush,
  659. }
  660. metric.Machines = append(metric.Machines, m)
  661. }
  662. for _, bouncer := range bouncers {
  663. m := &models.MetricsBouncerInfo{
  664. Version: bouncer.Version,
  665. CustomName: bouncer.Name,
  666. Name: bouncer.Type,
  667. LastPull: bouncer.LastPull.String(),
  668. }
  669. metric.Bouncers = append(metric.Bouncers, m)
  670. }
  671. return metric, nil
  672. }
  673. func (a *apic) SendMetrics(stop chan (bool)) {
  674. defer types.CatchPanic("lapi/metricsToAPIC")
  675. ticker := time.NewTicker(a.metricsIntervalFirst)
  676. log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
  677. for {
  678. metrics, err := a.GetMetrics()
  679. if err != nil {
  680. log.Errorf("unable to get metrics (%s), will retry", err)
  681. }
  682. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  683. if err != nil {
  684. log.Errorf("capi metrics: failed: %s", err)
  685. } else {
  686. log.Infof("capi metrics: metrics sent successfully")
  687. }
  688. select {
  689. case <-stop:
  690. return
  691. case <-ticker.C:
  692. ticker.Reset(a.metricsInterval)
  693. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  694. a.pullTomb.Kill(nil)
  695. a.pushTomb.Kill(nil)
  696. return
  697. }
  698. }
  699. }
  700. func (a *apic) Shutdown() {
  701. a.pushTomb.Kill(nil)
  702. a.pullTomb.Kill(nil)
  703. a.metricsTomb.Kill(nil)
  704. }
  705. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  706. add_counters := make(map[string]map[string]int)
  707. add_counters[types.CAPIOrigin] = make(map[string]int)
  708. add_counters[types.ListOrigin] = make(map[string]int)
  709. delete_counters := make(map[string]map[string]int)
  710. delete_counters[types.CAPIOrigin] = make(map[string]int)
  711. delete_counters[types.ListOrigin] = make(map[string]int)
  712. return add_counters, delete_counters
  713. }
  714. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  715. if *origin == types.CAPIOrigin {
  716. counter[*origin]["all"] += totalDecisions
  717. } else if *origin == types.ListOrigin {
  718. counter[*origin][*scenario] += totalDecisions
  719. } else {
  720. log.Warningf("Unknown origin %s", *origin)
  721. }
  722. }