apic.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package apiserver
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  10. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  11. "github.com/crowdsecurity/crowdsec/pkg/cwversion"
  12. "github.com/crowdsecurity/crowdsec/pkg/database"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/go-openapi/strfmt"
  16. "github.com/pkg/errors"
  17. log "github.com/sirupsen/logrus"
  18. "gopkg.in/tomb.v2"
  19. )
  20. const (
  21. PullInterval = "2h"
  22. PushInterval = "30s"
  23. MetricsInterval = "30m"
  24. )
  25. type apic struct {
  26. pullInterval time.Duration
  27. pushInterval time.Duration
  28. metricsInterval time.Duration
  29. dbClient *database.Client
  30. apiClient *apiclient.ApiClient
  31. alertToPush chan []*models.Alert
  32. mu sync.Mutex
  33. pushTomb tomb.Tomb
  34. pullTomb tomb.Tomb
  35. metricsTomb tomb.Tomb
  36. startup bool
  37. credentials *csconfig.ApiCredentialsCfg
  38. scenarioList []string
  39. }
  40. func IsInSlice(a string, b []string) bool {
  41. for _, v := range b {
  42. if a == v {
  43. return true
  44. }
  45. }
  46. return false
  47. }
  48. func (a *apic) FetchScenariosListFromDB() ([]string, error) {
  49. scenarios := make([]string, 0)
  50. machines, err := a.dbClient.ListMachines()
  51. if err != nil {
  52. return nil, errors.Wrap(err, "while listing machines")
  53. }
  54. //merge all scenarios together
  55. for _, v := range machines {
  56. machineScenarios := strings.Split(v.Scenarios, ",")
  57. log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)
  58. for _, sv := range machineScenarios {
  59. if !IsInSlice(sv, scenarios) && sv != "" {
  60. scenarios = append(scenarios, sv)
  61. }
  62. }
  63. }
  64. log.Debugf("Returning list of scenarios : %+v", scenarios)
  65. return scenarios, nil
  66. }
  67. func AlertToSignal(alert *models.Alert) *models.AddSignalsRequestItem {
  68. return &models.AddSignalsRequestItem{
  69. Message: alert.Message,
  70. Scenario: alert.Scenario,
  71. ScenarioHash: alert.ScenarioHash,
  72. ScenarioVersion: alert.ScenarioVersion,
  73. Source: alert.Source,
  74. StartAt: alert.StartAt,
  75. StopAt: alert.StopAt,
  76. CreatedAt: alert.CreatedAt,
  77. MachineID: alert.MachineID,
  78. }
  79. }
  80. func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client) (*apic, error) {
  81. var err error
  82. ret := &apic{
  83. alertToPush: make(chan []*models.Alert),
  84. dbClient: dbClient,
  85. mu: sync.Mutex{},
  86. startup: true,
  87. credentials: config.Credentials,
  88. pullTomb: tomb.Tomb{},
  89. pushTomb: tomb.Tomb{},
  90. metricsTomb: tomb.Tomb{},
  91. scenarioList: make([]string, 0),
  92. }
  93. ret.pullInterval, err = time.ParseDuration(PullInterval)
  94. if err != nil {
  95. return ret, err
  96. }
  97. ret.pushInterval, err = time.ParseDuration(PushInterval)
  98. if err != nil {
  99. return ret, err
  100. }
  101. ret.metricsInterval, err = time.ParseDuration(MetricsInterval)
  102. if err != nil {
  103. return ret, err
  104. }
  105. password := strfmt.Password(config.Credentials.Password)
  106. apiURL, err := url.Parse(config.Credentials.URL)
  107. if err != nil {
  108. return nil, errors.Wrapf(err, "while parsing '%s'", config.Credentials.URL)
  109. }
  110. ret.scenarioList, err = ret.FetchScenariosListFromDB()
  111. if err != nil {
  112. return nil, errors.Wrap(err, "while fetching scenarios from db")
  113. }
  114. ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
  115. MachineID: config.Credentials.Login,
  116. Password: password,
  117. UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
  118. URL: apiURL,
  119. VersionPrefix: "v2",
  120. Scenarios: ret.scenarioList,
  121. UpdateScenario: ret.FetchScenariosListFromDB,
  122. })
  123. return ret, err
  124. }
  125. func (a *apic) Push() error {
  126. defer types.CatchPanic("lapi/pushToAPIC")
  127. var cache models.AddSignalsRequest
  128. ticker := time.NewTicker(a.pushInterval)
  129. log.Infof("start crowdsec api push (interval: %s)", PushInterval)
  130. for {
  131. select {
  132. case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
  133. a.pullTomb.Kill(nil)
  134. a.metricsTomb.Kill(nil)
  135. log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
  136. if len(cache) == 0 {
  137. return nil
  138. }
  139. go a.Send(&cache)
  140. return nil
  141. case <-ticker.C:
  142. if len(cache) > 0 {
  143. a.mu.Lock()
  144. cacheCopy := cache
  145. cache = make(models.AddSignalsRequest, 0)
  146. a.mu.Unlock()
  147. log.Infof("Signal push: %d signals to push", len(cacheCopy))
  148. go a.Send(&cacheCopy)
  149. }
  150. case alerts := <-a.alertToPush:
  151. var signals []*models.AddSignalsRequestItem
  152. for _, alert := range alerts {
  153. /*we're only interested into decisions coming from scenarios of the hub*/
  154. if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
  155. continue
  156. }
  157. /*and we're not interested into tainted scenarios neither*/
  158. if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
  159. continue
  160. }
  161. /*we also ignore alerts in simulated mode*/
  162. if *alert.Simulated {
  163. log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
  164. continue
  165. }
  166. signals = append(signals, AlertToSignal(alert))
  167. }
  168. a.mu.Lock()
  169. cache = append(cache, signals...)
  170. a.mu.Unlock()
  171. }
  172. }
  173. }
  174. func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
  175. /*we do have a problem with this :
  176. The apic.Push background routine reads from alertToPush chan.
  177. This chan is filled by Controller.CreateAlert
  178. If the chan apic.Send hangs, the alertToPush chan will become full,
  179. with means that Controller.CreateAlert is going to hang, blocking API worker(s).
  180. So instead, we prefer to cancel write.
  181. I don't know enough about gin to tell how much of an issue it can be.
  182. */
  183. var cache []*models.AddSignalsRequestItem = *cacheOrig
  184. var send models.AddSignalsRequest
  185. bulkSize := 50
  186. pageStart := 0
  187. pageEnd := bulkSize
  188. for {
  189. if pageEnd >= len(cache) {
  190. send = cache[pageStart:]
  191. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  192. defer cancel()
  193. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  194. if err != nil {
  195. log.Errorf("Error while sending final chunk to central API : %s", err)
  196. return
  197. }
  198. break
  199. }
  200. send = cache[pageStart:pageEnd]
  201. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  202. defer cancel()
  203. _, _, err := a.apiClient.Signal.Add(ctx, &send)
  204. if err != nil {
  205. //we log it here as well, because the return value of func might be discarded
  206. log.Errorf("Error while sending chunk to central API : %s", err)
  207. }
  208. pageStart += bulkSize
  209. pageEnd += bulkSize
  210. }
  211. }
  212. func (a *apic) PullTop() error {
  213. var err error
  214. data, _, err := a.apiClient.Decisions.GetStream(context.Background(), a.startup, []string{})
  215. if err != nil {
  216. return errors.Wrap(err, "get stream")
  217. }
  218. if a.startup {
  219. a.startup = false
  220. }
  221. // process deleted decisions
  222. var filter map[string][]string
  223. for _, decision := range data.Deleted {
  224. if strings.ToLower(*decision.Scope) == "ip" {
  225. filter = make(map[string][]string, 1)
  226. filter["value"] = []string{*decision.Value}
  227. } else {
  228. filter = make(map[string][]string, 3)
  229. filter["value"] = []string{*decision.Value}
  230. filter["type"] = []string{*decision.Type}
  231. filter["value"] = []string{*decision.Scope}
  232. }
  233. nbDeleted, err := a.dbClient.SoftDeleteDecisionsWithFilter(filter)
  234. if err != nil {
  235. return err
  236. }
  237. log.Printf("pull top: deleted %s entries", nbDeleted)
  238. }
  239. alertCreated, err := a.dbClient.Ent.Alert.
  240. Create().
  241. SetScenario(fmt.Sprintf("update : +%d/-%d IPs", len(data.New), len(data.Deleted))).
  242. SetSourceScope("Community blocklist").
  243. Save(a.dbClient.CTX)
  244. if err != nil {
  245. return errors.Wrap(err, "create alert from crowdsec-api")
  246. }
  247. // process new decisions
  248. for _, decision := range data.New {
  249. var start_ip, start_sfx, end_ip, end_sfx int64
  250. var sz int
  251. /*if the scope is IP or Range, convert the value to integers */
  252. if strings.ToLower(*decision.Scope) == "ip" || strings.ToLower(*decision.Scope) == "range" {
  253. sz, start_ip, start_sfx, end_ip, end_sfx, err = types.Addr2Ints(*decision.Value)
  254. if err != nil {
  255. return errors.Wrapf(err, "invalid ip/range %s", *decision.Value)
  256. }
  257. }
  258. duration, err := time.ParseDuration(*decision.Duration)
  259. if err != nil {
  260. return errors.Wrapf(err, "parse decision duration '%s':", *decision.Duration)
  261. }
  262. _, err = a.dbClient.Ent.Decision.Create().
  263. SetUntil(time.Now().Add(duration)).
  264. SetScenario(*decision.Scenario).
  265. SetType(*decision.Type).
  266. SetIPSize(int64(sz)).
  267. SetStartIP(start_ip).
  268. SetStartSuffix(start_sfx).
  269. SetEndIP(end_ip).
  270. SetEndSuffix(end_sfx).
  271. SetValue(*decision.Value).
  272. SetScope(*decision.Scope).
  273. SetOrigin(*decision.Origin).
  274. SetOwner(alertCreated).Save(a.dbClient.CTX)
  275. if err != nil {
  276. return errors.Wrap(err, "decision creation from crowdsec-api:")
  277. }
  278. }
  279. log.Printf("pull top: added %d entries", len(data.New))
  280. return nil
  281. }
  282. func (a *apic) Pull() error {
  283. defer types.CatchPanic("lapi/pullFromAPIC")
  284. log.Infof("start crowdsec api pull (interval: %s)", PullInterval)
  285. var err error
  286. scenario := a.scenarioList
  287. toldOnce := false
  288. for {
  289. if len(scenario) > 0 {
  290. break
  291. }
  292. if !toldOnce {
  293. log.Warningf("scenario list is empty, will not pull yet")
  294. toldOnce = true
  295. }
  296. time.Sleep(1 * time.Second)
  297. scenario, err = a.FetchScenariosListFromDB()
  298. if err != nil {
  299. log.Errorf("unable to fetch scenarios from db: %s", err)
  300. }
  301. }
  302. if err := a.PullTop(); err != nil {
  303. log.Errorf("capi pull top: %s", err)
  304. }
  305. ticker := time.NewTicker(a.pullInterval)
  306. for {
  307. select {
  308. case <-ticker.C:
  309. if err := a.PullTop(); err != nil {
  310. log.Errorf("capi pull top: %s", err)
  311. continue
  312. }
  313. case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
  314. a.metricsTomb.Kill(nil)
  315. a.pushTomb.Kill(nil)
  316. return nil
  317. }
  318. }
  319. }
  320. func (a *apic) SendMetrics() error {
  321. defer types.CatchPanic("lapi/metricsToAPIC")
  322. log.Infof("start crowdsec api send metrics (interval: %s)", MetricsInterval)
  323. ticker := time.NewTicker(a.metricsInterval)
  324. for {
  325. select {
  326. case <-ticker.C:
  327. version := cwversion.VersionStr()
  328. metric := &models.Metrics{
  329. ApilVersion: &version,
  330. Machines: make([]*models.MetricsSoftInfo, 0),
  331. Bouncers: make([]*models.MetricsSoftInfo, 0),
  332. }
  333. machines, err := a.dbClient.ListMachines()
  334. if err != nil {
  335. return err
  336. }
  337. bouncers, err := a.dbClient.ListBouncers()
  338. if err != nil {
  339. return err
  340. }
  341. // models.metric structure : len(machines), len(bouncers), a.credentials.Login
  342. // _, _, err := a.apiClient.Metrics.Add(//*models.Metrics)
  343. for _, machine := range machines {
  344. m := &models.MetricsSoftInfo{
  345. Version: machine.Version,
  346. Name: machine.MachineId,
  347. }
  348. metric.Machines = append(metric.Machines, m)
  349. }
  350. for _, bouncer := range bouncers {
  351. m := &models.MetricsSoftInfo{
  352. Version: bouncer.Version,
  353. Name: bouncer.Type,
  354. }
  355. metric.Bouncers = append(metric.Bouncers, m)
  356. }
  357. _, _, err = a.apiClient.Metrics.Add(context.Background(), metric)
  358. if err != nil {
  359. return errors.Wrap(err, "sending metrics failed")
  360. }
  361. log.Infof("capi metrics: metrics sent successfully")
  362. case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
  363. a.pullTomb.Kill(nil)
  364. a.pushTomb.Kill(nil)
  365. return nil
  366. }
  367. }
  368. }
  369. func (a *apic) Shutdown() {
  370. a.pushTomb.Kill(nil)
  371. a.pullTomb.Kill(nil)
  372. a.metricsTomb.Kill(nil)
  373. }