apic.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  11. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  12. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  13. "github.com/crowdsecurity/crowdsec/pkg/database"
  14. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  15. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  16. "github.com/crowdsecurity/crowdsec/pkg/models"
  17. "github.com/crowdsecurity/crowdsec/pkg/types"
  18. "github.com/go-openapi/strfmt"
  19. "github.com/pkg/errors"
  20. log "github.com/sirupsen/logrus"
  21. "gopkg.in/tomb.v2"
  22. )
  23. var (
  24. PullInterval = time.Hour * 2
  25. PushInterval = time.Second * 30
  26. MetricsInterval = time.Minute * 30
  27. )
  28. var SCOPE_CAPI string = "CAPI"
  29. var SCOPE_CAPI_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
  30. var SCOPE_LISTS string = "lists"
  31. type apic struct {
  32. pullInterval time.Duration
  33. pushInterval time.Duration
  34. metricsInterval time.Duration
  35. dbClient *database.Client
  36. apiClient *apiclient.ApiClient
  37. alertToPush chan []*models.Alert
  38. mu sync.Mutex
  39. pushTomb tomb.Tomb
  40. pullTomb tomb.Tomb
  41. metricsTomb tomb.Tomb
  42. startup bool
  43. credentials *csconfig.ApiCredentialsCfg
  44. scenarioList []string
  45. consoleConfig *csconfig.ConsoleConfig
  46. }
  47. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  48. scenarios := make([]string, 0)
  49. machines, err := a.dbClient.ListMachines()
  50. if err != nil {
  51. return nil, errors.Wrap(err, "while listing machines")
  52. }
  53. //merge all scenarios together
  54. for _, v := range machines {
  55. machineScenarios := strings.Split(v.Scenarios, ",")
  56. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  57. for _, sv := range machineScenarios {
  58. if !types.InSlice(sv, scenarios) && sv != "" {
  59. scenarios = append(scenarios, sv)
  60. }
  61. }
  62. }
  63. log.Debugf("Returning list of scenarios : %+v", scenarios)
  64. return scenarios, nil
  65. }
  66. func alertToSignal(alert *models.Alert, scenarioTrust string) *models.AddSignalsRequestItem {
  67. return &models.AddSignalsRequestItem{
  68. Message: alert.Message,
  69. Scenario: alert.Scenario,
  70. ScenarioHash: alert.ScenarioHash,
  71. ScenarioVersion: alert.ScenarioVersion,
  72. Source: alert.Source,
  73. StartAt: alert.StartAt,
  74. StopAt: alert.StopAt,
  75. CreatedAt: alert.CreatedAt,
  76. MachineID: alert.MachineID,
  77. ScenarioTrust: &scenarioTrust,
  78. }
  79. }
  80. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  81. var err error
  82. ret := &apic{
  83. alertToPush: make(chan []*models.Alert),
  84. dbClient: dbClient,
  85. mu: sync.Mutex{},
  86. startup: true,
  87. credentials: config.Credentials,
  88. pullTomb: tomb.Tomb{},
  89. pushTomb: tomb.Tomb{},
  90. metricsTomb: tomb.Tomb{},
  91. scenarioList: make([]string, 0),
  92. consoleConfig: consoleConfig,
  93. pullInterval: PullInterval,
  94. pushInterval: PushInterval,
  95. metricsInterval: MetricsInterval,
  96. }
  97. password := strfmt.Password(config.Credentials.Password)
  98. apiURL, err := url.Parse(config.Credentials.URL)
  99. if err != nil {
  100. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  101. }
  102. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  103. if err != nil {
  104. return nil, errors.Wrap(err, "while fetching scenarios from db")
  105. }
  106. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  107. MachineID: config.Credentials.Login,
  108. Password: password,
  109. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  110. URL: apiURL,
  111. VersionPrefix: "v2",
  112. Scenarios: ret.scenarioList,
  113. UpdateScenario: ret.FetchScenariosListFromDB,
  114. })
  115. return ret, err
  116. }
  117. // keep track of all alerts in cache and push it to CAPI every PushInterval.
  118. func (a *apic) Push() error {
  119. defer types.CatchPanic("lapi/pushToAPIC")
  120. var cache models.AddSignalsRequest
  121. ticker := time.NewTicker(a.pushInterval)
  122. log.Infof("Start push to CrowdSec Central API (interval: %s)", PushInterval)
  123. for {
  124. select {
  125. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  126. a.pullTomb.Kill(nil)
  127. a.metricsTomb.Kill(nil)
  128. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  129. if len(cache) == 0 {
  130. return nil
  131. }
  132. go a.Send(&cache)
  133. return nil
  134. case <-ticker.C:
  135. if len(cache) > 0 {
  136. a.mu.Lock()
  137. cacheCopy := cache
  138. cache = make(models.AddSignalsRequest, 0)
  139. a.mu.Unlock()
  140. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  141. go a.Send(&cacheCopy)
  142. }
  143. case alerts := <-a.alertToPush:
  144. var signals []*models.AddSignalsRequestItem
  145. for _, alert := range alerts {
  146. if ok := shouldShareAlert(alert, a.consoleConfig); ok {
  147. signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert)))
  148. }
  149. }
  150. a.mu.Lock()
  151. cache = append(cache, signals...)
  152. a.mu.Unlock()
  153. }
  154. }
  155. }
  156. func getScenarioTrustOfAlert(alert *models.Alert) string {
  157. scenarioTrust := "certified"
  158. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  159. scenarioTrust = "custom"
  160. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  161. scenarioTrust = "tainted"
  162. }
  163. if len(alert.Decisions) > 0 {
  164. if *alert.Decisions[0].Origin == "cscli" {
  165. scenarioTrust = "manual"
  166. }
  167. }
  168. return scenarioTrust
  169. }
  170. func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
  171. if *alert.Simulated {
  172. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  173. return false
  174. }
  175. switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
  176. case "manual":
  177. if !*consoleConfig.ShareManualDecisions {
  178. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  179. return false
  180. }
  181. case "tainted":
  182. if !*consoleConfig.ShareTaintedScenarios {
  183. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  184. return false
  185. }
  186. case "custom":
  187. if !*consoleConfig.ShareCustomScenarios {
  188. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  189. return false
  190. }
  191. }
  192. return true
  193. }
  194. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  195. /*we do have a problem with this :
  196. The apic.Push background routine reads from alertToPush chan.
  197. This chan is filled by Controller.CreateAlert
  198. If the chan apic.Send hangs, the alertToPush chan will become full,
  199. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  200. So instead, we prefer to cancel write.
  201. I don't know enough about gin to tell how much of an issue it can be.
  202. */
  203. var cache []*models.AddSignalsRequestItem = *cacheOrig
  204. var send models.AddSignalsRequest
  205. bulkSize := 50
  206. pageStart := 0
  207. pageEnd := bulkSize
  208. for {
  209. if pageEnd >= len(cache) {
  210. send = cache[pageStart:]
  211. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  212. defer cancel()
  213. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  214. if err != nil {
  215. log.Errorf("Error while sending final chunk to central API : %s", err)
  216. return
  217. }
  218. break
  219. }
  220. send = cache[pageStart:pageEnd]
  221. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  222. defer cancel()
  223. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  224. if err != nil {
  225. //we log it here as well, because the return value of func might be discarded
  226. log.Errorf("Error while sending chunk to central API : %s", err)
  227. }
  228. pageStart += bulkSize
  229. pageEnd += bulkSize
  230. }
  231. }
  232. func (a *apic) CAPIPullIsOld() (bool, error) {
  233. /*only pull community blocklist if it's older than 1h30 */
  234. alerts := a.dbClient.Ent.Alert.Query()
  235. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  236. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert
  237. count, err := alerts.Count(a.dbClient.CTX)
  238. if err != nil {
  239. return false, errors.Wrap(err, "while looking for CAPI alert")
  240. }
  241. if count > 0 {
  242. log.Printf("last CAPI pull is newer than 1h30, skip.")
  243. return false, nil
  244. }
  245. return true, nil
  246. }
  247. func (a *apic) HandleDeletedDecisions(deletedDecisions []*models.Decision, delete_counters map[string]map[string]int) (int, error) {
  248. var filter map[string][]string
  249. var nbDeleted int
  250. for _, decision := range deletedDecisions {
  251. if strings.ToLower(*decision.Scope) == "ip" {
  252. filter = make(map[string][]string, 1)
  253. filter["value"] = []string{*decision.Value}
  254. } else {
  255. filter = make(map[string][]string, 3)
  256. filter["value"] = []string{*decision.Value}
  257. filter["type"] = []string{*decision.Type}
  258. filter["scopes"] = []string{*decision.Scope}
  259. }
  260. filter["origin"] = []string{*decision.Origin}
  261. dbCliRet, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  262. if err != nil {
  263. return 0, errors.Wrap(err, "deleting decisions error")
  264. }
  265. dbCliDel, err := strconv.Atoi(dbCliRet)
  266. if err != nil {
  267. return 0, errors.Wrapf(err, "converting db ret %d", dbCliDel)
  268. }
  269. updateCounterForDecision(delete_counters, decision, dbCliDel)
  270. nbDeleted += dbCliDel
  271. }
  272. return nbDeleted, nil
  273. }
  274. func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
  275. newAlerts := make([]*models.Alert, 0)
  276. for _, decision := range decisions {
  277. found := false
  278. for _, sub := range newAlerts {
  279. if sub.Source.Scope == nil {
  280. log.Warningf("nil scope in %+v", sub)
  281. continue
  282. }
  283. if *decision.Origin == SCOPE_CAPI {
  284. if *sub.Source.Scope == SCOPE_CAPI {
  285. found = true
  286. break
  287. }
  288. } else if *decision.Origin == SCOPE_LISTS {
  289. if *sub.Source.Scope == *decision.Origin {
  290. if sub.Scenario == nil {
  291. log.Warningf("nil scenario in %+v", sub)
  292. }
  293. if *sub.Scenario == *decision.Scenario {
  294. found = true
  295. break
  296. }
  297. }
  298. } else {
  299. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  300. }
  301. }
  302. if !found {
  303. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  304. newAlerts = append(newAlerts, createAlertForDecision(decision))
  305. }
  306. }
  307. return newAlerts
  308. }
  309. func createAlertForDecision(decision *models.Decision) *models.Alert {
  310. newAlert := &models.Alert{}
  311. newAlert.Source = &models.Source{}
  312. newAlert.Source.Scope = types.StrPtr("")
  313. if *decision.Origin == SCOPE_CAPI { //to make things more user friendly, we replace CAPI with community-blocklist
  314. newAlert.Scenario = types.StrPtr(SCOPE_CAPI)
  315. newAlert.Source.Scope = types.StrPtr(SCOPE_CAPI)
  316. } else if *decision.Origin == SCOPE_LISTS {
  317. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  318. newAlert.Source.Scope = types.StrPtr(SCOPE_LISTS)
  319. } else {
  320. log.Warningf("unknown origin %s", *decision.Origin)
  321. }
  322. newAlert.Message = types.StrPtr("")
  323. newAlert.Source.Value = types.StrPtr("")
  324. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  325. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  326. newAlert.Capacity = types.Int32Ptr(0)
  327. newAlert.Simulated = types.BoolPtr(false)
  328. newAlert.EventsCount = types.Int32Ptr(0)
  329. newAlert.Leakspeed = types.StrPtr("")
  330. newAlert.ScenarioHash = types.StrPtr("")
  331. newAlert.ScenarioVersion = types.StrPtr("")
  332. newAlert.MachineID = database.CapiMachineID
  333. return newAlert
  334. }
  335. // This function takes in list of parent alerts and decisions and then pairs them up.
  336. func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, add_counters map[string]map[string]int) []*models.Alert {
  337. for _, decision := range decisions {
  338. //count and create separate alerts for each list
  339. updateCounterForDecision(add_counters, decision, 1)
  340. /*CAPI might send lower case scopes, unify it.*/
  341. switch strings.ToLower(*decision.Scope) {
  342. case "ip":
  343. *decision.Scope = types.Ip
  344. case "range":
  345. *decision.Scope = types.Range
  346. }
  347. found := false
  348. //add the individual decisions to the right list
  349. for idx, alert := range alerts {
  350. if *decision.Origin == SCOPE_CAPI {
  351. if *alert.Source.Scope == SCOPE_CAPI {
  352. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  353. found = true
  354. break
  355. }
  356. } else if *decision.Origin == SCOPE_LISTS {
  357. if *alert.Source.Scope == SCOPE_LISTS && *alert.Scenario == *decision.Scenario {
  358. alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
  359. found = true
  360. break
  361. }
  362. } else {
  363. log.Warningf("unknown origin %s", *decision.Origin)
  364. }
  365. }
  366. if !found {
  367. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  368. }
  369. }
  370. return alerts
  371. }
  372. // we receive only one list of decisions, that we need to break-up :
  373. // one alert for "community blocklist"
  374. // one alert per list we're subscribed to
  375. func (a *apic) PullTop() error {
  376. var err error
  377. if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil {
  378. return err
  379. } else if !lastPullIsOld {
  380. return nil
  381. }
  382. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
  383. if err != nil {
  384. return errors.Wrap(err, "get stream")
  385. }
  386. a.startup = false
  387. /*to count additions/deletions across lists*/
  388. log.Debugf("Received %d new decisions", len(data.New))
  389. log.Debugf("Received %d deleted decisions", len(data.Deleted))
  390. add_counters, delete_counters := makeAddAndDeleteCounters()
  391. // process deleted decisions
  392. if nbDeleted, err := a.HandleDeletedDecisions(data.Deleted, delete_counters); err != nil {
  393. return err
  394. } else {
  395. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  396. }
  397. if len(data.New) == 0 {
  398. log.Infof("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
  399. return nil
  400. }
  401. // we receive only one list of decisions, that we need to break-up :
  402. // one alert for "community blocklist"
  403. // one alert per list we're subscribed to
  404. alertsFromCapi := createAlertsForDecisions(data.New)
  405. alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, data.New, add_counters)
  406. for idx, alert := range alertsFromCapi {
  407. alertsFromCapi[idx] = setAlertScenario(add_counters, delete_counters, alert)
  408. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  409. if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
  410. log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
  411. }
  412. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  413. if err != nil {
  414. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  415. }
  416. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  417. }
  418. return nil
  419. }
  420. func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert {
  421. if *alert.Source.Scope == SCOPE_CAPI {
  422. *alert.Source.Scope = SCOPE_CAPI_ALIAS
  423. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_CAPI]["all"], delete_counters[SCOPE_CAPI]["all"]))
  424. } else if *alert.Source.Scope == SCOPE_LISTS {
  425. *alert.Source.Scope = fmt.Sprintf("%s:%s", SCOPE_LISTS, *alert.Scenario)
  426. alert.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_LISTS][*alert.Scenario], delete_counters[SCOPE_LISTS][*alert.Scenario]))
  427. }
  428. return alert
  429. }
  430. func (a *apic) Pull() error {
  431. defer types.CatchPanic("lapi/pullFromAPIC")
  432. log.Infof("Start pull from CrowdSec Central API (interval: %s)", PullInterval)
  433. toldOnce := false
  434. for {
  435. scenario, err := a.FetchScenariosListFromDB()
  436. if err != nil {
  437. log.Errorf("unable to fetch scenarios from db: %s", err)
  438. }
  439. if len(scenario) > 0 {
  440. break
  441. }
  442. if !toldOnce {
  443. log.Warning("scenario list is empty, will not pull yet")
  444. toldOnce = true
  445. }
  446. time.Sleep(1 * time.Second)
  447. }
  448. if err := a.PullTop(); err != nil {
  449. log.Errorf("capi pull top: %s", err)
  450. }
  451. ticker := time.NewTicker(a.pullInterval)
  452. for {
  453. select {
  454. case <-ticker.C:
  455. if err := a.PullTop(); err != nil {
  456. log.Errorf("capi pull top: %s", err)
  457. continue
  458. }
  459. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  460. a.metricsTomb.Kill(nil)
  461. a.pushTomb.Kill(nil)
  462. return nil
  463. }
  464. }
  465. }
  466. func (a *apic) GetMetrics() (*models.Metrics, error) {
  467. metric := &models.Metrics{
  468. ApilVersion: types.StrPtr(cwversion.VersionStr()),
  469. Machines: make([]*models.MetricsAgentInfo, 0),
  470. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  471. }
  472. machines, err := a.dbClient.ListMachines()
  473. if err != nil {
  474. return metric, err
  475. }
  476. bouncers, err := a.dbClient.ListBouncers()
  477. if err != nil {
  478. return metric, err
  479. }
  480. var lastpush string
  481. for _, machine := range machines {
  482. if machine.LastPush == nil {
  483. lastpush = time.Time{}.String()
  484. } else {
  485. lastpush = machine.LastPush.String()
  486. }
  487. m := &models.MetricsAgentInfo{
  488. Version: machine.Version,
  489. Name: machine.MachineId,
  490. LastUpdate: machine.UpdatedAt.String(),
  491. LastPush: lastpush,
  492. }
  493. metric.Machines = append(metric.Machines, m)
  494. }
  495. for _, bouncer := range bouncers {
  496. m := &models.MetricsBouncerInfo{
  497. Version: bouncer.Version,
  498. CustomName: bouncer.Name,
  499. Name: bouncer.Type,
  500. LastPull: bouncer.LastPull.String(),
  501. }
  502. metric.Bouncers = append(metric.Bouncers, m)
  503. }
  504. return metric, nil
  505. }
  506. func (a *apic) SendMetrics(stop chan (bool)) {
  507. defer types.CatchPanic("lapi/metricsToAPIC")
  508. log.Infof("Start send metrics to CrowdSec Central API (interval: %s)", a.metricsInterval)
  509. ticker := time.NewTicker(a.metricsInterval)
  510. for {
  511. metrics, err := a.GetMetrics()
  512. if err != nil {
  513. log.Errorf("unable to get metrics (%s), will retry", err)
  514. }
  515. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  516. if err != nil {
  517. log.Errorf("capi metrics: failed: %s", err)
  518. } else {
  519. log.Infof("capi metrics: metrics sent successfully")
  520. }
  521. select {
  522. case <-stop:
  523. return
  524. case <-ticker.C:
  525. continue
  526. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  527. a.pullTomb.Kill(nil)
  528. a.pushTomb.Kill(nil)
  529. return
  530. }
  531. }
  532. }
  533. func (a *apic) Shutdown() {
  534. a.pushTomb.Kill(nil)
  535. a.pullTomb.Kill(nil)
  536. a.metricsTomb.Kill(nil)
  537. }
  538. func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
  539. add_counters := make(map[string]map[string]int)
  540. add_counters[SCOPE_CAPI] = make(map[string]int)
  541. add_counters[SCOPE_LISTS] = make(map[string]int)
  542. delete_counters := make(map[string]map[string]int)
  543. delete_counters[SCOPE_CAPI] = make(map[string]int)
  544. delete_counters[SCOPE_LISTS] = make(map[string]int)
  545. return add_counters, delete_counters
  546. }
  547. func updateCounterForDecision(counter map[string]map[string]int, decision *models.Decision, totalDecisions int) {
  548. if *decision.Origin == SCOPE_CAPI {
  549. counter[*decision.Origin]["all"] += totalDecisions
  550. } else if *decision.Origin == SCOPE_LISTS {
  551. counter[*decision.Origin][*decision.Scenario] += totalDecisions
  552. } else {
  553. log.Warningf("Unknown origin %s", *decision.Origin)
  554. }
  555. }