papi.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package apiserver
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "sync"
  8. "time"
  9. "github.com/crowdsecurity/crowdsec/pkg/apiclient"
  10. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  11. "github.com/crowdsecurity/crowdsec/pkg/database"
  12. "github.com/crowdsecurity/crowdsec/pkg/longpollclient"
  13. "github.com/crowdsecurity/crowdsec/pkg/models"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/pkg/errors"
  16. log "github.com/sirupsen/logrus"
  17. "gopkg.in/tomb.v2"
  18. )
  19. var (
  20. SyncInterval = time.Second * 10
  21. )
  22. const (
  23. PapiPullKey = "papi:last_pull"
  24. )
  25. var (
  26. operationMap = map[string]func(*Message, *Papi, bool) error{
  27. "decision": DecisionCmd,
  28. "alert": AlertCmd,
  29. "management": ManagementCmd,
  30. }
  31. )
  32. type Header struct {
  33. OperationType string `json:"operation_type"`
  34. OperationCmd string `json:"operation_cmd"`
  35. Timestamp time.Time `json:"timestamp"`
  36. Message string `json:"message"`
  37. UUID string `json:"uuid"`
  38. Source *Source `json:"source"`
  39. Destination string `json:"destination"`
  40. }
  41. type Source struct {
  42. User string `json:"user"`
  43. }
  44. type Message struct {
  45. Header *Header
  46. Data interface{} `json:"data"`
  47. }
  48. type OperationChannels struct {
  49. AddAlertChannel chan []*models.Alert
  50. DeleteDecisionChannel chan []*models.Decision
  51. }
  52. type Papi struct {
  53. URL string
  54. Client *longpollclient.LongPollClient
  55. DBClient *database.Client
  56. apiClient *apiclient.ApiClient
  57. Channels *OperationChannels
  58. mu sync.Mutex
  59. pullTomb tomb.Tomb
  60. syncTomb tomb.Tomb
  61. SyncInterval time.Duration
  62. consoleConfig *csconfig.ConsoleConfig
  63. Logger *log.Entry
  64. apic *apic
  65. }
  66. type PapiPermCheckError struct {
  67. Error string `json:"error"`
  68. }
  69. type PapiPermCheckSuccess struct {
  70. Status string `json:"status"`
  71. Plan string `json:"plan"`
  72. Categories []string `json:"categories"`
  73. }
  74. func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, logLevel log.Level) (*Papi, error) {
  75. logger := log.New()
  76. if err := types.ConfigureLogger(logger); err != nil {
  77. return &Papi{}, fmt.Errorf("creating papi logger: %s", err)
  78. }
  79. logger.SetLevel(logLevel)
  80. papiUrl := *apic.apiClient.PapiURL
  81. papiUrl.Path = fmt.Sprintf("%s%s", types.PAPIVersion, types.PAPIPollUrl)
  82. longPollClient, err := longpollclient.NewLongPollClient(longpollclient.LongPollClientConfig{
  83. Url: papiUrl,
  84. Logger: logger,
  85. HttpClient: apic.apiClient.GetClient(),
  86. })
  87. if err != nil {
  88. return &Papi{}, errors.Wrap(err, "failed to create PAPI client")
  89. }
  90. channels := &OperationChannels{
  91. AddAlertChannel: apic.AlertsAddChan,
  92. DeleteDecisionChannel: make(chan []*models.Decision),
  93. }
  94. papi := &Papi{
  95. URL: apic.apiClient.PapiURL.String(),
  96. Client: longPollClient,
  97. DBClient: dbClient,
  98. Channels: channels,
  99. SyncInterval: SyncInterval,
  100. mu: sync.Mutex{},
  101. pullTomb: tomb.Tomb{},
  102. syncTomb: tomb.Tomb{},
  103. apiClient: apic.apiClient,
  104. apic: apic,
  105. consoleConfig: consoleConfig,
  106. Logger: logger.WithFields(log.Fields{"interval": SyncInterval.Seconds(), "source": "papi"}),
  107. }
  108. return papi, nil
  109. }
  110. func (p *Papi) handleEvent(event longpollclient.Event, sync bool) error {
  111. logger := p.Logger.WithField("request-id", event.RequestId)
  112. logger.Debugf("message received: %+v", event.Data)
  113. message := &Message{}
  114. if err := json.Unmarshal([]byte(event.Data), message); err != nil {
  115. return fmt.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err)
  116. }
  117. if message.Header == nil {
  118. return fmt.Errorf("no header in message, skipping")
  119. }
  120. if message.Header.Source == nil {
  121. return fmt.Errorf("no source user in header message, skipping")
  122. }
  123. if operationFunc, ok := operationMap[message.Header.OperationType]; ok {
  124. logger.Debugf("Calling operation '%s'", message.Header.OperationType)
  125. err := operationFunc(message, p, sync)
  126. if err != nil {
  127. return fmt.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err)
  128. }
  129. } else {
  130. return fmt.Errorf("operation '%s' unknown, continue", message.Header.OperationType)
  131. }
  132. return nil
  133. }
  134. func (p *Papi) GetPermissions() (PapiPermCheckSuccess, error) {
  135. httpClient := p.apiClient.GetClient()
  136. papiCheckUrl := fmt.Sprintf("%s%s%s", p.URL, types.PAPIVersion, types.PAPIPermissionsUrl)
  137. req, err := http.NewRequest(http.MethodGet, papiCheckUrl, nil)
  138. if err != nil {
  139. return PapiPermCheckSuccess{}, fmt.Errorf("failed to create request : %s", err)
  140. }
  141. resp, err := httpClient.Do(req)
  142. if err != nil {
  143. log.Fatalf("failed to get response : %s", err)
  144. }
  145. defer resp.Body.Close()
  146. if resp.StatusCode != http.StatusOK {
  147. errResp := PapiPermCheckError{}
  148. err = json.NewDecoder(resp.Body).Decode(&errResp)
  149. if err != nil {
  150. return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
  151. }
  152. return PapiPermCheckSuccess{}, fmt.Errorf("unable to query PAPI : %s (%d)", errResp.Error, resp.StatusCode)
  153. }
  154. respBody := PapiPermCheckSuccess{}
  155. err = json.NewDecoder(resp.Body).Decode(&respBody)
  156. if err != nil {
  157. return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
  158. }
  159. return respBody, nil
  160. }
  161. func reverse(s []longpollclient.Event) []longpollclient.Event {
  162. a := make([]longpollclient.Event, len(s))
  163. copy(a, s)
  164. for i := len(a)/2 - 1; i >= 0; i-- {
  165. opp := len(a) - 1 - i
  166. a[i], a[opp] = a[opp], a[i]
  167. }
  168. return a
  169. }
  170. func (p *Papi) PullOnce(since time.Time, sync bool) error {
  171. events, err := p.Client.PullOnce(since)
  172. if err != nil {
  173. return err
  174. }
  175. reversedEvents := reverse(events) //PAPI sends events in the reverse order, which is not an issue when pulling them in real time, but here we need the correct order
  176. eventsCount := len(events)
  177. p.Logger.Infof("received %d events", eventsCount)
  178. for i, event := range reversedEvents {
  179. if err := p.handleEvent(event, sync); err != nil {
  180. p.Logger.WithField("request-id", event.RequestId).Errorf("failed to handle event: %s", err)
  181. }
  182. p.Logger.Debugf("handled event %d/%d", i, eventsCount)
  183. }
  184. p.Logger.Debugf("finished handling events")
  185. //Don't update the timestamp in DB, as a "real" LAPI might be running
  186. //Worst case, crowdsec will receive a few duplicated events and will discard them
  187. return nil
  188. }
  189. // PullPAPI is the long polling client for real-time decisions from PAPI
  190. func (p *Papi) Pull() error {
  191. defer types.CatchPanic("lapi/PullPAPI")
  192. p.Logger.Infof("Starting Polling API Pull")
  193. lastTimestamp := time.Time{}
  194. lastTimestampStr, err := p.DBClient.GetConfigItem(PapiPullKey)
  195. if err != nil {
  196. p.Logger.Warningf("failed to get last timestamp for papi pull: %s", err)
  197. }
  198. //value doesn't exist, it's first time we're pulling
  199. if lastTimestampStr == nil {
  200. binTime, err := lastTimestamp.MarshalText()
  201. if err != nil {
  202. return errors.Wrap(err, "failed to marshal last timestamp")
  203. }
  204. if err := p.DBClient.SetConfigItem(PapiPullKey, string(binTime)); err != nil {
  205. p.Logger.Errorf("error setting papi pull last key: %s", err)
  206. } else {
  207. p.Logger.Debugf("config item '%s' set in database with value '%s'", PapiPullKey, string(binTime))
  208. }
  209. } else {
  210. if err := lastTimestamp.UnmarshalText([]byte(*lastTimestampStr)); err != nil {
  211. return errors.Wrap(err, "failed to unmarshal last timestamp")
  212. }
  213. }
  214. p.Logger.Infof("Starting PAPI pull (since:%s)", lastTimestamp)
  215. for event := range p.Client.Start(lastTimestamp) {
  216. logger := p.Logger.WithField("request-id", event.RequestId)
  217. //update last timestamp in database
  218. newTime := time.Now().UTC()
  219. binTime, err := newTime.MarshalText()
  220. if err != nil {
  221. return errors.Wrap(err, "failed to marshal last timestamp")
  222. }
  223. err = p.handleEvent(event, false)
  224. if err != nil {
  225. logger.Errorf("failed to handle event: %s", err)
  226. continue
  227. }
  228. if err := p.DBClient.SetConfigItem(PapiPullKey, string(binTime)); err != nil {
  229. return errors.Wrap(err, "failed to update last timestamp")
  230. } else {
  231. logger.Debugf("set last timestamp to %s", newTime)
  232. }
  233. }
  234. return nil
  235. }
  236. func (p *Papi) SyncDecisions() error {
  237. defer types.CatchPanic("lapi/syncDecisionsToCAPI")
  238. var cache models.DecisionsDeleteRequest
  239. ticker := time.NewTicker(p.SyncInterval)
  240. p.Logger.Infof("Start decisions sync to CrowdSec Central API (interval: %s)", p.SyncInterval)
  241. for {
  242. select {
  243. case <-p.syncTomb.Dying(): // if one apic routine is dying, do we kill the others?
  244. p.Logger.Infof("sync decisions tomb is dying, sending cache (%d elements) before exiting", len(cache))
  245. if len(cache) == 0 {
  246. return nil
  247. }
  248. go p.SendDeletedDecisions(&cache)
  249. return nil
  250. case <-ticker.C:
  251. if len(cache) > 0 {
  252. p.mu.Lock()
  253. cacheCopy := cache
  254. cache = make([]models.DecisionsDeleteRequestItem, 0)
  255. p.mu.Unlock()
  256. p.Logger.Infof("sync decisions: %d deleted decisions to push", len(cacheCopy))
  257. go p.SendDeletedDecisions(&cacheCopy)
  258. }
  259. case deletedDecisions := <-p.Channels.DeleteDecisionChannel:
  260. if (p.consoleConfig.ShareManualDecisions != nil && *p.consoleConfig.ShareManualDecisions) || (p.consoleConfig.ConsoleManagement != nil && *p.consoleConfig.ConsoleManagement) {
  261. var tmpDecisions []models.DecisionsDeleteRequestItem
  262. p.Logger.Debugf("%d decisions deletion to add in cache", len(deletedDecisions))
  263. for _, decision := range deletedDecisions {
  264. tmpDecisions = append(tmpDecisions, models.DecisionsDeleteRequestItem(decision.UUID))
  265. }
  266. p.mu.Lock()
  267. cache = append(cache, tmpDecisions...)
  268. p.mu.Unlock()
  269. }
  270. }
  271. }
  272. }
  273. func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) {
  274. var cache []models.DecisionsDeleteRequestItem = *cacheOrig
  275. var send models.DecisionsDeleteRequest
  276. bulkSize := 50
  277. pageStart := 0
  278. pageEnd := bulkSize
  279. for {
  280. if pageEnd >= len(cache) {
  281. send = cache[pageStart:]
  282. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  283. defer cancel()
  284. _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
  285. if err != nil {
  286. p.Logger.Errorf("sending deleted decisions to central API: %s", err)
  287. return
  288. }
  289. break
  290. }
  291. send = cache[pageStart:pageEnd]
  292. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  293. defer cancel()
  294. _, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
  295. if err != nil {
  296. //we log it here as well, because the return value of func might be discarded
  297. p.Logger.Errorf("sending deleted decisions to central API: %s", err)
  298. }
  299. pageStart += bulkSize
  300. pageEnd += bulkSize
  301. }
  302. }
  303. func (p *Papi) Shutdown() {
  304. p.Logger.Infof("Shutting down PAPI")
  305. p.syncTomb.Kill(nil)
  306. p.Client.Stop()
  307. }