apic.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  11. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  12. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  13. "github.com/crowdsecurity/crowdsec/pkg/database"
  14. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  15. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  16. "github.com/crowdsecurity/crowdsec/pkg/models"
  17. "github.com/crowdsecurity/crowdsec/pkg/types"
  18. "github.com/go-openapi/strfmt"
  19. "github.com/pkg/errors"
  20. log "github.com/sirupsen/logrus"
  21. "gopkg.in/tomb.v2"
  22. )
// Default intervals of the background routines, expressed as duration strings
// that NewAPIC parses with time.ParseDuration.
const (
	// PullInterval is the period of the ticker driving PullTop in Pull.
	PullInterval = "2h"
	// PushInterval is the period at which Push flushes its signal cache.
	PushInterval = "30s"
	// MetricsInterval is the period at which SendMetrics submits metrics.
	MetricsInterval = "30m"
)
// apic holds the state shared by the background routines (Push, Pull and
// SendMetrics) that exchange data with the central API.
type apic struct {
	pullInterval    time.Duration          // ticker period used by Pull, parsed from PullInterval
	pushInterval    time.Duration          // ticker period used by Push, parsed from PushInterval
	metricsInterval time.Duration          // ticker period used by SendMetrics, parsed from MetricsInterval
	dbClient        *database.Client       // local database access (machines, bouncers, alerts, decisions)
	apiClient       *apiclient.ApiClient   // client created by NewAPIC for the central API
	alertToPush     chan []*models.Alert   // alerts consumed by Push and turned into signals
	mu              sync.Mutex             // locked by Push around updates of its signal cache
	pushTomb        tomb.Tomb              // lifecycle of the Push routine
	pullTomb        tomb.Tomb              // lifecycle of the Pull routine
	metricsTomb     tomb.Tomb              // lifecycle of the SendMetrics routine
	startup         bool                   // passed to Decisions.GetStream by PullTop, cleared after the first successful fetch
	credentials     *csconfig.ApiCredentialsCfg // credentials taken from the configuration given to NewAPIC
	scenarioList    []string               // scenarios loaded by NewAPIC through FetchScenariosListFromDB
	consoleConfig   *csconfig.ConsoleConfig // sharing options consulted by Push before forwarding an alert
}
  44. func IsInSlice(a string, b []string) bool {
  45. for _, v := range b {
  46. if a == v {
  47. return true
  48. }
  49. }
  50. return false
  51. }
  52. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  53. scenarios := make([]string, 0)
  54. machines, err := a.dbClient.ListMachines()
  55. if err != nil {
  56. return nil, errors.Wrap(err, "while listing machines")
  57. }
  58. //merge all scenarios together
  59. for _, v := range machines {
  60. machineScenarios := strings.Split(v.Scenarios, ",")
  61. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  62. for _, sv := range machineScenarios {
  63. if !IsInSlice(sv, scenarios) && sv != "" {
  64. scenarios = append(scenarios, sv)
  65. }
  66. }
  67. }
  68. log.Debugf("Returning list of scenarios : %+v", scenarios)
  69. return scenarios, nil
  70. }
  71. func AlertToSignal(alert *models.Alert, scenarioTrust string) *models.AddSignalsRequestItem {
  72. return &models.AddSignalsRequestItem{
  73. Message: alert.Message,
  74. Scenario: alert.Scenario,
  75. ScenarioHash: alert.ScenarioHash,
  76. ScenarioVersion: alert.ScenarioVersion,
  77. Source: alert.Source,
  78. StartAt: alert.StartAt,
  79. StopAt: alert.StopAt,
  80. CreatedAt: alert.CreatedAt,
  81. MachineID: alert.MachineID,
  82. ScenarioTrust: &scenarioTrust,
  83. }
  84. }
  85. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  86. var err error
  87. ret := &apic{
  88. alertToPush: make(chan []*models.Alert),
  89. dbClient: dbClient,
  90. mu: sync.Mutex{},
  91. startup: true,
  92. credentials: config.Credentials,
  93. pullTomb: tomb.Tomb{},
  94. pushTomb: tomb.Tomb{},
  95. metricsTomb: tomb.Tomb{},
  96. scenarioList: make([]string, 0),
  97. consoleConfig: consoleConfig,
  98. }
  99. ret.pullInterval, err = time.ParseDuration(PullInterval)
  100. if err != nil {
  101. return ret, err
  102. }
  103. ret.pushInterval, err = time.ParseDuration(PushInterval)
  104. if err != nil {
  105. return ret, err
  106. }
  107. ret.metricsInterval, err = time.ParseDuration(MetricsInterval)
  108. if err != nil {
  109. return ret, err
  110. }
  111. password := strfmt.Password(config.Credentials.Password)
  112. apiURL, err := url.Parse(config.Credentials.URL)
  113. if err != nil {
  114. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  115. }
  116. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  117. if err != nil {
  118. return nil, errors.Wrap(err, "while fetching scenarios from db")
  119. }
  120. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  121. MachineID: config.Credentials.Login,
  122. Password: password,
  123. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  124. URL: apiURL,
  125. VersionPrefix: "v2",
  126. Scenarios: ret.scenarioList,
  127. UpdateScenario: ret.FetchScenariosListFromDB,
  128. })
  129. return ret, err
  130. }
  131. func (a *apic) Push() error {
  132. defer types.CatchPanic("lapi/pushToAPIC")
  133. var cache models.AddSignalsRequest
  134. ticker := time.NewTicker(a.pushInterval)
  135. log.Infof("start crowdsec api push (interval: %s)", PushInterval)
  136. for {
  137. select {
  138. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  139. a.pullTomb.Kill(nil)
  140. a.metricsTomb.Kill(nil)
  141. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  142. if len(cache) == 0 {
  143. return nil
  144. }
  145. go a.Send(&cache)
  146. return nil
  147. case <-ticker.C:
  148. if len(cache) > 0 {
  149. a.mu.Lock()
  150. cacheCopy := cache
  151. cache = make(models.AddSignalsRequest, 0)
  152. a.mu.Unlock()
  153. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  154. go a.Send(&cacheCopy)
  155. }
  156. case alerts := <-a.alertToPush:
  157. var signals []*models.AddSignalsRequestItem
  158. for _, alert := range alerts {
  159. if *alert.Simulated {
  160. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  161. continue
  162. }
  163. scenarioTrust := "certified"
  164. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  165. scenarioTrust = "custom"
  166. } else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  167. scenarioTrust = "tainted"
  168. }
  169. if len(alert.Decisions) > 0 {
  170. if *alert.Decisions[0].Origin == "cscli" {
  171. scenarioTrust = "manual"
  172. }
  173. }
  174. switch scenarioTrust {
  175. case "manual":
  176. if !*a.consoleConfig.ShareManualDecisions {
  177. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  178. continue
  179. }
  180. case "tainted":
  181. if !*a.consoleConfig.ShareTaintedScenarios {
  182. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  183. continue
  184. }
  185. case "custom":
  186. if !*a.consoleConfig.ShareCustomScenarios {
  187. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  188. continue
  189. }
  190. }
  191. signals = append(signals, AlertToSignal(alert, scenarioTrust))
  192. }
  193. a.mu.Lock()
  194. cache = append(cache, signals...)
  195. a.mu.Unlock()
  196. }
  197. }
  198. }
  199. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  200. /*we do have a problem with this :
  201. The apic.Push background routine reads from alertToPush chan.
  202. This chan is filled by Controller.CreateAlert
  203. If the chan apic.Send hangs, the alertToPush chan will become full,
  204. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  205. So instead, we prefer to cancel write.
  206. I don't know enough about gin to tell how much of an issue it can be.
  207. */
  208. var cache []*models.AddSignalsRequestItem = *cacheOrig
  209. var send models.AddSignalsRequest
  210. bulkSize := 50
  211. pageStart := 0
  212. pageEnd := bulkSize
  213. for {
  214. if pageEnd >= len(cache) {
  215. send = cache[pageStart:]
  216. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  217. defer cancel()
  218. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  219. if err != nil {
  220. log.Errorf("Error while sending final chunk to central API : %s", err)
  221. return
  222. }
  223. break
  224. }
  225. send = cache[pageStart:pageEnd]
  226. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  227. defer cancel()
  228. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  229. if err != nil {
  230. //we log it here as well, because the return value of func might be discarded
  231. log.Errorf("Error while sending chunk to central API : %s", err)
  232. }
  233. pageStart += bulkSize
  234. pageEnd += bulkSize
  235. }
  236. }
  237. var SCOPE_CAPI string = "CAPI"
  238. var SCOPE_CAPI_ALIAS string = "crowdsecurity/community-blocklist" //we don't use "CAPI" directly, to make it less confusing for the user
  239. var SCOPE_LISTS string = "lists"
  240. func (a *apic) PullTop() error {
  241. var err error
  242. /*only pull community blocklist if it's older than 1h30 */
  243. alerts := a.dbClient.Ent.Alert.Query()
  244. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  245. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute))))
  246. count, err := alerts.Count(a.dbClient.CTX)
  247. if err != nil {
  248. return errors.Wrap(err, "while looking for CAPI alert")
  249. }
  250. if count > 0 {
  251. log.Printf("last CAPI pull is newer than 1h30, skip.")
  252. return nil
  253. }
  254. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), a.startup, []string{})
  255. if err != nil {
  256. return errors.Wrap(err, "get stream")
  257. }
  258. if a.startup {
  259. a.startup = false
  260. }
  261. /*to count additions/deletions accross lists*/
  262. var add_counters map[string]map[string]int
  263. var delete_counters map[string]map[string]int
  264. add_counters = make(map[string]map[string]int)
  265. add_counters[SCOPE_CAPI] = make(map[string]int)
  266. add_counters[SCOPE_LISTS] = make(map[string]int)
  267. delete_counters = make(map[string]map[string]int)
  268. delete_counters[SCOPE_CAPI] = make(map[string]int)
  269. delete_counters[SCOPE_LISTS] = make(map[string]int)
  270. var filter map[string][]string
  271. var nbDeleted int
  272. // process deleted decisions
  273. for _, decision := range data.Deleted {
  274. //count individual deletions
  275. if *decision.Origin == SCOPE_CAPI {
  276. delete_counters[SCOPE_CAPI][*decision.Scenario]++
  277. } else if *decision.Origin == SCOPE_LISTS {
  278. delete_counters[SCOPE_LISTS][*decision.Scenario]++
  279. } else {
  280. log.Warningf("Unknown origin %s", *decision.Origin)
  281. }
  282. if strings.ToLower(*decision.Scope) == "ip" {
  283. filter = make(map[string][]string, 1)
  284. filter["value"] = []string{*decision.Value}
  285. } else {
  286. filter = make(map[string][]string, 3)
  287. filter["value"] = []string{*decision.Value}
  288. filter["type"] = []string{*decision.Type}
  289. filter["value"] = []string{*decision.Scope}
  290. }
  291. dbCliRet, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  292. if err != nil {
  293. return errors.Wrap(err, "deleting decisions error")
  294. }
  295. dbCliDel, err := strconv.Atoi(dbCliRet)
  296. if err != nil {
  297. return errors.Wrapf(err, "converting db ret %d", dbCliDel)
  298. }
  299. nbDeleted += dbCliDel
  300. }
  301. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  302. if len(data.New) == 0 {
  303. log.Warnf("capi/community-blocklist : received 0 new entries, CAPI failure ?")
  304. return nil
  305. }
  306. //we receive only one list of decisions, that we need to break-up :
  307. // one alert for "community blocklist"
  308. // one alert per list we're subscribed to
  309. var alertsFromCapi []*models.Alert
  310. alertsFromCapi = make([]*models.Alert, 0)
  311. //iterate over all new decisions, and simply create corresponding alerts
  312. for _, decision := range data.New {
  313. found := false
  314. for _, sub := range alertsFromCapi {
  315. if sub.Source.Scope == nil {
  316. log.Warningf("nil scope in %+v", sub)
  317. continue
  318. }
  319. if *decision.Origin == SCOPE_CAPI {
  320. if *sub.Source.Scope == SCOPE_CAPI {
  321. found = true
  322. break
  323. }
  324. } else if *decision.Origin == SCOPE_LISTS {
  325. if *sub.Source.Scope == *decision.Origin {
  326. if sub.Scenario == nil {
  327. log.Warningf("nil scenario in %+v", sub)
  328. }
  329. if *sub.Scenario == *decision.Scenario {
  330. found = true
  331. break
  332. }
  333. }
  334. } else {
  335. log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
  336. }
  337. }
  338. if !found {
  339. log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
  340. newAlert := models.Alert{}
  341. newAlert.Message = types.StrPtr("")
  342. newAlert.Source = &models.Source{}
  343. if *decision.Origin == SCOPE_CAPI { //to make things more user friendly, we replace CAPI with community-blocklist
  344. newAlert.Source.Scope = types.StrPtr(SCOPE_CAPI)
  345. newAlert.Scenario = types.StrPtr(SCOPE_CAPI)
  346. } else if *decision.Origin == SCOPE_LISTS {
  347. newAlert.Source.Scope = types.StrPtr(SCOPE_LISTS)
  348. newAlert.Scenario = types.StrPtr(*decision.Scenario)
  349. } else {
  350. log.Warningf("unknown origin %s", *decision.Origin)
  351. }
  352. newAlert.Source.Value = types.StrPtr("")
  353. newAlert.StartAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  354. newAlert.StopAt = types.StrPtr(time.Now().UTC().Format(time.RFC3339))
  355. newAlert.Capacity = types.Int32Ptr(0)
  356. newAlert.Simulated = types.BoolPtr(false)
  357. newAlert.EventsCount = types.Int32Ptr(int32(len(data.New)))
  358. newAlert.Leakspeed = types.StrPtr("")
  359. newAlert.ScenarioHash = types.StrPtr("")
  360. newAlert.ScenarioVersion = types.StrPtr("")
  361. newAlert.MachineID = database.CapiMachineID
  362. alertsFromCapi = append(alertsFromCapi, &newAlert)
  363. }
  364. }
  365. //iterate a second time and fill the alerts with the new decisions
  366. for _, decision := range data.New {
  367. //count and create separate alerts for each list
  368. if *decision.Origin == SCOPE_CAPI {
  369. add_counters[SCOPE_CAPI]["all"]++
  370. } else if *decision.Origin == SCOPE_LISTS {
  371. add_counters[SCOPE_LISTS][*decision.Scenario]++
  372. } else {
  373. log.Warningf("Unknown origin %s", *decision.Origin)
  374. }
  375. /*CAPI might send lower case scopes, unify it.*/
  376. switch strings.ToLower(*decision.Scope) {
  377. case "ip":
  378. *decision.Scope = types.Ip
  379. case "range":
  380. *decision.Scope = types.Range
  381. }
  382. found := false
  383. //add the individual decisions to the right list
  384. for idx, alert := range alertsFromCapi {
  385. if *decision.Origin == SCOPE_CAPI {
  386. if *alert.Source.Scope == SCOPE_CAPI {
  387. alertsFromCapi[idx].Decisions = append(alertsFromCapi[idx].Decisions, decision)
  388. found = true
  389. break
  390. }
  391. } else if *decision.Origin == SCOPE_LISTS {
  392. if *alert.Source.Scope == SCOPE_LISTS && *alert.Scenario == *decision.Scenario {
  393. alertsFromCapi[idx].Decisions = append(alertsFromCapi[idx].Decisions, decision)
  394. found = true
  395. break
  396. }
  397. } else {
  398. log.Warningf("unknown origin %s", *decision.Origin)
  399. }
  400. }
  401. if !found {
  402. log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
  403. }
  404. }
  405. for idx, alert := range alertsFromCapi {
  406. formatted_update := ""
  407. if *alertsFromCapi[idx].Source.Scope == SCOPE_CAPI {
  408. *alertsFromCapi[idx].Source.Scope = SCOPE_CAPI_ALIAS
  409. formatted_update = fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_CAPI]["all"], delete_counters[SCOPE_CAPI]["all"])
  410. } else if *alertsFromCapi[idx].Source.Scope == SCOPE_LISTS {
  411. *alertsFromCapi[idx].Source.Scope = fmt.Sprintf("%s:%s", SCOPE_LISTS, *alertsFromCapi[idx].Scenario)
  412. formatted_update = fmt.Sprintf("update : +%d/-%d IPs", add_counters[SCOPE_LISTS][*alert.Scenario], delete_counters[SCOPE_LISTS][*alert.Scenario])
  413. }
  414. alertsFromCapi[idx].Scenario = types.StrPtr(formatted_update)
  415. log.Debugf("%s has %d decisions", *alertsFromCapi[idx].Source.Scope, len(alertsFromCapi[idx].Decisions))
  416. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(alertsFromCapi[idx])
  417. if err != nil {
  418. return errors.Wrapf(err, "while saving alert from %s", *alertsFromCapi[idx].Source.Scope)
  419. }
  420. log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alertsFromCapi[idx].Source.Scope, inserted, deleted, alertID)
  421. }
  422. return nil
  423. }
  424. func (a *apic) Pull() error {
  425. defer types.CatchPanic("lapi/pullFromAPIC")
  426. log.Infof("start crowdsec api pull (interval: %s)", PullInterval)
  427. var err error
  428. scenario := a.scenarioList
  429. toldOnce := false
  430. for {
  431. if len(scenario) > 0 {
  432. break
  433. }
  434. if !toldOnce {
  435. log.Warningf("scenario list is empty, will not pull yet")
  436. toldOnce = true
  437. }
  438. time.Sleep(1 * time.Second)
  439. scenario, err = a.FetchScenariosListFromDB()
  440. if err != nil {
  441. log.Errorf("unable to fetch scenarios from db: %s", err)
  442. }
  443. }
  444. if err := a.PullTop(); err != nil {
  445. log.Errorf("capi pull top: %s", err)
  446. }
  447. ticker := time.NewTicker(a.pullInterval)
  448. for {
  449. select {
  450. case <-ticker.C:
  451. if err := a.PullTop(); err != nil {
  452. log.Errorf("capi pull top: %s", err)
  453. continue
  454. }
  455. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  456. a.metricsTomb.Kill(nil)
  457. a.pushTomb.Kill(nil)
  458. return nil
  459. }
  460. }
  461. }
  462. func (a *apic) GetMetrics() (*models.Metrics, error) {
  463. version := cwversion.VersionStr()
  464. metric := &models.Metrics{
  465. ApilVersion: &version,
  466. Machines: make([]*models.MetricsAgentInfo, 0),
  467. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  468. }
  469. machines, err := a.dbClient.ListMachines()
  470. if err != nil {
  471. return metric, err
  472. }
  473. bouncers, err := a.dbClient.ListBouncers()
  474. if err != nil {
  475. return metric, err
  476. }
  477. for _, machine := range machines {
  478. m := &models.MetricsAgentInfo{
  479. Version: machine.Version,
  480. Name: machine.MachineId,
  481. LastUpdate: machine.UpdatedAt.String(),
  482. LastPush: machine.LastPush.String(),
  483. }
  484. metric.Machines = append(metric.Machines, m)
  485. }
  486. for _, bouncer := range bouncers {
  487. m := &models.MetricsBouncerInfo{
  488. Version: bouncer.Version,
  489. CustomName: bouncer.Name,
  490. Name: bouncer.Type,
  491. LastPull: bouncer.LastPull.String(),
  492. }
  493. metric.Bouncers = append(metric.Bouncers, m)
  494. }
  495. return metric, nil
  496. }
  497. func (a *apic) SendMetrics() error {
  498. defer types.CatchPanic("lapi/metricsToAPIC")
  499. metrics, err := a.GetMetrics()
  500. if err != nil {
  501. log.Errorf("unable to get metrics (%s), will retry", err)
  502. }
  503. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  504. if err != nil {
  505. log.Errorf("unable to send metrics (%s), will retry", err)
  506. }
  507. log.Infof("capi metrics: metrics sent successfully")
  508. log.Infof("start crowdsec api send metrics (interval: %s)", MetricsInterval)
  509. ticker := time.NewTicker(a.metricsInterval)
  510. for {
  511. select {
  512. case <-ticker.C:
  513. metrics, err := a.GetMetrics()
  514. if err != nil {
  515. log.Errorf("unable to get metrics (%s), will retry", err)
  516. }
  517. _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
  518. if err != nil {
  519. log.Errorf("capi metrics: failed: %s", err.Error())
  520. } else {
  521. log.Infof("capi metrics: metrics sent successfully")
  522. }
  523. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  524. a.pullTomb.Kill(nil)
  525. a.pushTomb.Kill(nil)
  526. return nil
  527. }
  528. }
  529. }
  530. func (a *apic) Shutdown() {
  531. a.pushTomb.Kill(nil)
  532. a.pullTomb.Kill(nil)
  533. a.metricsTomb.Kill(nil)
  534. }