apic.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  10. "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers"
  11. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  12. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  13. "github.com/crowdsecurity/crowdsec/pkg/database"
  14. "github.com/crowdsecurity/crowdsec/pkg/models"
  15. "github.com/crowdsecurity/crowdsec/pkg/types"
  16. "github.com/go-openapi/strfmt"
  17. "github.com/pkg/errors"
  18. log "github.com/sirupsen/logrus"
  19. "gopkg.in/tomb.v2"
  20. )
  21. const (
  22. PullInterval = "2h"
  23. PushInterval = "30s"
  24. MetricsInterval = "30m"
  25. )
  26. type apic struct {
  27. pullInterval time.Duration
  28. pushInterval time.Duration
  29. metricsInterval time.Duration
  30. dbClient *database.Client
  31. apiClient *apiclient.ApiClient
  32. alertToPush chan []*models.Alert
  33. mu sync.Mutex
  34. pushTomb tomb.Tomb
  35. pullTomb tomb.Tomb
  36. metricsTomb tomb.Tomb
  37. startup bool
  38. credentials *csconfig.ApiCredentialsCfg
  39. scenarioList []string
  40. }
  41. func IsInSlice(a string, b []string) bool {
  42. for _, v := range b {
  43. if a == v {
  44. return true
  45. }
  46. }
  47. return false
  48. }
  49. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  50. scenarios := make([]string, 0)
  51. machines, err := a.dbClient.ListMachines()
  52. if err != nil {
  53. return nil, errors.Wrap(err, "while listing machines")
  54. }
  55. //merge all scenarios together
  56. for _, v := range machines {
  57. machineScenarios := strings.Split(v.Scenarios, ",")
  58. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  59. for _, sv := range machineScenarios {
  60. if !IsInSlice(sv, scenarios) && sv != "" {
  61. scenarios = append(scenarios, sv)
  62. }
  63. }
  64. }
  65. log.Debugf("Returning list of scenarios : %+v", scenarios)
  66. return scenarios, nil
  67. }
  68. func AlertToSignal(alert *models.Alert) *models.AddSignalsRequestItem {
  69. return &models.AddSignalsRequestItem{
  70. Message: alert.Message,
  71. Scenario: alert.Scenario,
  72. ScenarioHash: alert.ScenarioHash,
  73. ScenarioVersion: alert.ScenarioVersion,
  74. Source: alert.Source,
  75. StartAt: alert.StartAt,
  76. StopAt: alert.StopAt,
  77. CreatedAt: alert.CreatedAt,
  78. MachineID: alert.MachineID,
  79. }
  80. }
  81. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client) (*apic, error) {
  82. var err error
  83. ret := &apic{
  84. alertToPush: make(chan []*models.Alert),
  85. dbClient: dbClient,
  86. mu: sync.Mutex{},
  87. startup: true,
  88. credentials: config.Credentials,
  89. pullTomb: tomb.Tomb{},
  90. pushTomb: tomb.Tomb{},
  91. metricsTomb: tomb.Tomb{},
  92. scenarioList: make([]string, 0),
  93. }
  94. ret.pullInterval, err = time.ParseDuration(PullInterval)
  95. if err != nil {
  96. return ret, err
  97. }
  98. ret.pushInterval, err = time.ParseDuration(PushInterval)
  99. if err != nil {
  100. return ret, err
  101. }
  102. ret.metricsInterval, err = time.ParseDuration(MetricsInterval)
  103. if err != nil {
  104. return ret, err
  105. }
  106. password := strfmt.Password(config.Credentials.Password)
  107. apiURL, err := url.Parse(config.Credentials.URL)
  108. if err != nil {
  109. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  110. }
  111. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  112. if err != nil {
  113. return nil, errors.Wrap(err, "while fetching scenarios from db")
  114. }
  115. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  116. MachineID: config.Credentials.Login,
  117. Password: password,
  118. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  119. URL: apiURL,
  120. VersionPrefix: "v2",
  121. Scenarios: ret.scenarioList,
  122. UpdateScenario: ret.FetchScenariosListFromDB,
  123. })
  124. return ret, nil
  125. }
  126. func (a *apic) Push() error {
  127. defer types.CatchPanic("lapi/pushToAPIC")
  128. var cache models.AddSignalsRequest
  129. ticker := time.NewTicker(a.pushInterval)
  130. log.Infof("start crowdsec api push (interval: %s)", PushInterval)
  131. for {
  132. select {
  133. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  134. a.pullTomb.Kill(nil)
  135. a.metricsTomb.Kill(nil)
  136. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  137. if len(cache) == 0 {
  138. return nil
  139. }
  140. go a.Send(&cache)
  141. return nil
  142. case <-ticker.C:
  143. if len(cache) > 0 {
  144. a.mu.Lock()
  145. cacheCopy := cache
  146. cache = make(models.AddSignalsRequest, 0)
  147. a.mu.Unlock()
  148. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  149. go a.Send(&cacheCopy)
  150. }
  151. case alerts := <-a.alertToPush:
  152. var signals []*models.AddSignalsRequestItem
  153. for _, alert := range alerts {
  154. signals = append(signals, AlertToSignal(alert))
  155. }
  156. a.mu.Lock()
  157. cache = append(cache, signals...)
  158. a.mu.Unlock()
  159. }
  160. }
  161. }
  162. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  163. /*we do have a problem with this :
  164. The apic.Push background routine reads from alertToPush chan.
  165. This chan is filled by Controller.CreateAlert
  166. If the chan apic.Send hangs, the alertToPush chan will become full,
  167. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  168. So instead, we prefer to cancel write.
  169. I don't know enough about gin to tell how much of an issue it can be.
  170. */
  171. var cache []*models.AddSignalsRequestItem = *cacheOrig
  172. var send models.AddSignalsRequest
  173. bulkSize := 50
  174. pageStart := 0
  175. pageEnd := bulkSize
  176. for {
  177. if pageEnd >= len(cache) {
  178. send = cache[pageStart:]
  179. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  180. defer cancel()
  181. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  182. if err != nil {
  183. log.Errorf("Error while sending final chunk to central API : %s", err)
  184. return
  185. }
  186. break
  187. }
  188. send = cache[pageStart:pageEnd]
  189. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  190. defer cancel()
  191. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  192. if err != nil {
  193. //we log it here as well, because the return value of func might be discarded
  194. log.Errorf("Error while sending chunk to central API : %s", err)
  195. }
  196. pageStart += bulkSize
  197. pageEnd += bulkSize
  198. }
  199. }
  200. func (a *apic) PullTop() error {
  201. var err error
  202. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), a.startup)
  203. if err != nil {
  204. return errors.Wrap(err, "get stream")
  205. }
  206. if a.startup {
  207. a.startup = false
  208. }
  209. // process deleted decisions
  210. var filter map[string][]string
  211. for _, decision := range data.Deleted {
  212. if strings.ToLower(*decision.Scope) == "ip" {
  213. filter = make(map[string][]string, 1)
  214. filter["value"] = []string{*decision.Value}
  215. } else {
  216. filter = make(map[string][]string, 3)
  217. filter["value"] = []string{*decision.Value}
  218. filter["type"] = []string{*decision.Type}
  219. filter["value"] = []string{*decision.Scope}
  220. }
  221. nbDeleted, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  222. if err != nil {
  223. return err
  224. }
  225. log.Printf("pull top: deleted %s entries", nbDeleted)
  226. }
  227. alertCreated, err := a.dbClient.Ent.Alert.
  228. Create().
  229. SetScenario(fmt.Sprintf("update : +%d/-%d IPs", len(data.New), len(data.Deleted))).
  230. SetSourceScope("Comunity blocklist").
  231. Save(a.dbClient.CTX)
  232. if err != nil {
  233. return errors.Wrap(err, "create alert from crowdsec-api")
  234. }
  235. // process new decisions
  236. for _, decision := range data.New {
  237. /*ensure scope makes sense no matter what consensus gives*/
  238. if strings.ToLower(*decision.Scope) == "ip" {
  239. *decision.Scope = types.Ip
  240. } else if strings.ToLower(*decision.Scope) == "range" {
  241. *decision.Scope = types.Range
  242. }
  243. duration, err := time.ParseDuration(*decision.Duration)
  244. if err != nil {
  245. return errors.Wrapf(err, "parse decision duration '%s':", *decision.Duration)
  246. }
  247. startIP, endIP, err := controllers.GetIpsFromIpRange(*decision.Value)
  248. if err != nil {
  249. return errors.Wrapf(err, "ip to int '%s':", *decision.Value)
  250. }
  251. _, err = a.dbClient.Ent.Decision.Create().
  252. SetUntil(time.Now().Add(duration)).
  253. SetScenario(*decision.Scenario).
  254. SetType(*decision.Type).
  255. SetStartIP(startIP).
  256. SetEndIP(endIP).
  257. SetValue(*decision.Value).
  258. SetScope(*decision.Scope).
  259. SetOrigin(*decision.Origin).
  260. SetOwner(alertCreated).Save(a.dbClient.CTX)
  261. if err != nil {
  262. return errors.Wrap(err, "decision creation from crowdsec-api:")
  263. }
  264. }
  265. log.Printf("pull top: added %d entries", len(data.New))
  266. return nil
  267. }
  268. func (a *apic) Pull() error {
  269. defer types.CatchPanic("lapi/pullFromAPIC")
  270. log.Infof("start crowdsec api pull (interval: %s)", PullInterval)
  271. var err error
  272. scenario := a.scenarioList
  273. for {
  274. if len(scenario) > 0 {
  275. break
  276. }
  277. log.Warningf("scenario list is empty, will not pull yet")
  278. time.Sleep(1 * time.Second)
  279. scenario, err = a.FetchScenariosListFromDB()
  280. if err != nil {
  281. log.Errorf("unable to fetch scenarios from db: %s", err)
  282. }
  283. }
  284. if err := a.PullTop(); err != nil {
  285. log.Errorf("capi pull top: %s", err)
  286. }
  287. ticker := time.NewTicker(a.pullInterval)
  288. for {
  289. select {
  290. case <-ticker.C:
  291. if err := a.PullTop(); err != nil {
  292. log.Errorf("capi pull top: %s", err)
  293. continue
  294. }
  295. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  296. a.metricsTomb.Kill(nil)
  297. a.pushTomb.Kill(nil)
  298. return nil
  299. }
  300. }
  301. }
  302. func (a *apic) SendMetrics() error {
  303. defer types.CatchPanic("lapi/metricsToAPIC")
  304. log.Infof("start crowdsec api send metrics (interval: %s)", MetricsInterval)
  305. ticker := time.NewTicker(a.metricsInterval)
  306. for {
  307. select {
  308. case <-ticker.C:
  309. version := cwversion.VersionStr()
  310. metric := &models.Metrics{
  311. ApilVersion: &version,
  312. Machines: make([]*models.MetricsSoftInfo, 0),
  313. Bouncers: make([]*models.MetricsSoftInfo, 0),
  314. }
  315. machines, err := a.dbClient.ListMachines()
  316. if err != nil {
  317. return err
  318. }
  319. bouncers, err := a.dbClient.ListBouncers()
  320. if err != nil {
  321. return err
  322. }
  323. // models.metric structure : len(machines), len(bouncers), a.credentials.Login
  324. // _, _, err := a.apiClient.Metrics.Add(//*models.Metrics)
  325. for _, machine := range machines {
  326. m := &models.MetricsSoftInfo{
  327. Version: machine.Version,
  328. Name: machine.MachineId,
  329. }
  330. metric.Machines = append(metric.Machines, m)
  331. }
  332. for _, bouncer := range bouncers {
  333. m := &models.MetricsSoftInfo{
  334. Version: bouncer.Version,
  335. Name: bouncer.Type,
  336. }
  337. metric.Bouncers = append(metric.Bouncers, m)
  338. }
  339. _, _, err = a.apiClient.Metrics.Add(context.Background(), metric)
  340. if err != nil {
  341. return errors.Wrap(err, "sending metrics failed")
  342. }
  343. log.Infof("capi metrics: metrics sent successfully")
  344. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  345. a.pullTomb.Kill(nil)
  346. a.pushTomb.Kill(nil)
  347. return nil
  348. }
  349. }
  350. }
  351. func (a *apic) Shutdown() {
  352. a.pushTomb.Kill(nil)
  353. a.pullTomb.Kill(nil)
  354. a.metricsTomb.Kill(nil)
  355. }