apic.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/go-openapi/strfmt"
  13. "github.com/pkg/errors"
  14. log "github.com/sirupsen/logrus"
  15. "gopkg.in/tomb.v2"
  16. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  17. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  18. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  19. "github.com/crowdsecurity/crowdsec/pkg/database"
  20. "github.com/crowdsecurity/crowdsec/pkg/database/ent"
  21. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  22. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  23. "github.com/crowdsecurity/crowdsec/pkg/models"
  24. "github.com/crowdsecurity/crowdsec/pkg/modelscapi"
  25. "github.com/crowdsecurity/crowdsec/pkg/types"
  26. )
  // Default period of each CAPI routine, together with the delta that NewAPIC
  // passes to randomDuration to compute the delay before the first run.
  var (
  	pullIntervalDefault    = 2 * time.Hour
  	pullIntervalDelta      = time.Minute * 5
  	pushIntervalDefault    = 10 * time.Second
  	pushIntervalDelta      = 7 * time.Second
  	metricsIntervalDefault = 30 * time.Minute
  	metricsIntervalDelta   = 15 * time.Minute
  )
  // SCOPE_CAPI_ALIAS_ALIAS is the source scope written on community-blocklist
  // alerts: "CAPI" is not used directly, to make it less confusing for the user.
  var SCOPE_CAPI_ALIAS_ALIAS = "crowdsecurity/community-blocklist"
  36. type apic struct {
  37. // when changing the intervals in tests, always set *First too
  38. // or they can be negative
  39. pullInterval time.Duration
  40. pullIntervalFirst time.Duration
  41. pushInterval time.Duration
  42. pushIntervalFirst time.Duration
  43. metricsInterval time.Duration
  44. metricsIntervalFirst time.Duration
  45. dbClient *database.Client
  46. apiClient *apiclient.ApiClient
  47. AlertsAddChan chan []*models.Alert
  48. mu sync.Mutex
  49. pushTomb tomb.Tomb
  50. pullTomb tomb.Tomb
  51. metricsTomb tomb.Tomb
  52. startup bool
  53. credentials *csconfig.ApiCredentialsCfg
  54. scenarioList []string
  55. consoleConfig *csconfig.ConsoleConfig
  56. }
  // randomDuration returns a duration picked at random in [d-delta, d+delta).
  func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  	// rand.Float64() is in [0,1); rescale it to a factor in [-1,1).
  	factor := -1.0 + 2.0*rand.Float64()
  	jitter := float64(delta) * factor

  	return time.Duration(float64(d) + jitter)
  }
  61. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  62. scenarios := make([]string, 0)
  63. machines, err := a.dbClient.ListMachines()
  64. if err != nil {
  65. return nil, errors.Wrap(err, "while listing machines")
  66. }
  67. //merge all scenarios together
  68. for _, v := range machines {
  69. machineScenarios := strings.Split(v.Scenarios, ",")
  70. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  71. for _, sv := range machineScenarios {
  72. if !types.InSlice(sv, scenarios) && sv != "" {
  73. scenarios = append(scenarios, sv)
  74. }
  75. }
  76. }
  77. log.Debugf("Returning list of scenarios : %+v", scenarios)
  78. return scenarios, nil
  79. }
  80. func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
  81. apiDecisions := models.AddSignalsRequestItemDecisions{}
  82. for _, decision := range decisions {
  83. x := &models.AddSignalsRequestItemDecisionsItem{
  84. Duration: types.StrPtr(*decision.Duration),
  85. ID: new(int64),
  86. Origin: types.StrPtr(*decision.Origin),
  87. Scenario: types.StrPtr(*decision.Scenario),
  88. Scope: types.StrPtr(*decision.Scope),
  89. //Simulated: *decision.Simulated,
  90. Type: types.StrPtr(*decision.Type),
  91. Until: decision.Until,
  92. Value: types.StrPtr(*decision.Value),
  93. UUID: decision.UUID,
  94. }
  95. *x.ID = decision.ID
  96. if decision.Simulated != nil {
  97. x.Simulated = *decision.Simulated
  98. }
  99. apiDecisions = append(apiDecisions, x)
  100. }
  101. return apiDecisions
  102. }
  103. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  104. signal := &models.AddSignalsRequestItem{
  105. Message: alert.Message,
  106. Scenario: alert.Scenario,
  107. ScenarioHash: alert.ScenarioHash,
  108. ScenarioVersion: alert.ScenarioVersion,
  109. Source: &models.AddSignalsRequestItemSource{
  110. AsName: alert.Source.AsName,
  111. AsNumber: alert.Source.AsNumber,
  112. Cn: alert.Source.Cn,
  113. IP: alert.Source.IP,
  114. Latitude: alert.Source.Latitude,
  115. Longitude: alert.Source.Longitude,
  116. Range: alert.Source.Range,
  117. Scope: alert.Source.Scope,
  118. Value: alert.Source.Value,
  119. },
  120. StartAt: alert.StartAt,
  121. StopAt: alert.StopAt,
  122. CreatedAt: alert.CreatedAt,
  123. MachineID: alert.MachineID,
  124. ScenarioTrust: scenarioTrust,
  125. Decisions: decisionsToApiDecisions(alert.Decisions),
  126. UUID: alert.UUID,
  127. }
  128. if shareContext {
  129. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  130. for _, meta := range alert.Meta {
  131. contextItem := models.AddSignalsRequestItemContextItems0{
  132. Key: meta.Key,
  133. Value: meta.Value,
  134. }
  135. signal.Context = append(signal.Context, &contextItem)
  136. }
  137. }
  138. return signal
  139. }
  140. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  141. var err error
  142. ret := &apic{
  143. AlertsAddChan: make(chan []*models.Alert),
  144. dbClient: dbClient,
  145. mu: sync.Mutex{},
  146. startup: true,
  147. credentials: config.Credentials,
  148. pullTomb: tomb.Tomb{},
  149. pushTomb: tomb.Tomb{},
  150. metricsTomb: tomb.Tomb{},
  151. scenarioList: make([]string, 0),
  152. consoleConfig: consoleConfig,
  153. pullInterval: pullIntervalDefault,
  154. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  155. pushInterval: pushIntervalDefault,
  156. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  157. metricsInterval: metricsIntervalDefault,
  158. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  159. }
  160. password := strfmt.Password(config.Credentials.Password)
  161. apiURL, err := url.Parse(config.Credentials.URL)
  162. if err != nil {
  163. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  164. }
  165. papiURL, err := url.Parse(config.Credentials.PapiURL)
  166. if err != nil {
  167. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.PapiURL)
  168. }
  169. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  170. if err != nil {
  171. return nil, errors.Wrap(err, "while fetching scenarios from db")
  172. }
  173. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  174. MachineID: config.Credentials.Login,
  175. Password: password,
  176. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  177. URL: apiURL,
  178. PapiURL: papiURL,
  179. VersionPrefix: "v3",
  180. Scenarios: ret.scenarioList,
  181. UpdateScenario: ret.FetchScenariosListFromDB,
  182. })
  183. if err != nil {
  184. return nil, errors.Wrap(err, "while creating api client")
  185. }
  186. // The watcher will be authenticated by the RoundTripper the first time it will call CAPI
  187. // Explicit authentication will provoke an useless supplementary call to CAPI
  188. scenarios, err := ret.FetchScenariosListFromDB()
  189. if err != nil {
  190. return ret, errors.Wrapf(err, "get scenario in db: %s", err)
  191. }
  192. authResp, _, err := ret.apiClient.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
  193. MachineID: &config.Credentials.Login,
  194. Password: &password,
  195. Scenarios: scenarios,
  196. })
  197. if err != nil {
  198. return ret, errors.Wrapf(err, "authenticate watcher (%s)", config.Credentials.Login)
  199. }
  200. if err := ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
  201. return ret, errors.Wrap(err, "unable to parse jwt expiration")
  202. }
  203. ret.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
  204. return ret, err
  205. }
  206. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  207. func (a *apic) Push() error {
  208. defer types.CatchPanic("lapi/pushToAPIC")
  209. var cache models.AddSignalsRequest
  210. ticker := time.NewTicker(a.pushIntervalFirst)
  211. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  212. for {
  213. select {
  214. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  215. a.pullTomb.Kill(nil)
  216. a.metricsTomb.Kill(nil)
  217. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  218. if len(cache) == 0 {
  219. return nil
  220. }
  221. go a.Send(&cache)
  222. return nil
  223. case <-ticker.C:
  224. ticker.Reset(a.pushInterval)
  225. if len(cache) > 0 {
  226. a.mu.Lock()
  227. cacheCopy := cache
  228. cache = make(models.AddSignalsRequest, 0)
  229. a.mu.Unlock()
  230. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  231. go a.Send(&cacheCopy)
  232. }
  233. case alerts := <-a.AlertsAddChan:
  234. var signals []*models.AddSignalsRequestItem
  235. for _, alert := range alerts {
  236. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  237. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  238. }
  239. }
  240. a.mu.Lock()
  241. cache = append(cache, signals...)
  242. a.mu.Unlock()
  243. }
  244. }
  245. }
  246. func getScenarioTrustOfAlert(alert *models.Alert) string {
  247. scenarioTrust := "certified"
  248. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  249. scenarioTrust = "custom"
  250. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  251. scenarioTrust = "tainted"
  252. }
  253. if len(alert.Decisions) > 0 {
  254. if *alert.Decisions[0].Origin == types.CscliOrigin {
  255. scenarioTrust = "manual"
  256. }
  257. }
  258. return scenarioTrust
  259. }
  260. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  261. if *alert.Simulated {
  262. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  263. return false
  264. }
  265. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  266. case "manual":
  267. if !*consoleConfig.ShareManualDecisions {
  268. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  269. return false
  270. }
  271. case "tainted":
  272. if !*consoleConfig.ShareTaintedScenarios {
  273. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  274. return false
  275. }
  276. case "custom":
  277. if !*consoleConfig.ShareCustomScenarios {
  278. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  279. return false
  280. }
  281. }
  282. return true
  283. }
  284. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  285. /*we do have a problem with this :
  286. The apic.Push background routine reads from alertToPush chan.
  287. This chan is filled by Controller.CreateAlert
  288. If the chan apic.Send hangs, the alertToPush chan will become full,
  289. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  290. So instead, we prefer to cancel write.
  291. I don't know enough about gin to tell how much of an issue it can be.
  292. */
  293. var cache []*models.AddSignalsRequestItem = *cacheOrig
  294. var send models.AddSignalsRequest
  295. bulkSize := 50
  296. pageStart := 0
  297. pageEnd := bulkSize
  298. for {
  299. if pageEnd >= len(cache) {
  300. send = cache[pageStart:]
  301. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  302. defer cancel()
  303. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  304. if err != nil {
  305. log.Errorf("sending signal to central API: %s", err)
  306. return
  307. }
  308. break
  309. }
  310. send = cache[pageStart:pageEnd]
  311. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  312. defer cancel()
  313. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  314. if err != nil {
  315. //we log it here as well, because the return value of func might be discarded
  316. log.Errorf("sending signal to central API: %s", err)
  317. }
  318. pageStart += bulkSize
  319. pageEnd += bulkSize
  320. }
  321. }
  322. func (a *apic) CAPIPullIsOld() (bool, error) {
  323. /*only pull community blocklist if it's older than 1h30 */
  324. alerts := a.dbClient.Ent.Alert.Query()
  325. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  326. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  327. count, err := alerts.Count(a.dbClient.CTX)
  328. if err != nil {
  329. return false, errors.Wrap(err, "while looking for CAPI alert")
  330. }
  331. if count > 0 {
  332. log.Printf("last CAPI pull is newer than 1h30, skip.")
  333. return false, nil
  334. }
  335. return true, nil
  336. }
  337. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  338. var filter map[string][]string
  339. var nbDeleted int
  340. for _, decision := range deletedDecisions {
  341. if strings.ToLower(*decision.Scope) == "ip" {
  342. filter = make(map[string][]string, 1)
  343. filter["value"] = []string{*decision.Value}
  344. } else {
  345. filter = make(map[string][]string, 3)
  346. filter["value"] = []string{*decision.Value}
  347. filter["type"] = []string{*decision.Type}
  348. filter["scopes"] = []string{*decision.Scope}
  349. }
  350. filter["origin"] = []string{*decision.Origin}
  351. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  352. if err != nil {
  353. return 0, errors.Wrap(err, "deleting decisions error")
  354. }
  355. dbCliDel, err := strconv.Atoi(dbCliRet)
  356. if err != nil {
  357. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  358. }
  359. updateCounterForDecision(delete_counters, decision.Origin, decision.Scenario, dbCliDel)
  360. nbDeleted += dbCliDel
  361. }
  362. return nbDeleted, nil
  363. }
  364. func (a *apic) HandleDeletedDecisionsV3(deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, delete_counters map[string]map[string]int) (int, error) {
  365. var filter map[string][]string
  366. var nbDeleted int
  367. for _, decisions := range deletedDecisions {
  368. scope := decisions.Scope
  369. for _, decision := range decisions.Decisions {
  370. if strings.ToLower(*scope) == "ip" {
  371. filter = make(map[string][]string, 1)
  372. filter["value"] = []string{decision}
  373. } else {
  374. filter = make(map[string][]string, 2)
  375. filter["value"] = []string{decision}
  376. filter["scopes"] = []string{*scope}
  377. }
  378. filter["origin"] = []string{types.CAPIOrigin}
  379. dbCliRet, _, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  380. if err != nil {
  381. return 0, errors.Wrap(err, "deleting decisions error")
  382. }
  383. dbCliDel, err := strconv.Atoi(dbCliRet)
  384. if err != nil {
  385. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  386. }
  387. updateCounterForDecision(delete_counters, types.StrPtr(types.CAPIOrigin), nil, dbCliDel)
  388. nbDeleted += dbCliDel
  389. }
  390. }
  391. return nbDeleted, nil
  392. }
  393. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  394. newAlerts := make([]*models.Alert, 0)
  395. for _, decision := range decisions {
  396. found := false
  397. for _, sub := range newAlerts {
  398. if sub.Source.Scope == nil {
  399. log.Warningf("nil scope in %+v", sub)
  400. continue
  401. }
  402. if *decision.Origin == types.CAPIOrigin {
  403. if *sub.Source.Scope == types.CAPIOrigin {
  404. found = true
  405. break
  406. }
  407. } else if *decision.Origin == types.ListOrigin {
  408. if *sub.Source.Scope == *decision.Origin {
  409. if sub.Scenario == nil {
  410. log.Warningf("nil scenario in %+v", sub)
  411. }
  412. if *sub.Scenario == *decision.Scenario {
  413. found = true
  414. break
  415. }
  416. }
  417. } else {
  418. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  419. }
  420. }
  421. if !found {
  422. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  423. newAlerts = append(newAlerts, createAlertForDecision(decision))
  424. }
  425. }
  426. return newAlerts
  427. }
  428. func createAlertForDecision(decision *models.Decision) *models.Alert {
  429. newAlert := &models.Alert{}
  430. newAlert.Source = &models.Source{}
  431. newAlert.Source.Scope = types.StrPtr("")
  432. if *decision.Origin == types.CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
  433. newAlert.Scenario = types.StrPtr(types.CAPIOrigin)
  434. newAlert.Source.Scope = types.StrPtr(types.CAPIOrigin)
  435. } else if *decision.Origin == types.ListOrigin {
  436. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  437. newAlert.Source.Scope = types.StrPtr(types.ListOrigin)
  438. } else {
  439. log.Warningf("unknown origin %s", *decision.Origin)
  440. }
  441. newAlert.Message = types.StrPtr("")
  442. newAlert.Source.Value = types.StrPtr("")
  443. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  444. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  445. newAlert.Capacity = types.Int32Ptr(0)
  446. newAlert.Simulated = types.BoolPtr(false)
  447. newAlert.EventsCount = types.Int32Ptr(0)
  448. newAlert.Leakspeed = types.StrPtr("")
  449. newAlert.ScenarioHash = types.StrPtr("")
  450. newAlert.ScenarioVersion = types.StrPtr("")
  451. newAlert.MachineID = database.CapiMachineID
  452. return newAlert
  453. }
  454. // This function takes in list of parent alerts and decisions and then pairs them up.
  455. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  456. for _, decision := range decisions {
  457. //count and create separate alerts for each list
  458. updateCounterForDecision(add_counters, decision.Origin, decision.Scenario, 1)
  459. /*CAPI might send lower case scopes, unify it.*/
  460. switch strings.ToLower(*decision.Scope) {
  461. case "ip":
  462. *decision.Scope = types.Ip
  463. case "range":
  464. *decision.Scope = types.Range
  465. }
  466. found := false
  467. //add the individual decisions to the right list
  468. for idx, alert := range alerts {
  469. if *decision.Origin == types.CAPIOrigin {
  470. if *alert.Source.Scope == types.CAPIOrigin {
  471. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  472. found = true
  473. break
  474. }
  475. } else if *decision.Origin == types.ListOrigin {
  476. if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
  477. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  478. found = true
  479. break
  480. }
  481. } else {
  482. log.Warningf("unknown origin %s", *decision.Origin)
  483. }
  484. }
  485. if !found {
  486. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  487. }
  488. }
  489. return alerts
  490. }
  491. // we receive a list of decisions and links for blocklist and we need to create a list of alerts :
  492. // one alert for "community blocklist"
  493. // one alert per list we're subscribed to
  494. func (a *apic) PullTop() error {
  495. var err error
  496. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  497. return err
  498. } else if !lastPullIsOld {
  499. return nil
  500. }
  501. log.Infof("Starting community-blocklist update")
  502. data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  503. if err != nil {
  504. return errors.Wrap(err, "get stream")
  505. }
  506. a.startup = false
  507. /*to count additions/deletions across lists*/
  508. log.Debugf("Received %d new decisions", len(data.New))
  509. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  510. if data.Links != nil {
  511. log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
  512. }
  513. add_counters, delete_counters := makeAddAndDeleteCounters()
  514. // process deleted decisions
  515. if nbDeleted, err := a.HandleDeletedDecisionsV3(data.Deleted, delete_counters); err != nil {
  516. return err
  517. } else {
  518. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  519. }
  520. if len(data.New) == 0 {
  521. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  522. return nil
  523. }
  524. // create one alert for community blocklist using the first decision
  525. decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
  526. alert := createAlertForDecision(decisions[0])
  527. alertsFromCapi := []*models.Alert{alert}
  528. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  529. err = a.SaveAlerts(alertsFromCapi, add_counters, delete_counters)
  530. if err != nil {
  531. return errors.Wrap(err, "while saving alerts")
  532. }
  533. // update blocklists
  534. if err := a.UpdateBlocklists(data.Links, add_counters); err != nil {
  535. return errors.Wrap(err, "while updating blocklists")
  536. }
  537. return nil
  538. }
  539. func (a *apic) SaveAlerts(alertsFromCapi []*models.Alert, add_counters map[string]map[string]int, delete_counters map[string]map[string]int) error {
  540. for idx, alert := range alertsFromCapi {
  541. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  542. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  543. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  544. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  545. }
  546. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  547. if err != nil {
  548. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  549. }
  550. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  551. }
  552. return nil
  553. }
  554. func (a *apic) ShouldForcePullBlocklist(blocklist *modelscapi.BlocklistLink) (bool, error) {
  555. // we should force pull if the blocklist decisions are about to expire or there's no decision in the db
  556. alertQuery := a.dbClient.Ent.Alert.Query()
  557. alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
  558. alertQuery.Order(ent.Desc(alert.FieldCreatedAt))
  559. alertInstance, err := alertQuery.First(context.Background())
  560. if err != nil {
  561. if ent.IsNotFound(err) {
  562. log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
  563. return true, nil
  564. }
  565. return false, errors.Wrap(err, "while getting alert")
  566. }
  567. decisionQuery := a.dbClient.Ent.Decision.Query()
  568. decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))
  569. firstDecision, err := decisionQuery.First(context.Background())
  570. if err != nil {
  571. if ent.IsNotFound(err) {
  572. log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
  573. return true, nil
  574. }
  575. return false, errors.Wrap(err, "while getting decision")
  576. }
  577. if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
  578. log.Debugf("at least one decision found for %s, expire soon, force refresh", *blocklist.Name)
  579. return true, nil
  580. }
  581. return false, nil
  582. }
  583. func (a *apic) UpdateBlocklists(links *modelscapi.GetDecisionsStreamResponseLinks, add_counters map[string]map[string]int) error {
  584. if links == nil {
  585. return nil
  586. }
  587. if links.Blocklists == nil {
  588. return nil
  589. }
  590. // we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
  591. // we can use the same baseUrl as the urls are absolute and the parse will take care of it
  592. defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
  593. if err != nil {
  594. return errors.Wrap(err, "while creating default client")
  595. }
  596. for _, blocklist := range links.Blocklists {
  597. if blocklist.Scope == nil {
  598. log.Warningf("blocklist has no scope")
  599. continue
  600. }
  601. if blocklist.Duration == nil {
  602. log.Warningf("blocklist has no duration")
  603. continue
  604. }
  605. forcePull, err := a.ShouldForcePullBlocklist(blocklist)
  606. if err != nil {
  607. return errors.Wrapf(err, "while checking if we should force pull blocklist %s", *blocklist.Name)
  608. }
  609. blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)
  610. var lastPullTimestamp *string
  611. if !forcePull {
  612. lastPullTimestamp, err = a.dbClient.GetConfigItem(blocklistConfigItemName)
  613. if err != nil {
  614. return errors.Wrapf(err, "while getting last pull timestamp for blocklist %s", *blocklist.Name)
  615. }
  616. }
  617. decisions, has_changed, err := defaultClient.Decisions.GetDecisionsFromBlocklist(context.Background(), blocklist, lastPullTimestamp)
  618. if err != nil {
  619. return errors.Wrapf(err, "while getting decisions from blocklist %s", *blocklist.Name)
  620. }
  621. if !has_changed {
  622. if lastPullTimestamp == nil {
  623. log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
  624. } else {
  625. log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
  626. }
  627. continue
  628. }
  629. err = a.dbClient.SetConfigItem(blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
  630. if err != nil {
  631. return errors.Wrapf(err, "while setting last pull timestamp for blocklist %s", *blocklist.Name)
  632. }
  633. if len(decisions) == 0 {
  634. log.Infof("blocklist %s has no decisions", *blocklist.Name)
  635. continue
  636. }
  637. alert := createAlertForDecision(decisions[0])
  638. alertsFromCapi := []*models.Alert{alert}
  639. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, add_counters)
  640. err = a.SaveAlerts(alertsFromCapi, add_counters, nil)
  641. if err != nil {
  642. return errors.Wrapf(err, "while saving alert from blocklist %s", *blocklist.Name)
  643. }
  644. }
  645. return nil
  646. }
  647. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  648. if *alert.Source.Scope == types.CAPIOrigin {
  649. *alert.Source.Scope = SCOPE_CAPI_ALIAS_ALIAS
  650. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.CAPIOrigin]["all"], delete_counters[types.CAPIOrigin]["all"]))
  651. } else if *alert.Source.Scope == types.ListOrigin {
  652. *alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
  653. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[types.ListOrigin][*alert.Scenario], delete_counters[types.ListOrigin][*alert.Scenario]))
  654. }
  655. return alert
  656. }
  657. func (a *apic) Pull() error {
  658. defer types.CatchPanic("lapi/pullFromAPIC")
  659. toldOnce := false
  660. for {
  661. scenario, err := a.FetchScenariosListFromDB()
  662. if err != nil {
  663. log.Errorf("unable to fetch scenarios from db: %s", err)
  664. }
  665. if len(scenario) > 0 {
  666. break
  667. }
  668. if !toldOnce {
  669. log.Warning("scenario list is empty, will not pull yet")
  670. toldOnce = true
  671. }
  672. time.Sleep(1 * time.Second)
  673. }
  674. if err := a.PullTop(); err != nil {
  675. log.Errorf("capi pull top: %s", err)
  676. }
  677. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  678. ticker := time.NewTicker(a.pullIntervalFirst)
  679. for {
  680. select {
  681. case <-ticker.C:
  682. ticker.Reset(a.pullInterval)
  683. if err := a.PullTop(); err != nil {
  684. log.Errorf("capi pull top: %s", err)
  685. continue
  686. }
  687. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  688. a.metricsTomb.Kill(nil)
  689. a.pushTomb.Kill(nil)
  690. return nil
  691. }
  692. }
  693. }
  694. func (a *apic) GetMetrics() (*models.Metrics, error) {
  695. metric := &models.Metrics{
  696. ApilVersion: types.StrPtr(cwversion.VersionStr()),
  697. Machines: make([]*models.MetricsAgentInfo, 0),
  698. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  699. }
  700. machines, err := a.dbClient.ListMachines()
  701. if err != nil {
  702. return metric, err
  703. }
  704. bouncers, err := a.dbClient.ListBouncers()
  705. if err != nil {
  706. return metric, err
  707. }
  708. var lastpush string
  709. for _, machine := range machines {
  710. if machine.LastPush == nil {
  711. lastpush = time.Time{}.String()
  712. } else {
  713. lastpush = machine.LastPush.String()
  714. }
  715. m := &models.MetricsAgentInfo{
  716. Version: machine.Version,
  717. Name: machine.MachineId,
  718. LastUpdate: machine.UpdatedAt.String(),
  719. LastPush: lastpush,
  720. }
  721. metric.Machines = append(metric.Machines, m)
  722. }
  723. for _, bouncer := range bouncers {
  724. m := &models.MetricsBouncerInfo{
  725. Version: bouncer.Version,
  726. CustomName: bouncer.Name,
  727. Name: bouncer.Type,
  728. LastPull: bouncer.LastPull.String(),
  729. }
  730. metric.Bouncers = append(metric.Bouncers, m)
  731. }
  732. return metric, nil
  733. }
  734. func (a *apic) SendMetrics(stop chan (bool)) {
  735. defer types.CatchPanic("lapi/metricsToAPIC")
  736. ticker := time.NewTicker(a.metricsIntervalFirst)
  737. log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
  738. for {
  739. metrics, err := a.GetMetrics()
  740. if err != nil {
  741. log.Errorf("unable to get metrics (%s), will retry", err)
  742. }
  743. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  744. if err != nil {
  745. log.Errorf("capi metrics: failed: %s", err)
  746. } else {
  747. log.Infof("capi metrics: metrics sent successfully")
  748. }
  749. select {
  750. case <-stop:
  751. return
  752. case <-ticker.C:
  753. ticker.Reset(a.metricsInterval)
  754. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  755. a.pullTomb.Kill(nil)
  756. a.pushTomb.Kill(nil)
  757. return
  758. }
  759. }
  760. }
  761. func (a *apic) Shutdown() {
  762. a.pushTomb.Kill(nil)
  763. a.pullTomb.Kill(nil)
  764. a.metricsTomb.Kill(nil)
  765. }
  766. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  767. add_counters := make(map[string]map[string]int)
  768. add_counters[types.CAPIOrigin] = make(map[string]int)
  769. add_counters[types.ListOrigin] = make(map[string]int)
  770. delete_counters := make(map[string]map[string]int)
  771. delete_counters[types.CAPIOrigin] = make(map[string]int)
  772. delete_counters[types.ListOrigin] = make(map[string]int)
  773. return add_counters, delete_counters
  774. }
  775. func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
  776. if *origin == types.CAPIOrigin {
  777. counter[*origin]["all"] += totalDecisions
  778. } else if *origin == types.ListOrigin {
  779. counter[*origin][*scenario] += totalDecisions
  780. } else {
  781. log.Warningf("Unknown origin %s", *origin)
  782. }
  783. }