apic.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  11. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  12. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  13. "github.com/crowdsecurity/crowdsec/pkg/database"
  14. "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
  15. "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
  16. "github.com/crowdsecurity/crowdsec/pkg/models"
  17. "github.com/crowdsecurity/crowdsec/pkg/types"
  18. "github.com/go-openapi/strfmt"
  19. "github.com/pkg/errors"
  20. log "github.com/sirupsen/logrus"
  21. "gopkg.in/tomb.v2"
  22. )
  // Default periods of the CAPI background routines, written as
// time.ParseDuration strings. NewAPIC parses them, and Push, Pull and
// SendMetrics print them in their startup log line.
const (
	// PullInterval is the period between two community-blocklist pulls.
	PullInterval = "2h"
	// PushInterval is the period at which cached signals are flushed to CAPI.
	PushInterval = "30s"
	// MetricsInterval is the period at which usage metrics are sent to CAPI.
	MetricsInterval = "30m"
)
  28. type apic struct {
  29. pullInterval time.Duration
  30. pushInterval time.Duration
  31. metricsInterval time.Duration
  32. dbClient *database.Client
  33. apiClient *apiclient.ApiClient
  34. alertToPush chan []*models.Alert
  35. mu sync.Mutex
  36. pushTomb tomb.Tomb
  37. pullTomb tomb.Tomb
  38. metricsTomb tomb.Tomb
  39. startup bool
  40. credentials *csconfig.ApiCredentialsCfg
  41. scenarioList []string
  42. consoleConfig *csconfig.ConsoleConfig
  43. decisionsToDelete chan models.Decision
  44. }
  // IsInSlice reports whether a is an element of b. The comparison is exact and
// case-sensitive; a nil or empty b always yields false.
func IsInSlice(a string, b []string) bool {
	for i := 0; i < len(b); i++ {
		if b[i] == a {
			return true
		}
	}
	return false
}
  53. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  54. scenarios := make([]string, 0)
  55. machines, err := a.dbClient.ListMachines()
  56. if err != nil {
  57. return nil, errors.Wrap(err, "while listing machines")
  58. }
  59. //merge all scenarios together
  60. for _, v := range machines {
  61. machineScenarios := strings.Split(v.Scenarios, ",")
  62. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  63. for _, sv := range machineScenarios {
  64. if !IsInSlice(sv, scenarios) && sv != "" {
  65. scenarios = append(scenarios, sv)
  66. }
  67. }
  68. }
  69. log.Debugf("Returning list of scenarios : %+v", scenarios)
  70. return scenarios, nil
  71. }
  72. func AlertToSignal(alert *models.Alert, scenarioTrust string, keepDecisions bool) *models.AddSignalsRequestItem {
  73. signal := &models.AddSignalsRequestItem{
  74. Message: alert.Message,
  75. Scenario: alert.Scenario,
  76. ScenarioHash: alert.ScenarioHash,
  77. ScenarioVersion: alert.ScenarioVersion,
  78. Source: alert.Source,
  79. StartAt: alert.StartAt,
  80. StopAt: alert.StopAt,
  81. CreatedAt: alert.CreatedAt,
  82. MachineID: alert.MachineID,
  83. ScenarioTrust: &scenarioTrust,
  84. }
  85. if keepDecisions {
  86. log.Debugf("Keeping decisions to send to CAPI")
  87. signal.Decisions = alert.Decisions
  88. }
  89. return signal
  90. }
  91. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
  92. var err error
  93. ret := &apic{
  94. alertToPush: make(chan []*models.Alert),
  95. dbClient: dbClient,
  96. mu: sync.Mutex{},
  97. startup: true,
  98. credentials: config.Credentials,
  99. pullTomb: tomb.Tomb{},
  100. pushTomb: tomb.Tomb{},
  101. metricsTomb: tomb.Tomb{},
  102. scenarioList: make([]string, 0),
  103. decisionsToDelete: make(chan models.Decision),
  104. consoleConfig: consoleConfig,
  105. }
  106. ret.pullInterval, err = time.ParseDuration(PullInterval)
  107. if err != nil {
  108. return ret, err
  109. }
  110. ret.pushInterval, err = time.ParseDuration(PushInterval)
  111. if err != nil {
  112. return ret, err
  113. }
  114. ret.metricsInterval, err = time.ParseDuration(MetricsInterval)
  115. if err != nil {
  116. return ret, err
  117. }
  118. password := strfmt.Password(config.Credentials.Password)
  119. apiURL, err := url.Parse(config.Credentials.URL)
  120. if err != nil {
  121. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  122. }
  123. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  124. if err != nil {
  125. return nil, errors.Wrap(err, "while fetching scenarios from db")
  126. }
  127. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  128. MachineID: config.Credentials.Login,
  129. Password: password,
  130. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  131. URL: apiURL,
  132. VersionPrefix: "v2",
  133. Scenarios: ret.scenarioList,
  134. UpdateScenario: ret.FetchScenariosListFromDB,
  135. })
  136. return ret, err
  137. }
  138. func (a *apic) Push() error {
  139. defer types.CatchPanic("lapi/pushToAPIC")
  140. var cache models.AddSignalsRequest
  141. ticker := time.NewTicker(a.pushInterval)
  142. log.Infof("start crowdsec api push (interval: %s)", PushInterval)
  143. for {
  144. select {
  145. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  146. a.pullTomb.Kill(nil)
  147. a.metricsTomb.Kill(nil)
  148. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  149. if len(cache) == 0 {
  150. return nil
  151. }
  152. go a.Send(&cache)
  153. return nil
  154. case <-ticker.C:
  155. if len(cache) > 0 {
  156. a.mu.Lock()
  157. cacheCopy := cache
  158. cache = make(models.AddSignalsRequest, 0)
  159. a.mu.Unlock()
  160. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  161. go a.Send(&cacheCopy)
  162. }
  163. case alerts := <-a.alertToPush:
  164. var signals []*models.AddSignalsRequestItem
  165. for _, alert := range alerts {
  166. if *alert.Simulated {
  167. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  168. continue
  169. }
  170. scenarioTrust := "certified"
  171. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  172. scenarioTrust = "custom"
  173. }
  174. if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  175. scenarioTrust = "tainted"
  176. }
  177. if len(alert.Decisions) > 0 {
  178. if *alert.Decisions[0].Origin == "cscli" {
  179. scenarioTrust = "manual"
  180. }
  181. }
  182. switch scenarioTrust {
  183. case "manual":
  184. if !*a.consoleConfig.ShareManualDecisions {
  185. log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
  186. continue
  187. }
  188. case "tainted":
  189. if !*a.consoleConfig.ShareTaintedScenarios {
  190. log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
  191. continue
  192. }
  193. case "custom":
  194. if !*a.consoleConfig.ShareCustomScenarios {
  195. log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
  196. continue
  197. }
  198. }
  199. log.Infof("Add signals for '%s' alert", scenarioTrust)
  200. signals = append(signals, AlertToSignal(alert, scenarioTrust, *a.consoleConfig.ShareDecisions))
  201. }
  202. a.mu.Lock()
  203. cache = append(cache, signals...)
  204. a.mu.Unlock()
  205. }
  206. }
  207. }
  208. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  209. /*we do have a problem with this :
  210. The apic.Push background routine reads from alertToPush chan.
  211. This chan is filled by Controller.CreateAlert
  212. If the chan apic.Send hangs, the alertToPush chan will become full,
  213. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  214. So instead, we prefer to cancel write.
  215. I don't know enough about gin to tell how much of an issue it can be.
  216. */
  217. var cache []*models.AddSignalsRequestItem = *cacheOrig
  218. var send models.AddSignalsRequest
  219. bulkSize := 50
  220. pageStart := 0
  221. pageEnd := bulkSize
  222. for {
  223. if pageEnd >= len(cache) {
  224. send = cache[pageStart:]
  225. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  226. defer cancel()
  227. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  228. if err != nil {
  229. log.Errorf("Error while sending final chunk to central API : %s", err)
  230. return
  231. }
  232. break
  233. }
  234. send = cache[pageStart:pageEnd]
  235. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  236. defer cancel()
  237. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  238. if err != nil {
  239. //we log it here as well, because the return value of func might be discarded
  240. log.Errorf("Error while sending chunk to central API : %s", err)
  241. }
  242. pageStart += bulkSize
  243. pageEnd += bulkSize
  244. }
  245. }
  246. func (a *apic) PullTop() error {
  247. var err error
  248. /*only pull community blocklist if it's older than 1h30 */
  249. alerts := a.dbClient.Ent.Alert.Query()
  250. alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
  251. alerts = alerts.Where(alert.CreatedAtGTE(time.Now().Add(-time.Duration(1*time.Hour + 30*time.Minute))))
  252. count, err := alerts.Count(a.dbClient.CTX)
  253. if err != nil {
  254. return errors.Wrap(err, "while looking for CAPI alert")
  255. }
  256. if count > 0 {
  257. log.Printf("last CAPI pull is newer than 1h30, skip.")
  258. return nil
  259. }
  260. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), a.startup, []string{})
  261. if err != nil {
  262. return errors.Wrap(err, "get stream")
  263. }
  264. if a.startup {
  265. a.startup = false
  266. }
  267. // process deleted decisions
  268. var filter map[string][]string
  269. var nbDeleted int
  270. for _, decision := range data.Deleted {
  271. if strings.ToLower(*decision.Scope) == "ip" {
  272. filter = make(map[string][]string, 1)
  273. filter["value"] = []string{*decision.Value}
  274. } else {
  275. filter = make(map[string][]string, 3)
  276. filter["value"] = []string{*decision.Value}
  277. filter["type"] = []string{*decision.Type}
  278. filter["value"] = []string{*decision.Scope}
  279. }
  280. dbCliRet, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  281. if err != nil {
  282. return errors.Wrap(err, "deleting decisions error")
  283. }
  284. dbCliDel, err := strconv.Atoi(dbCliRet)
  285. if err != nil {
  286. return errors.Wrapf(err, "converting db ret %d", dbCliDel)
  287. }
  288. nbDeleted += dbCliDel
  289. }
  290. log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)
  291. if len(data.New) == 0 {
  292. log.Warnf("capi/community-blocklist : received 0 new entries, CAPI failure ?")
  293. return nil
  294. }
  295. capiPullTopX := models.Alert{}
  296. capiPullTopX.Scenario = types.StrPtr(fmt.Sprintf("update : +%d/-%d IPs", len(data.New), len(data.Deleted)))
  297. capiPullTopX.Message = types.StrPtr("")
  298. capiPullTopX.Source = &models.Source{}
  299. capiPullTopX.Source.Scope = types.StrPtr("crowdsec/community-blocklist")
  300. capiPullTopX.Source.Value = types.StrPtr("")
  301. capiPullTopX.StartAt = types.StrPtr(time.Now().Format(time.RFC3339))
  302. capiPullTopX.StopAt = types.StrPtr(time.Now().Format(time.RFC3339))
  303. capiPullTopX.Capacity = types.Int32Ptr(0)
  304. capiPullTopX.Simulated = types.BoolPtr(false)
  305. capiPullTopX.EventsCount = types.Int32Ptr(int32(len(data.New)))
  306. capiPullTopX.Leakspeed = types.StrPtr("")
  307. capiPullTopX.ScenarioHash = types.StrPtr("")
  308. capiPullTopX.ScenarioVersion = types.StrPtr("")
  309. capiPullTopX.MachineID = database.CapiMachineID
  310. // process new decisions
  311. for _, decision := range data.New {
  312. /*CAPI might send lower case scopes, unify it.*/
  313. switch strings.ToLower(*decision.Scope) {
  314. case "ip":
  315. *decision.Scope = types.Ip
  316. case "range":
  317. *decision.Scope = types.Range
  318. }
  319. capiPullTopX.Decisions = append(capiPullTopX.Decisions, decision)
  320. }
  321. alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(&capiPullTopX)
  322. if err != nil {
  323. return errors.Wrap(err, "while saving alert from capi/community-blocklist")
  324. }
  325. log.Printf("capi/community-blocklist : added %d entries, deleted %d entries (alert:%d)", inserted, deleted, alertID)
  326. return nil
  327. }
  328. func (a *apic) Pull() error {
  329. defer types.CatchPanic("lapi/pullFromAPIC")
  330. log.Infof("start crowdsec api pull (interval: %s)", PullInterval)
  331. var err error
  332. scenario := a.scenarioList
  333. toldOnce := false
  334. for {
  335. if len(scenario) > 0 {
  336. break
  337. }
  338. if !toldOnce {
  339. log.Warningf("scenario list is empty, will not pull yet")
  340. toldOnce = true
  341. }
  342. time.Sleep(1 * time.Second)
  343. scenario, err = a.FetchScenariosListFromDB()
  344. if err != nil {
  345. log.Errorf("unable to fetch scenarios from db: %s", err)
  346. }
  347. }
  348. if err := a.PullTop(); err != nil {
  349. log.Errorf("capi pull top: %s", err)
  350. }
  351. ticker := time.NewTicker(a.pullInterval)
  352. for {
  353. select {
  354. case <-ticker.C:
  355. if err := a.PullTop(); err != nil {
  356. log.Errorf("capi pull top: %s", err)
  357. continue
  358. }
  359. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  360. a.metricsTomb.Kill(nil)
  361. a.pushTomb.Kill(nil)
  362. return nil
  363. }
  364. }
  365. }
  366. func (a *apic) SendMetrics() error {
  367. defer types.CatchPanic("lapi/metricsToAPIC")
  368. log.Infof("start crowdsec api send metrics (interval: %s)", MetricsInterval)
  369. ticker := time.NewTicker(a.metricsInterval)
  370. for {
  371. select {
  372. case <-ticker.C:
  373. version := cwversion.VersionStr()
  374. metric := &models.Metrics{
  375. ApilVersion: &version,
  376. Machines: make([]*models.MetricsAgentInfo, 0),
  377. Bouncers: make([]*models.MetricsBouncerInfo, 0),
  378. }
  379. machines, err := a.dbClient.ListMachines()
  380. if err != nil {
  381. return err
  382. }
  383. bouncers, err := a.dbClient.ListBouncers()
  384. if err != nil {
  385. return err
  386. }
  387. for _, machine := range machines {
  388. m := &models.MetricsAgentInfo{
  389. Version: machine.Version,
  390. Name: machine.MachineId,
  391. LastUpdate: machine.UpdatedAt.String(),
  392. }
  393. metric.Machines = append(metric.Machines, m)
  394. }
  395. for _, bouncer := range bouncers {
  396. m := &models.MetricsBouncerInfo{
  397. Version: bouncer.Version,
  398. Name: bouncer.Name,
  399. Type: bouncer.Type,
  400. LastPull: bouncer.LastPull.String(),
  401. }
  402. metric.Bouncers = append(metric.Bouncers, m)
  403. }
  404. _, _, err = a.apiClient.Metrics.Add(context.Background(), metric)
  405. if err != nil {
  406. return errors.Wrap(err, "sending metrics failed")
  407. }
  408. log.Infof("capi metrics: metrics sent successfully")
  409. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  410. a.pullTomb.Kill(nil)
  411. a.pushTomb.Kill(nil)
  412. return nil
  413. }
  414. }
  415. }
  416. func (a *apic) Shutdown() {
  417. a.pushTomb.Kill(nil)
  418. a.pullTomb.Kill(nil)
  419. a.metricsTomb.Kill(nil)
  420. }