apic.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net/url"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/go-openapi/strfmt"
  12. "github.com/pkg/errors"
  13. log "github.com/sirupsen/logrus"
  14. "gopkg.in/tomb.v2"
  15. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  16. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  17. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  18. "github.com/crowdsecurity/crowdsec/pkg/database"
  19. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  20. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  21. "github.com/crowdsecurity/crowdsec/pkg/models"
  22. "github.com/crowdsecurity/crowdsec/pkg/types"
  23. )
  24. var (
  25. pullIntervalDefault = time.Hour * 2
  26. pullIntervalDelta = 5 * time.Minute
  27. pushIntervalDefault = time.Second * 30
  28. pushIntervalDelta = time.Second * 15
  29. metricsIntervalDefault = time.Minute * 30
  30. metricsIntervalDelta = time.Minute * 15
  31. )
  32. var SCOPE_CAPI string = "CAPI"
  33. var SCOPE_CAPI_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
  34. var SCOPE_LISTS string = "lists"
  35. type apic struct {
  36. // when changing the intervals in tests, always set *First too
  37. // or they can be negative
  38. pullInterval time.Duration
  39. pullIntervalFirst time.Duration
  40. pushInterval time.Duration
  41. pushIntervalFirst time.Duration
  42. metricsInterval time.Duration
  43. metricsIntervalFirst time.Duration
  44. dbClient *database.Client
  45. apiClient *apiclient.ApiClient
  46. alertToPush chan []*models.Alert
  47. mu sync.Mutex
  48. pushTomb tomb.Tomb
  49. pullTomb tomb.Tomb
  50. metricsTomb tomb.Tomb
  51. startup bool
  52. credentials *csconfig.ApiCredentialsCfg
  53. scenarioList []string
  54. consoleConfig *csconfig.ConsoleConfig
  55. }
  56. // randomDuration returns a duration value between d-delta and d+delta
  57. func randomDuration(d time.Duration, delta time.Duration) time.Duration {
  58. return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
  59. }
  60. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  61. scenarios := make([]string, 0)
  62. machines, err := a.dbClient.ListMachines()
  63. if err != nil {
  64. return nil, errors.Wrap(err, "while listing machines")
  65. }
  66. //merge all scenarios together
  67. for _, v := range machines {
  68. machineScenarios := strings.Split(v.Scenarios, ",")
  69. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  70. for _, sv := range machineScenarios {
  71. if !types.InSlice(sv, scenarios) && sv != "" {
  72. scenarios = append(scenarios, sv)
  73. }
  74. }
  75. }
  76. log.Debugf("Returning list of scenarios : %+v", scenarios)
  77. return scenarios, nil
  78. }
  79. func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
  80. signal := &models.AddSignalsRequestItem{
  81. Message: alert.Message,
  82. Scenario: alert.Scenario,
  83. ScenarioHash: alert.ScenarioHash,
  84. ScenarioVersion: alert.ScenarioVersion,
  85. Source: alert.Source,
  86. StartAt: alert.StartAt,
  87. StopAt: alert.StopAt,
  88. CreatedAt: alert.CreatedAt,
  89. MachineID: alert.MachineID,
  90. ScenarioTrust: scenarioTrust,
  91. }
  92. if shareContext {
  93. signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)
  94. for _, meta := range alert.Meta {
  95. contextItem := models.AddSignalsRequestItemContextItems0{
  96. Key: meta.Key,
  97. Value: meta.Value,
  98. }
  99. signal.Context = append(signal.Context, &contextItem)
  100. }
  101. }
  102. return signal
  103. }
  104. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  105. var err error
  106. ret := &apic{
  107. alertToPush: make(chan []*models.Alert),
  108. dbClient: dbClient,
  109. mu: sync.Mutex{},
  110. startup: true,
  111. credentials: config.Credentials,
  112. pullTomb: tomb.Tomb{},
  113. pushTomb: tomb.Tomb{},
  114. metricsTomb: tomb.Tomb{},
  115. scenarioList: make([]string, 0),
  116. consoleConfig: consoleConfig,
  117. pullInterval: pullIntervalDefault,
  118. pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta),
  119. pushInterval: pushIntervalDefault,
  120. pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta),
  121. metricsInterval: metricsIntervalDefault,
  122. metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
  123. }
  124. password := strfmt.Password(config.Credentials.Password)
  125. apiURL, err := url.Parse(config.Credentials.URL)
  126. if err != nil {
  127. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  128. }
  129. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  130. if err != nil {
  131. return nil, errors.Wrap(err, "while fetching scenarios from db")
  132. }
  133. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  134. MachineID: config.Credentials.Login,
  135. Password: password,
  136. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  137. URL: apiURL,
  138. VersionPrefix: "v2",
  139. Scenarios: ret.scenarioList,
  140. UpdateScenario: ret.FetchScenariosListFromDB,
  141. })
  142. return ret, err
  143. }
  144. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  145. func (a *apic) Push() error {
  146. defer types.CatchPanic("lapi/pushToAPIC")
  147. var cache models.AddSignalsRequest
  148. ticker := time.NewTicker(a.pushIntervalFirst)
  149. log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
  150. for {
  151. select {
  152. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  153. a.pullTomb.Kill(nil)
  154. a.metricsTomb.Kill(nil)
  155. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  156. if len(cache) == 0 {
  157. return nil
  158. }
  159. go a.Send(&cache)
  160. return nil
  161. case <-ticker.C:
  162. ticker.Reset(a.pushInterval)
  163. if len(cache) > 0 {
  164. a.mu.Lock()
  165. cacheCopy := cache
  166. cache = make(models.AddSignalsRequest, 0)
  167. a.mu.Unlock()
  168. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  169. go a.Send(&cacheCopy)
  170. }
  171. case alerts := <-a.alertToPush:
  172. var signals []*models.AddSignalsRequestItem
  173. for _, alert := range alerts {
  174. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  175. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
  176. }
  177. }
  178. a.mu.Lock()
  179. cache = append(cache, signals...)
  180. a.mu.Unlock()
  181. }
  182. }
  183. }
  184. func getScenarioTrustOfAlert(alert *models.Alert) string {
  185. scenarioTrust := "certified"
  186. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  187. scenarioTrust = "custom"
  188. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  189. scenarioTrust = "tainted"
  190. }
  191. if len(alert.Decisions) > 0 {
  192. if *alert.Decisions[0].Origin == "cscli" {
  193. scenarioTrust = "manual"
  194. }
  195. }
  196. return scenarioTrust
  197. }
  198. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  199. if *alert.Simulated {
  200. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  201. return false
  202. }
  203. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  204. case "manual":
  205. if !*consoleConfig.ShareManualDecisions {
  206. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  207. return false
  208. }
  209. case "tainted":
  210. if !*consoleConfig.ShareTaintedScenarios {
  211. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  212. return false
  213. }
  214. case "custom":
  215. if !*consoleConfig.ShareCustomScenarios {
  216. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  217. return false
  218. }
  219. }
  220. return true
  221. }
  222. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  223. /*we do have a problem with this :
  224. The apic.Push background routine reads from alertToPush chan.
  225. This chan is filled by Controller.CreateAlert
  226. If the chan apic.Send hangs, the alertToPush chan will become full,
  227. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  228. So instead, we prefer to cancel write.
  229. I don't know enough about gin to tell how much of an issue it can be.
  230. */
  231. var cache []*models.AddSignalsRequestItem = *cacheOrig
  232. var send models.AddSignalsRequest
  233. bulkSize := 50
  234. pageStart := 0
  235. pageEnd := bulkSize
  236. for {
  237. if pageEnd >= len(cache) {
  238. send = cache[pageStart:]
  239. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  240. defer cancel()
  241. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  242. if err != nil {
  243. log.Errorf("Error while sending final chunk to central API : %s", err)
  244. return
  245. }
  246. break
  247. }
  248. send = cache[pageStart:pageEnd]
  249. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  250. defer cancel()
  251. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  252. if err != nil {
  253. //we log it here as well, because the return value of func might be discarded
  254. log.Errorf("Error while sending chunk to central API : %s", err)
  255. }
  256. pageStart += bulkSize
  257. pageEnd += bulkSize
  258. }
  259. }
  260. func (a *apic) CAPIPullIsOld() (bool, error) {
  261. /*only pull community blocklist if it's older than 1h30 */
  262. alerts := a.dbClient.Ent.Alert.Query()
  263. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  264. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  265. count, err := alerts.Count(a.dbClient.CTX)
  266. if err != nil {
  267. return false, errors.Wrap(err, "while looking for CAPI alert")
  268. }
  269. if count > 0 {
  270. log.Printf("last CAPI pull is newer than 1h30, skip.")
  271. return false, nil
  272. }
  273. return true, nil
  274. }
  275. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  276. var filter map[string][]string
  277. var nbDeleted int
  278. for _, decision := range deletedDecisions {
  279. if strings.ToLower(*decision.Scope) == "ip" {
  280. filter = make(map[string][]string, 1)
  281. filter["value"] = []string{*decision.Value}
  282. } else {
  283. filter = make(map[string][]string, 3)
  284. filter["value"] = []string{*decision.Value}
  285. filter["type"] = []string{*decision.Type}
  286. filter["scopes"] = []string{*decision.Scope}
  287. }
  288. filter["origin"] = []string{*decision.Origin}
  289. dbCliRet, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  290. if err != nil {
  291. return 0, errors.Wrap(err, "deleting decisions error")
  292. }
  293. dbCliDel, err := strconv.Atoi(dbCliRet)
  294. if err != nil {
  295. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  296. }
  297. updateCounterForDecision(delete_counters, decision, dbCliDel)
  298. nbDeleted += dbCliDel
  299. }
  300. return nbDeleted, nil
  301. }
  302. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  303. newAlerts := make([]*models.Alert, 0)
  304. for _, decision := range decisions {
  305. found := false
  306. for _, sub := range newAlerts {
  307. if sub.Source.Scope == nil {
  308. log.Warningf("nil scope in %+v", sub)
  309. continue
  310. }
  311. if *decision.Origin == SCOPE_CAPI {
  312. if *sub.Source.Scope == SCOPE_CAPI {
  313. found = true
  314. break
  315. }
  316. } else if *decision.Origin == SCOPE_LISTS {
  317. if *sub.Source.Scope == *decision.Origin {
  318. if sub.Scenario == nil {
  319. log.Warningf("nil scenario in %+v", sub)
  320. }
  321. if *sub.Scenario == *decision.Scenario {
  322. found = true
  323. break
  324. }
  325. }
  326. } else {
  327. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  328. }
  329. }
  330. if !found {
  331. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  332. newAlerts = append(newAlerts, createAlertForDecision(decision))
  333. }
  334. }
  335. return newAlerts
  336. }
  337. func createAlertForDecision(decision *models.Decision) *models.Alert {
  338. newAlert := &models.Alert{}
  339. newAlert.Source = &models.Source{}
  340. newAlert.Source.Scope = types.StrPtr("")
  341. if *decision.Origin == SCOPE_CAPI { //to make things more user friendly, we replace CAPI with community-blocklist
  342. newAlert.Scenario = types.StrPtr(SCOPE_CAPI)
  343. newAlert.Source.Scope = types.StrPtr(SCOPE_CAPI)
  344. } else if *decision.Origin == SCOPE_LISTS {
  345. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  346. newAlert.Source.Scope = types.StrPtr(SCOPE_LISTS)
  347. } else {
  348. log.Warningf("unknown origin %s", *decision.Origin)
  349. }
  350. newAlert.Message = types.StrPtr("")
  351. newAlert.Source.Value = types.StrPtr("")
  352. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  353. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  354. newAlert.Capacity = types.Int32Ptr(0)
  355. newAlert.Simulated = types.BoolPtr(false)
  356. newAlert.EventsCount = types.Int32Ptr(0)
  357. newAlert.Leakspeed = types.StrPtr("")
  358. newAlert.ScenarioHash = types.StrPtr("")
  359. newAlert.ScenarioVersion = types.StrPtr("")
  360. newAlert.MachineID = database.CapiMachineID
  361. return newAlert
  362. }
  363. // This function takes in list of parent alerts and decisions and then pairs them up.
  364. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  365. for _, decision := range decisions {
  366. //count and create separate alerts for each list
  367. updateCounterForDecision(add_counters, decision, 1)
  368. /*CAPI might send lower case scopes, unify it.*/
  369. switch strings.ToLower(*decision.Scope) {
  370. case "ip":
  371. *decision.Scope = types.Ip
  372. case "range":
  373. *decision.Scope = types.Range
  374. }
  375. found := false
  376. //add the individual decisions to the right list
  377. for idx, alert := range alerts {
  378. if *decision.Origin == SCOPE_CAPI {
  379. if *alert.Source.Scope == SCOPE_CAPI {
  380. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  381. found = true
  382. break
  383. }
  384. } else if *decision.Origin == SCOPE_LISTS {
  385. if *alert.Source.Scope == SCOPE_LISTS && *alert.Scenario == *decision.Scenario {
  386. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  387. found = true
  388. break
  389. }
  390. } else {
  391. log.Warningf("unknown origin %s", *decision.Origin)
  392. }
  393. }
  394. if !found {
  395. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  396. }
  397. }
  398. return alerts
  399. }
  400. // we receive only one list of decisions, that we need to break-up :
  401. // one alert for "community blocklist"
  402. // one alert per list we're subscribed to
  403. func (a *apic) PullTop() error {
  404. var err error
  405. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  406. return err
  407. } else if !lastPullIsOld {
  408. return nil
  409. }
  410. log.Infof("Starting community-blocklist update")
  411. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  412. if err != nil {
  413. return errors.Wrap(err, "get stream")
  414. }
  415. a.startup = false
  416. /*to count additions/deletions across lists*/
  417. log.Debugf("Received %d new decisions", len(data.New))
  418. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  419. add_counters, delete_counters := makeAddAndDeleteCounters()
  420. // process deleted decisions
  421. if nbDeleted, err := a.HandleDeletedDecisions(data.Deleted, delete_counters); err != nil {
  422. return err
  423. } else {
  424. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  425. }
  426. if len(data.New) == 0 {
  427. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  428. return nil
  429. }
  430. // we receive only one list of decisions, that we need to break-up :
  431. // one alert for "community blocklist"
  432. // one alert per list we're subscribed to
  433. alertsFromCapi := createAlertsForDecisions(data.New)
  434. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, data.New, add_counters)
  435. for idx, alert := range alertsFromCapi {
  436. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  437. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  438. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  439. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  440. }
  441. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  442. if err != nil {
  443. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  444. }
  445. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  446. }
  447. return nil
  448. }
  449. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  450. if *alert.Source.Scope == SCOPE_CAPI {
  451. *alert.Source.Scope = SCOPE_CAPI_ALIAS
  452. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_CAPI]["all"], delete_counters[SCOPE_CAPI]["all"]))
  453. } else if *alert.Source.Scope == SCOPE_LISTS {
  454. *alert.Source.Scope = fmt.Sprintf("%s:%s", SCOPE_LISTS, *alert.Scenario)
  455. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_LISTS][*alert.Scenario], delete_counters[SCOPE_LISTS][*alert.Scenario]))
  456. }
  457. return alert
  458. }
  459. func (a *apic) Pull() error {
  460. defer types.CatchPanic("lapi/pullFromAPIC")
  461. toldOnce := false
  462. for {
  463. scenario, err := a.FetchScenariosListFromDB()
  464. if err != nil {
  465. log.Errorf("unable to fetch scenarios from db: %s", err)
  466. }
  467. if len(scenario) > 0 {
  468. break
  469. }
  470. if !toldOnce {
  471. log.Warning("scenario list is empty, will not pull yet")
  472. toldOnce = true
  473. }
  474. time.Sleep(1 * time.Second)
  475. }
  476. if err := a.PullTop(); err != nil {
  477. log.Errorf("capi pull top: %s", err)
  478. }
  479. log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
  480. ticker := time.NewTicker(a.pullIntervalFirst)
  481. for {
  482. select {
  483. case <-ticker.C:
  484. ticker.Reset(a.pullInterval)
  485. if err := a.PullTop(); err != nil {
  486. log.Errorf("capi pull top: %s", err)
  487. continue
  488. }
  489. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  490. a.metricsTomb.Kill(nil)
  491. a.pushTomb.Kill(nil)
  492. return nil
  493. }
  494. }
  495. }
  496. func (a *apic) GetMetrics() (*models.Metrics, error) {
  497. metric := &models.Metrics{
  498. ApilVersion: types.StrPtr(cwversion.VersionStr()),
  499. Machines: make([]*models.MetricsAgentInfo, 0),
  500. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  501. }
  502. machines, err := a.dbClient.ListMachines()
  503. if err != nil {
  504. return metric, err
  505. }
  506. bouncers, err := a.dbClient.ListBouncers()
  507. if err != nil {
  508. return metric, err
  509. }
  510. var lastpush string
  511. for _, machine := range machines {
  512. if machine.LastPush == nil {
  513. lastpush = time.Time{}.String()
  514. } else {
  515. lastpush = machine.LastPush.String()
  516. }
  517. m := &models.MetricsAgentInfo{
  518. Version: machine.Version,
  519. Name: machine.MachineId,
  520. LastUpdate: machine.UpdatedAt.String(),
  521. LastPush: lastpush,
  522. }
  523. metric.Machines = append(metric.Machines, m)
  524. }
  525. for _, bouncer := range bouncers {
  526. m := &models.MetricsBouncerInfo{
  527. Version: bouncer.Version,
  528. CustomName: bouncer.Name,
  529. Name: bouncer.Type,
  530. LastPull: bouncer.LastPull.String(),
  531. }
  532. metric.Bouncers = append(metric.Bouncers, m)
  533. }
  534. return metric, nil
  535. }
  536. func (a *apic) SendMetrics(stop chan (bool)) {
  537. defer types.CatchPanic("lapi/metricsToAPIC")
  538. ticker := time.NewTicker(a.metricsIntervalFirst)
  539. log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
  540. for {
  541. metrics, err := a.GetMetrics()
  542. if err != nil {
  543. log.Errorf("unable to get metrics (%s), will retry", err)
  544. }
  545. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  546. if err != nil {
  547. log.Errorf("capi metrics: failed: %s", err)
  548. } else {
  549. log.Infof("capi metrics: metrics sent successfully")
  550. }
  551. select {
  552. case <-stop:
  553. return
  554. case <-ticker.C:
  555. ticker.Reset(a.metricsInterval)
  556. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  557. a.pullTomb.Kill(nil)
  558. a.pushTomb.Kill(nil)
  559. return
  560. }
  561. }
  562. }
  563. func (a *apic) Shutdown() {
  564. a.pushTomb.Kill(nil)
  565. a.pullTomb.Kill(nil)
  566. a.metricsTomb.Kill(nil)
  567. }
  568. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  569. add_counters := make(map[string]map[string]int)
  570. add_counters[SCOPE_CAPI] = make(map[string]int)
  571. add_counters[SCOPE_LISTS] = make(map[string]int)
  572. delete_counters := make(map[string]map[string]int)
  573. delete_counters[SCOPE_CAPI] = make(map[string]int)
  574. delete_counters[SCOPE_LISTS] = make(map[string]int)
  575. return add_counters, delete_counters
  576. }
  577. func updateCounterForDecision(counter map[string]map[string]int, decision *models.Decision, totalDecisions int) {
  578. if *decision.Origin == SCOPE_CAPI {
  579. counter[*decision.Origin]["all"] += totalDecisions
  580. } else if *decision.Origin == SCOPE_LISTS {
  581. counter[*decision.Origin][*decision.Scenario] += totalDecisions
  582. } else {
  583. log.Warningf("Unknown origin %s", *decision.Origin)
  584. }
  585. }