apic.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  10. "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers"
  11. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  12. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  13. "github.com/crowdsecurity/crowdsec/pkg/database"
  14. "github.com/crowdsecurity/crowdsec/pkg/models"
  15. "github.com/crowdsecurity/crowdsec/pkg/types"
  16. "github.com/go-openapi/strfmt"
  17. "github.com/pkg/errors"
  18. log "github.com/sirupsen/logrus"
  19. "gopkg.in/tomb.v2"
  20. )
  // Default periods of the three central-API background loops. They are kept
  // as strings and converted with time.ParseDuration by NewAPIC.
  const (
  	// PullInterval is the delay between two PullTop calls issued by Pull.
  	PullInterval = "2h"

  	// PushInterval is the delay between two flushes of the signal cache in Push.
  	PushInterval = "30s"

  	// MetricsInterval is the delay between two reports sent by SendMetrics.
  	MetricsInterval = "30m"
  )
  26. type apic struct {
  27. pullInterval time.Duration
  28. pushInterval time.Duration
  29. metricsInterval time.Duration
  30. dbClient *database.Client
  31. apiClient *apiclient.ApiClient
  32. alertToPush chan []*models.Alert
  33. mu sync.Mutex
  34. pushTomb tomb.Tomb
  35. pullTomb tomb.Tomb
  36. metricsTomb tomb.Tomb
  37. startup bool
  38. credentials *csconfig.ApiCredentialsCfg
  39. scenarioList []string
  40. }
  // IsInSlice reports whether a is equal to at least one element of b.
  // A nil or empty b always yields false.
  func IsInSlice(a string, b []string) bool {
  	for i := 0; i < len(b); i++ {
  		if b[i] == a {
  			return true
  		}
  	}
  	return false
  }
  49. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  50. scenarios := make([]string, 0)
  51. machines, err := a.dbClient.ListMachines()
  52. if err != nil {
  53. return nil, errors.Wrap(err, "while listing machines")
  54. }
  55. //merge all scenarios together
  56. for _, v := range machines {
  57. machineScenarios := strings.Split(v.Scenarios, ",")
  58. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  59. for _, sv := range machineScenarios {
  60. if !IsInSlice(sv, scenarios) && sv != "" {
  61. scenarios = append(scenarios, sv)
  62. }
  63. }
  64. }
  65. log.Debugf("Returning list of scenarios : %+v", scenarios)
  66. return scenarios, nil
  67. }
  68. func AlertToSignal(alert *models.Alert) *models.AddSignalsRequestItem {
  69. return &models.AddSignalsRequestItem{
  70. Message: alert.Message,
  71. Scenario: alert.Scenario,
  72. ScenarioHash: alert.ScenarioHash,
  73. ScenarioVersion: alert.ScenarioVersion,
  74. Source: alert.Source,
  75. StartAt: alert.StartAt,
  76. StopAt: alert.StopAt,
  77. CreatedAt: alert.CreatedAt,
  78. MachineID: alert.MachineID,
  79. }
  80. }
  81. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client) (*apic, error) {
  82. var err error
  83. ret := &apic{
  84. alertToPush: make(chan []*models.Alert),
  85. dbClient: dbClient,
  86. mu: sync.Mutex{},
  87. startup: true,
  88. credentials: config.Credentials,
  89. pullTomb: tomb.Tomb{},
  90. pushTomb: tomb.Tomb{},
  91. metricsTomb: tomb.Tomb{},
  92. scenarioList: make([]string, 0),
  93. }
  94. ret.pullInterval, err = time.ParseDuration(PullInterval)
  95. if err != nil {
  96. return ret, err
  97. }
  98. ret.pushInterval, err = time.ParseDuration(PushInterval)
  99. if err != nil {
  100. return ret, err
  101. }
  102. ret.metricsInterval, err = time.ParseDuration(MetricsInterval)
  103. if err != nil {
  104. return ret, err
  105. }
  106. password := strfmt.Password(config.Credentials.Password)
  107. apiURL, err := url.Parse(config.Credentials.URL)
  108. if err != nil {
  109. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  110. }
  111. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  112. if err != nil {
  113. return nil, errors.Wrap(err, "while fetching scenarios from db")
  114. }
  115. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  116. MachineID: config.Credentials.Login,
  117. Password: password,
  118. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  119. URL: apiURL,
  120. VersionPrefix: "v2",
  121. Scenarios: ret.scenarioList,
  122. UpdateScenario: ret.FetchScenariosListFromDB,
  123. })
  124. return ret, nil
  125. }
  126. func (a *apic) Push() error {
  127. defer types.CatchPanic("lapi/pushToAPIC")
  128. var cache models.AddSignalsRequest
  129. ticker := time.NewTicker(a.pushInterval)
  130. log.Infof("start crowdsec api push (interval: %s)", PushInterval)
  131. for {
  132. select {
  133. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  134. a.pullTomb.Kill(nil)
  135. a.metricsTomb.Kill(nil)
  136. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  137. if len(cache) == 0 {
  138. return nil
  139. }
  140. go a.Send(&cache)
  141. return nil
  142. case <-ticker.C:
  143. if len(cache) > 0 {
  144. a.mu.Lock()
  145. cacheCopy := cache
  146. cache = make(models.AddSignalsRequest, 0)
  147. a.mu.Unlock()
  148. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  149. go a.Send(&cacheCopy)
  150. }
  151. case alerts := <-a.alertToPush:
  152. var signals []*models.AddSignalsRequestItem
  153. for _, alert := range alerts {
  154. /*we're only interested into decisions coming from scenarios of the hub*/
  155. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  156. continue
  157. }
  158. /*and we're not interested into tainted scenarios neither*/
  159. if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  160. continue
  161. }
  162. signals = append(signals, AlertToSignal(alert))
  163. }
  164. a.mu.Lock()
  165. cache = append(cache, signals...)
  166. a.mu.Unlock()
  167. }
  168. }
  169. }
  170. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  171. /*we do have a problem with this :
  172. The apic.Push background routine reads from alertToPush chan.
  173. This chan is filled by Controller.CreateAlert
  174. If the chan apic.Send hangs, the alertToPush chan will become full,
  175. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  176. So instead, we prefer to cancel write.
  177. I don't know enough about gin to tell how much of an issue it can be.
  178. */
  179. var cache []*models.AddSignalsRequestItem = *cacheOrig
  180. var send models.AddSignalsRequest
  181. bulkSize := 50
  182. pageStart := 0
  183. pageEnd := bulkSize
  184. for {
  185. if pageEnd >= len(cache) {
  186. send = cache[pageStart:]
  187. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  188. defer cancel()
  189. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  190. if err != nil {
  191. log.Errorf("Error while sending final chunk to central API : %s", err)
  192. return
  193. }
  194. break
  195. }
  196. send = cache[pageStart:pageEnd]
  197. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  198. defer cancel()
  199. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  200. if err != nil {
  201. //we log it here as well, because the return value of func might be discarded
  202. log.Errorf("Error while sending chunk to central API : %s", err)
  203. }
  204. pageStart += bulkSize
  205. pageEnd += bulkSize
  206. }
  207. }
  208. func (a *apic) PullTop() error {
  209. var err error
  210. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), a.startup)
  211. if err != nil {
  212. return errors.Wrap(err, "get stream")
  213. }
  214. if a.startup {
  215. a.startup = false
  216. }
  217. // process deleted decisions
  218. var filter map[string][]string
  219. for _, decision := range data.Deleted {
  220. if strings.ToLower(*decision.Scope) == "ip" {
  221. filter = make(map[string][]string, 1)
  222. filter["value"] = []string{*decision.Value}
  223. } else {
  224. filter = make(map[string][]string, 3)
  225. filter["value"] = []string{*decision.Value}
  226. filter["type"] = []string{*decision.Type}
  227. filter["value"] = []string{*decision.Scope}
  228. }
  229. nbDeleted, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  230. if err != nil {
  231. return err
  232. }
  233. log.Printf("pull top: deleted %s entries", nbDeleted)
  234. }
  235. alertCreated, err := a.dbClient.Ent.Alert.
  236. Create().
  237. SetScenario(fmt.Sprintf("update : +%d/-%d IPs", len(data.New), len(data.Deleted))).
  238. SetSourceScope("Comunity blocklist").
  239. Save(a.dbClient.CTX)
  240. if err != nil {
  241. return errors.Wrap(err, "create alert from crowdsec-api")
  242. }
  243. // process new decisions
  244. for _, decision := range data.New {
  245. /*ensure scope makes sense no matter what consensus gives*/
  246. if strings.ToLower(*decision.Scope) == "ip" {
  247. *decision.Scope = types.Ip
  248. } else if strings.ToLower(*decision.Scope) == "range" {
  249. *decision.Scope = types.Range
  250. }
  251. duration, err := time.ParseDuration(*decision.Duration)
  252. if err != nil {
  253. return errors.Wrapf(err, "parse decision duration '%s':", *decision.Duration)
  254. }
  255. startIP, endIP, err := controllers.GetIpsFromIpRange(*decision.Value)
  256. if err != nil {
  257. return errors.Wrapf(err, "ip to int '%s':", *decision.Value)
  258. }
  259. _, err = a.dbClient.Ent.Decision.Create().
  260. SetUntil(time.Now().Add(duration)).
  261. SetScenario(*decision.Scenario).
  262. SetType(*decision.Type).
  263. SetStartIP(startIP).
  264. SetEndIP(endIP).
  265. SetValue(*decision.Value).
  266. SetScope(*decision.Scope).
  267. SetOrigin(*decision.Origin).
  268. SetOwner(alertCreated).Save(a.dbClient.CTX)
  269. if err != nil {
  270. return errors.Wrap(err, "decision creation from crowdsec-api:")
  271. }
  272. }
  273. log.Printf("pull top: added %d entries", len(data.New))
  274. return nil
  275. }
  276. func (a *apic) Pull() error {
  277. defer types.CatchPanic("lapi/pullFromAPIC")
  278. log.Infof("start crowdsec api pull (interval: %s)", PullInterval)
  279. var err error
  280. scenario := a.scenarioList
  281. toldOnce := false
  282. for {
  283. if len(scenario) > 0 {
  284. break
  285. }
  286. if !toldOnce {
  287. log.Warningf("scenario list is empty, will not pull yet")
  288. toldOnce = true
  289. }
  290. time.Sleep(1 * time.Second)
  291. scenario, err = a.FetchScenariosListFromDB()
  292. if err != nil {
  293. log.Errorf("unable to fetch scenarios from db: %s", err)
  294. }
  295. }
  296. if err := a.PullTop(); err != nil {
  297. log.Errorf("capi pull top: %s", err)
  298. }
  299. ticker := time.NewTicker(a.pullInterval)
  300. for {
  301. select {
  302. case <-ticker.C:
  303. if err := a.PullTop(); err != nil {
  304. log.Errorf("capi pull top: %s", err)
  305. continue
  306. }
  307. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  308. a.metricsTomb.Kill(nil)
  309. a.pushTomb.Kill(nil)
  310. return nil
  311. }
  312. }
  313. }
  314. func (a *apic) SendMetrics() error {
  315. defer types.CatchPanic("lapi/metricsToAPIC")
  316. log.Infof("start crowdsec api send metrics (interval: %s)", MetricsInterval)
  317. ticker := time.NewTicker(a.metricsInterval)
  318. for {
  319. select {
  320. case <-ticker.C:
  321. version := cwversion.VersionStr()
  322. metric := &models.Metrics{
  323. ApilVersion: &version,
  324. Machines: make([]*models.MetricsSoftInfo, 0),
  325. Bouncers: make([]*models.MetricsSoftInfo, 0),
  326. }
  327. machines, err := a.dbClient.ListMachines()
  328. if err != nil {
  329. return err
  330. }
  331. bouncers, err := a.dbClient.ListBouncers()
  332. if err != nil {
  333. return err
  334. }
  335. // models.metric structure : len(machines), len(bouncers), a.credentials.Login
  336. // _, _, err := a.apiClient.Metrics.Add(//*models.Metrics)
  337. for _, machine := range machines {
  338. m := &models.MetricsSoftInfo{
  339. Version: machine.Version,
  340. Name: machine.MachineId,
  341. }
  342. metric.Machines = append(metric.Machines, m)
  343. }
  344. for _, bouncer := range bouncers {
  345. m := &models.MetricsSoftInfo{
  346. Version: bouncer.Version,
  347. Name: bouncer.Type,
  348. }
  349. metric.Bouncers = append(metric.Bouncers, m)
  350. }
  351. _, _, err = a.apiClient.Metrics.Add(context.Background(), metric)
  352. if err != nil {
  353. return errors.Wrap(err, "sending metrics failed")
  354. }
  355. log.Infof("capi metrics: metrics sent successfully")
  356. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  357. a.pullTomb.Kill(nil)
  358. a.pushTomb.Kill(nil)
  359. return nil
  360. }
  361. }
  362. }
  363. func (a *apic) Shutdown() {
  364. a.pushTomb.Kill(nil)
  365. a.pullTomb.Kill(nil)
  366. a.metricsTomb.Kill(nil)
  367. }