// papi.go (10 KB)

  1. package apiserver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "sync"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  10. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  11. "github.com/crowdsecurity/crowdsec/pkg/database"
  12. "github.com/crowdsecurity/crowdsec/pkg/longpollclient"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/pkg/errors"
  16. "github.com/sirupsen/logrus"
  17. log "github.com/sirupsen/logrus"
  18. "gopkg.in/tomb.v2"
  19. )
  20. var (
  21. SyncInterval = time.Second * 10
  22. )
  23. const (
  24. PapiPullKey = "papi:last_pull"
  25. )
  26. var (
  27. operationMap = map[string]func(*Message, *Papi) error{
  28. "decision": DecisionCmd,
  29. "alert": AlertCmd,
  30. }
  31. )
  32. type Header struct {
  33. OperationType string `json:"operation_type"`
  34. OperationCmd string `json:"operation_cmd"`
  35. Timestamp time.Time `json:"timestamp"`
  36. Message string `json:"message"`
  37. UUID string `json:"uuid"`
  38. Source *Source `json:"source"`
  39. Destination string `json:"destination"`
  40. }
  // Source is the "source" object of a message Header; it carries the name of
  // the user found under the "user" JSON key.
  type Source struct {
  	User string `json:"user"`
  }
  44. type Message struct {
  45. Header *Header
  46. Data interface{} `json:"data"`
  47. }
  48. type OperationChannels struct {
  49. AddAlertChannel chan []*models.Alert
  50. DeleteDecisionChannel chan []*models.Decision
  51. }
  52. type Papi struct {
  53. URL string
  54. Client *longpollclient.LongPollClient
  55. DBClient *database.Client
  56. apiClient *apiclient.ApiClient
  57. Channels *OperationChannels
  58. mu sync.Mutex
  59. pullTomb tomb.Tomb
  60. syncTomb tomb.Tomb
  61. SyncInterval time.Duration
  62. consoleConfig *csconfig.ConsoleConfig
  63. Logger *log.Entry
  64. }
  // PapiPermCheckError is the JSON body GetPermissions decodes when the
  // permissions endpoint answers with a non-200 status.
  type PapiPermCheckError struct {
  	Error string `json:"error"`
  }
  // PapiPermCheckSuccess is the JSON body GetPermissions decodes from a 200
  // response of the permissions endpoint.
  type PapiPermCheckSuccess struct {
  	Status     string   `json:"status"`
  	Plan       string   `json:"plan"`
  	Categories []string `json:"categories"`
  }
  73. func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, logLevel log.Level) (*Papi, error) {
  74. logger := logrus.New()
  75. if err := types.ConfigureLogger(logger); err != nil {
  76. return &Papi{}, fmt.Errorf("creating papi logger: %s", err)
  77. }
  78. logger.SetLevel(logLevel)
  79. papiUrl := *apic.apiClient.PapiURL
  80. papiUrl.Path = fmt.Sprintf("%s%s", types.PAPIVersion, types.PAPIPollUrl)
  81. longPollClient, err := longpollclient.NewLongPollClient(longpollclient.LongPollClientConfig{
  82. Url: papiUrl,
  83. Logger: logger,
  84. HttpClient: apic.apiClient.GetClient(),
  85. })
  86. if err != nil {
  87. return &Papi{}, errors.Wrap(err, "failed to create PAPI client")
  88. }
  89. channels := &OperationChannels{
  90. AddAlertChannel: apic.AlertsAddChan,
  91. DeleteDecisionChannel: make(chan []*models.Decision),
  92. }
  93. papi := &Papi{
  94. URL: apic.apiClient.PapiURL.String(),
  95. Client: longPollClient,
  96. DBClient: dbClient,
  97. Channels: channels,
  98. SyncInterval: SyncInterval,
  99. mu: sync.Mutex{},
  100. pullTomb: tomb.Tomb{},
  101. syncTomb: tomb.Tomb{},
  102. apiClient: apic.apiClient,
  103. consoleConfig: consoleConfig,
  104. Logger: logger.WithFields(log.Fields{"interval": SyncInterval.Seconds(), "source": "papi"}),
  105. }
  106. return papi, nil
  107. }
  108. func (p *Papi) handleEvent(event longpollclient.Event) error {
  109. logger := p.Logger.WithField("request-id", event.RequestId)
  110. logger.Debugf("message received: %+v", event.Data)
  111. message := &Message{}
  112. if err := json.Unmarshal([]byte(event.Data), message); err != nil {
  113. return fmt.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err)
  114. }
  115. if message.Header == nil {
  116. return fmt.Errorf("no header in message, skipping")
  117. }
  118. if message.Header.Source == nil {
  119. return fmt.Errorf("no source user in header message, skipping")
  120. }
  121. if operationFunc, ok := operationMap[message.Header.OperationType]; ok {
  122. logger.Debugf("Calling operation '%s'", message.Header.OperationType)
  123. err := operationFunc(message, p)
  124. if err != nil {
  125. return fmt.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err)
  126. }
  127. } else {
  128. return fmt.Errorf("operation '%s' unknown, continue", message.Header.OperationType)
  129. }
  130. return nil
  131. }
  132. func (p *Papi) GetPermissions() (PapiPermCheckSuccess, error) {
  133. httpClient := p.apiClient.GetClient()
  134. papiCheckUrl := fmt.Sprintf("%s%s%s", p.URL, types.PAPIVersion, types.PAPIPermissionsUrl)
  135. req, err := http.NewRequest(http.MethodGet, papiCheckUrl, nil)
  136. if err != nil {
  137. return PapiPermCheckSuccess{}, fmt.Errorf("failed to create request : %s", err)
  138. }
  139. resp, err := httpClient.Do(req)
  140. if err != nil {
  141. log.Fatalf("failed to get response : %s", err)
  142. }
  143. defer resp.Body.Close()
  144. if resp.StatusCode != http.StatusOK {
  145. errResp := PapiPermCheckError{}
  146. err = json.NewDecoder(resp.Body).Decode(&errResp)
  147. if err != nil {
  148. return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
  149. }
  150. return PapiPermCheckSuccess{}, fmt.Errorf("unable to query PAPI : %s (%d)", errResp.Error, resp.StatusCode)
  151. }
  152. respBody := PapiPermCheckSuccess{}
  153. err = json.NewDecoder(resp.Body).Decode(&respBody)
  154. if err != nil {
  155. return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
  156. }
  157. return respBody, nil
  158. }
  159. func reverse(s []longpollclient.Event) []longpollclient.Event {
  160. a := make([]longpollclient.Event, len(s))
  161. copy(a, s)
  162. for i := len(a)/2 - 1; i >= 0; i-- {
  163. opp := len(a) - 1 - i
  164. a[i], a[opp] = a[opp], a[i]
  165. }
  166. return a
  167. }
  168. func (p *Papi) PullOnce(since time.Time) error {
  169. events, err := p.Client.PullOnce(since)
  170. if err != nil {
  171. return err
  172. }
  173. reversedEvents := reverse(events) //PAPI sends events in the reverse order, which is not an issue when pulling them in real time, but here we need the correct order
  174. eventsCount := len(events)
  175. p.Logger.Infof("received %d events", eventsCount)
  176. for i, event := range reversedEvents {
  177. if err := p.handleEvent(event); err != nil {
  178. p.Logger.WithField("request-id", event.RequestId).Errorf("failed to handle event: %s", err)
  179. }
  180. p.Logger.Debugf("handled event %d/%d", i, eventsCount)
  181. }
  182. p.Logger.Debugf("finished handling events")
  183. //Don't update the timestamp in DB, as a "real" LAPI might be running
  184. //Worst case, crowdsec will receive a few duplicated events and will discard them
  185. return nil
  186. }
  187. // PullPAPI is the long polling client for real-time decisions from PAPI
  188. func (p *Papi) Pull() error {
  189. defer types.CatchPanic("lapi/PullPAPI")
  190. p.Logger.Infof("Starting Polling API Pull")
  191. lastTimestamp := time.Time{}
  192. lastTimestampStr, err := p.DBClient.GetConfigItem(PapiPullKey)
  193. if err != nil {
  194. p.Logger.Warningf("failed to get last timestamp for papi pull: %s", err)
  195. }
  196. //value doesn't exist, it's first time we're pulling
  197. if lastTimestampStr == nil {
  198. binTime, err := lastTimestamp.MarshalText()
  199. if err != nil {
  200. return errors.Wrap(err, "failed to marshal last timestamp")
  201. }
  202. if err := p.DBClient.SetConfigItem(PapiPullKey, string(binTime)); err != nil {
  203. p.Logger.Errorf("error setting papi pull last key: %s", err)
  204. } else {
  205. p.Logger.Debugf("config item '%s' set in database with value '%s'", PapiPullKey, string(binTime))
  206. }
  207. } else {
  208. if err := lastTimestamp.UnmarshalText([]byte(*lastTimestampStr)); err != nil {
  209. return errors.Wrap(err, "failed to unmarshal last timestamp")
  210. }
  211. }
  212. p.Logger.Infof("Starting PAPI pull (since:%s)", lastTimestamp)
  213. for event := range p.Client.Start(lastTimestamp) {
  214. logger := p.Logger.WithField("request-id", event.RequestId)
  215. //update last timestamp in database
  216. newTime := time.Now().UTC()
  217. binTime, err := newTime.MarshalText()
  218. if err != nil {
  219. return errors.Wrap(err, "failed to marshal last timestamp")
  220. }
  221. err = p.handleEvent(event)
  222. if err != nil {
  223. logger.Errorf("failed to handle event: %s", err)
  224. continue
  225. }
  226. if err := p.DBClient.SetConfigItem(PapiPullKey, string(binTime)); err != nil {
  227. return errors.Wrap(err, "failed to update last timestamp")
  228. } else {
  229. logger.Debugf("set last timestamp to %s", newTime)
  230. }
  231. }
  232. return nil
  233. }
  234. func (p *Papi) SyncDecisions() error {
  235. defer types.CatchPanic("lapi/syncDecisionsToCAPI")
  236. var cache models.DecisionsDeleteRequest
  237. ticker := time.NewTicker(p.SyncInterval)
  238. p.Logger.Infof("Start decisions sync to CrowdSec Central API (interval: %s)", p.SyncInterval)
  239. for {
  240. select {
  241. case <-p.syncTomb.Dying(): // if one apic routine is dying, do we kill the others?
  242. p.Logger.Infof("sync decisions tomb is dying, sending cache (%d elements) before exiting", len(cache))
  243. if len(cache) == 0 {
  244. return nil
  245. }
  246. go p.SendDeletedDecisions(&cache)
  247. return nil
  248. case <-ticker.C:
  249. if len(cache) > 0 {
  250. p.mu.Lock()
  251. cacheCopy := cache
  252. cache = make([]models.DecisionsDeleteRequestItem, 0)
  253. p.mu.Unlock()
  254. p.Logger.Infof("sync decisions: %d deleted decisions to push", len(cacheCopy))
  255. go p.SendDeletedDecisions(&cacheCopy)
  256. }
  257. case deletedDecisions := <-p.Channels.DeleteDecisionChannel:
  258. if (p.consoleConfig.ShareManualDecisions != nil && *p.consoleConfig.ShareManualDecisions) || (p.consoleConfig.ReceiveDecisions != nil && *p.consoleConfig.ReceiveDecisions) {
  259. var tmpDecisions []models.DecisionsDeleteRequestItem
  260. p.Logger.Debugf("%d decisions deletion to add in cache", len(deletedDecisions))
  261. for _, decision := range deletedDecisions {
  262. tmpDecisions = append(tmpDecisions, models.DecisionsDeleteRequestItem(decision.UUID))
  263. }
  264. p.mu.Lock()
  265. cache = append(cache, tmpDecisions...)
  266. p.mu.Unlock()
  267. }
  268. }
  269. }
  270. }
  271. func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) {
  272. var cache []models.DecisionsDeleteRequestItem = *cacheOrig
  273. var send models.DecisionsDeleteRequest
  274. bulkSize := 50
  275. pageStart := 0
  276. pageEnd := bulkSize
  277. for {
  278. if pageEnd >= len(cache) {
  279. send = cache[pageStart:]
  280. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  281. defer cancel()
  282. _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
  283. if err != nil {
  284. p.Logger.Errorf("sending deleted decisions to central API: %s", err)
  285. return
  286. }
  287. break
  288. }
  289. send = cache[pageStart:pageEnd]
  290. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  291. defer cancel()
  292. _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
  293. if err != nil {
  294. //we log it here as well, because the return value of func might be discarded
  295. p.Logger.Errorf("sending deleted decisions to central API: %s", err)
  296. }
  297. pageStart += bulkSize
  298. pageEnd += bulkSize
  299. }
  300. }
  301. func (p *Papi) Shutdown() {
  302. p.Logger.Infof("Shutting down PAPI")
  303. p.syncTomb.Kill(nil)
  304. p.Client.Stop()
  305. }