Browse Source

Add `cscli papi status` and `cscli papi sync` (#2091)

blotus 2 years ago
parent
commit
85ab9c68a2

+ 5 - 2
cmd/crowdsec-cli/capi.go

@@ -10,6 +10,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/fflag"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/go-openapi/strfmt"
@@ -19,7 +20,7 @@ import (
 	"gopkg.in/yaml.v2"
 )
 
-const CAPIBaseURL string = "https://api.crowdsec.net/"
+const CAPIBaseURL string = "https://api.dev.crowdsec.net/"
 const CAPIURLPrefix = "v3"
 
 func NewCapiCmd() *cobra.Command {
@@ -92,7 +93,9 @@ func NewCapiRegisterCmd() *cobra.Command {
 				Login:    capiUser,
 				Password: password.String(),
 				URL:      types.CAPIBaseURL,
-				PapiURL:  types.PAPIBaseURL,
+			}
+			if fflag.PapiClient.IsEnabled() {
+				apiCfg.PapiURL = types.PAPIBaseURL
 			}
 			apiConfigDump, err := yaml.Marshal(apiCfg)
 			if err != nil {

+ 12 - 8
cmd/crowdsec-cli/main.go

@@ -159,16 +159,16 @@ It is meant to allow you to manage bans, parsers/scenarios/etc, api and generall
 	}
 
 	cc.Init(&cc.Config{
-		RootCmd: rootCmd,
-		Headings: cc.Yellow,
-		Commands: cc.Green + cc.Bold,
+		RootCmd:       rootCmd,
+		Headings:      cc.Yellow,
+		Commands:      cc.Green + cc.Bold,
 		CmdShortDescr: cc.Cyan,
-		Example: cc.Italic,
-		ExecName: cc.Bold,
-		Aliases: cc.Bold + cc.Italic,
+		Example:       cc.Italic,
+		ExecName:      cc.Bold,
+		Aliases:       cc.Bold + cc.Italic,
 		FlagsDataType: cc.White,
-		Flags: cc.Green,
-		FlagsDescr: cc.Cyan,
+		Flags:         cc.Green,
+		FlagsDescr:    cc.Cyan,
 	})
 	rootCmd.SetOut(color.Output)
 
@@ -247,6 +247,10 @@ It is meant to allow you to manage bans, parsers/scenarios/etc, api and generall
 		rootCmd.AddCommand(NewSetupCmd())
 	}
 
+	if fflag.PapiClient.IsEnabled() {
+		rootCmd.AddCommand(NewPapiCmd())
+	}
+
 	if err := rootCmd.Execute(); err != nil {
 		if bincoverTesting != "" {
 			log.Debug("coverage report is enabled")

+ 136 - 0
cmd/crowdsec-cli/papi.go

@@ -0,0 +1,136 @@
+package main
+
+import (
+	"time"
+
+	"github.com/crowdsecurity/crowdsec/pkg/apiserver"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/cobra"
+	"gopkg.in/tomb.v2"
+)
+
+func NewPapiCmd() *cobra.Command {
+	var cmdLapi = &cobra.Command{
+		Use:               "papi [action]",
+		Short:             "Manage interaction with Polling API (PAPI)",
+		Args:              cobra.MinimumNArgs(1),
+		DisableAutoGenTag: true,
+		PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
+			if err := csConfig.LoadAPIServer(); err != nil || csConfig.DisableAPI {
+				return errors.Wrap(err, "Local API is disabled, please run this command on the local API machine")
+			}
+			if csConfig.API.Server.OnlineClient == nil {
+				log.Fatalf("no configuration for Central API in '%s'", *csConfig.FilePath)
+			}
+			if csConfig.API.Server.OnlineClient.Credentials.PapiURL == "" {
+				log.Fatalf("no PAPI URL in configuration")
+			}
+			return nil
+		},
+	}
+
+	cmdLapi.AddCommand(NewPapiStatusCmd())
+	cmdLapi.AddCommand(NewPapiSyncCmd())
+
+	return cmdLapi
+}
+
+func NewPapiStatusCmd() *cobra.Command {
+	cmdCapiStatus := &cobra.Command{
+		Use:               "status",
+		Short:             "Get status of the Polling API",
+		Args:              cobra.MinimumNArgs(0),
+		DisableAutoGenTag: true,
+		Run: func(cmd *cobra.Command, args []string) {
+			var err error
+			dbClient, err = database.NewClient(csConfig.DbConfig)
+			if err != nil {
+				log.Fatalf("unable to initialize database client : %s", err)
+			}
+
+			apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig)
+
+			if err != nil {
+				log.Fatalf("unable to initialize API client : %s", err)
+			}
+
+			papi, err := apiserver.NewPAPI(apic, dbClient, csConfig.API.Server.ConsoleConfig, log.GetLevel())
+
+			if err != nil {
+				log.Fatalf("unable to initialize PAPI client : %s", err)
+			}
+
+			perms, err := papi.GetPermissions()
+
+			if err != nil {
+				log.Fatalf("unable to get PAPI permissions: %s", err)
+			}
+			var lastTimestampStr *string
+			lastTimestampStr, err = dbClient.GetConfigItem(apiserver.PapiPullKey)
+			if err != nil {
+				lastTimestampStr = types.StrPtr("never")
+			}
+			log.Infof("You can successfully interact with Polling API (PAPI)")
+			log.Infof("Console plan: %s", perms.Plan)
+			log.Infof("Last order received: %s", *lastTimestampStr)
+
+			log.Infof("PAPI subscriptions:")
+			for _, sub := range perms.Categories {
+				log.Infof(" - %s", sub)
+			}
+		},
+	}
+
+	return cmdCapiStatus
+}
+
+func NewPapiSyncCmd() *cobra.Command {
+	cmdCapiSync := &cobra.Command{
+		Use:               "sync",
+		Short:             "Sync with the Polling API, pulling all non-expired orders for the instance",
+		Args:              cobra.MinimumNArgs(0),
+		DisableAutoGenTag: true,
+		Run: func(cmd *cobra.Command, args []string) {
+			var err error
+			t := tomb.Tomb{}
+			dbClient, err = database.NewClient(csConfig.DbConfig)
+			if err != nil {
+				log.Fatalf("unable to initialize database client : %s", err)
+			}
+
+			apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig)
+
+			if err != nil {
+				log.Fatalf("unable to initialize API client : %s", err)
+			}
+
+			t.Go(apic.Push)
+
+			papi, err := apiserver.NewPAPI(apic, dbClient, csConfig.API.Server.ConsoleConfig, log.GetLevel())
+
+			if err != nil {
+				log.Fatalf("unable to initialize PAPI client : %s", err)
+			}
+			t.Go(papi.SyncDecisions)
+
+			err = papi.PullOnce(time.Time{})
+
+			if err != nil {
+				log.Fatalf("unable to sync decisions: %s", err)
+			}
+
+			log.Infof("Sending acknowledgements to CAPI")
+
+			apic.Shutdown()
+			papi.Shutdown()
+			t.Wait()
+			time.Sleep(5 * time.Second) //FIXME: the push done by apic.Push is run inside a sub goroutine, sleep to make sure it's done
+
+		},
+	}
+
+	return cmdCapiSync
+}

+ 111 - 27
pkg/apiserver/papi.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"net/http"
 	"sync"
 	"time"
 
@@ -72,6 +73,16 @@ type Papi struct {
 	Logger        *log.Entry
 }
 
+type PapiPermCheckError struct {
+	Error string `json:"error"`
+}
+
+type PapiPermCheckSuccess struct {
+	Status     string   `json:"status"`
+	Plan       string   `json:"plan"`
+	Categories []string `json:"categories"`
+}
+
 func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, logLevel log.Level) (*Papi, error) {
 
 	logger := logrus.New()
@@ -80,8 +91,10 @@ func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.Cons
 	}
 	logger.SetLevel(logLevel)
 
+	papiUrl := *apic.apiClient.PapiURL
+	papiUrl.Path = fmt.Sprintf("%s%s", types.PAPIVersion, types.PAPIPollUrl)
 	longPollClient, err := longpollclient.NewLongPollClient(longpollclient.LongPollClientConfig{
-		Url:        *apic.apiClient.PapiURL,
+		Url:        papiUrl,
 		Logger:     logger,
 		HttpClient: apic.apiClient.GetClient(),
 	})
@@ -112,6 +125,94 @@ func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.Cons
 	return papi, nil
 }
 
+func (p *Papi) handleEvent(event longpollclient.Event) error {
+	logger := p.Logger.WithField("request-id", event.RequestId)
+	logger.Debugf("message received: %+v", event.Data)
+	message := &Message{}
+	if err := json.Unmarshal([]byte(event.Data), message); err != nil {
+		return fmt.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err)
+	}
+	if message.Header == nil {
+		return fmt.Errorf("no header in message, skipping")
+	}
+	if message.Header.Source == nil {
+		return fmt.Errorf("no source user in header message, skipping")
+	}
+
+	if operationFunc, ok := operationMap[message.Header.OperationType]; ok {
+		logger.Debugf("Calling operation '%s'", message.Header.OperationType)
+		err := operationFunc(message, p)
+		if err != nil {
+			return fmt.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err)
+		}
+	} else {
+		return fmt.Errorf("operation '%s' unknown, continue", message.Header.OperationType)
+	}
+	return nil
+}
+
+func (p *Papi) GetPermissions() (PapiPermCheckSuccess, error) {
+	httpClient := p.apiClient.GetClient()
+	papiCheckUrl := fmt.Sprintf("%s%s%s", p.URL, types.PAPIVersion, types.PAPIPermissionsUrl)
+	req, err := http.NewRequest(http.MethodGet, papiCheckUrl, nil)
+	if err != nil {
+		return PapiPermCheckSuccess{}, fmt.Errorf("failed to create request : %s", err)
+	}
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		log.Fatalf("failed to get response : %s", err)
+	}
+
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		errResp := PapiPermCheckError{}
+		err = json.NewDecoder(resp.Body).Decode(&errResp)
+		if err != nil {
+			return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
+		}
+		return PapiPermCheckSuccess{}, fmt.Errorf("unable to query PAPI : %s (%d)", errResp.Error, resp.StatusCode)
+	}
+	respBody := PapiPermCheckSuccess{}
+	err = json.NewDecoder(resp.Body).Decode(&respBody)
+	if err != nil {
+		return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err)
+	}
+	return respBody, nil
+}
+
+func reverse(s []longpollclient.Event) []longpollclient.Event {
+	a := make([]longpollclient.Event, len(s))
+	copy(a, s)
+
+	for i := len(a)/2 - 1; i >= 0; i-- {
+		opp := len(a) - 1 - i
+		a[i], a[opp] = a[opp], a[i]
+	}
+
+	return a
+}
+
+func (p *Papi) PullOnce(since time.Time) error {
+	events, err := p.Client.PullOnce(since)
+	if err != nil {
+		return err
+	}
+
+	reversedEvents := reverse(events) //PAPI sends events in the reverse order, which is not an issue when pulling them in real time, but here we need the correct order
+	eventsCount := len(events)
+	p.Logger.Infof("received %d events", eventsCount)
+	for i, event := range reversedEvents {
+		if err := p.handleEvent(event); err != nil {
+			p.Logger.WithField("request-id", event.RequestId).Errorf("failed to handle event: %s", err)
+		}
+		p.Logger.Debugf("handled event %d/%d", i, eventsCount)
+	}
+	p.Logger.Debugf("finished handling events")
+	//Don't update the timestamp in DB, as a "real" LAPI might be running
+	//Worst case, crowdsec will receive a few duplicated events and will discard them
+	return nil
+}
+
 // PullPAPI is the long polling client for real-time decisions from PAPI
 func (p *Papi) Pull() error {
 
@@ -149,33 +250,10 @@ func (p *Papi) Pull() error {
 		if err != nil {
 			return errors.Wrap(err, "failed to marshal last timestamp")
 		}
-		logger.Debugf("message received: %+v", event.Data)
-		message := &Message{}
-		if err := json.Unmarshal([]byte(event.Data), message); err != nil {
-			logger.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err)
-			// do we want to continue or exit ?
-			continue
-		}
-
-		if message.Header == nil {
-			logger.Errorf("no header in message, skipping")
-			continue
-		}
-
-		if message.Header.Source == nil {
-			logger.Errorf("no source user in header message, skipping")
-			continue
-		}
 
-		if operationFunc, ok := operationMap[message.Header.OperationType]; ok {
-			logger.Debugf("Calling operation '%s'", message.Header.OperationType)
-			err := operationFunc(message, p)
-			if err != nil {
-				logger.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err)
-				continue
-			}
-		} else {
-			logger.Errorf("operation '%s' unknown, continue", message.Header.OperationType)
+		err = p.handleEvent(event)
+		if err != nil {
+			logger.Errorf("failed to handle event: %s", err)
 			continue
 		}
 
@@ -261,3 +339,9 @@ func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) {
 		pageEnd += bulkSize
 	}
 }
+
+func (p *Papi) Shutdown() {
+	p.Logger.Infof("Shutting down PAPI")
+	p.syncTomb.Kill(nil)
+	p.Client.Stop()
+}

+ 52 - 6
pkg/longpollclient/client.go

@@ -19,6 +19,7 @@ type LongPollClient struct {
 	url        url.URL
 	logger     *log.Entry
 	since      int64
+	timeout    string
 	httpClient *http.Client
 }
 
@@ -48,13 +49,11 @@ var errUnauthorized = fmt.Errorf("user is not authorized to use PAPI")
 
 const timeoutMessage = "no events before timeout"
 
-func (c *LongPollClient) doQuery() error {
-
+func (c *LongPollClient) doQuery() (*http.Response, error) {
 	logger := c.logger.WithField("method", "doQuery")
-
 	query := c.url.Query()
 	query.Set("since_time", fmt.Sprintf("%d", c.since))
-	query.Set("timeout", "45")
+	query.Set("timeout", c.timeout)
 	c.url.RawQuery = query.Encode()
 
 	logger.Debugf("Query parameters: %s", c.url.RawQuery)
@@ -62,15 +61,29 @@ func (c *LongPollClient) doQuery() error {
 	req, err := http.NewRequest(http.MethodGet, c.url.String(), nil)
 	if err != nil {
 		logger.Errorf("failed to create request: %s", err)
-		return err
+		return nil, err
 	}
 	req.Header.Set("Accept", "application/json")
 	resp, err := c.httpClient.Do(req)
 	if err != nil {
 		logger.Errorf("failed to execute request: %s", err)
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (c *LongPollClient) poll() error {
+
+	logger := c.logger.WithField("method", "poll")
+
+	resp, err := c.doQuery()
+
+	if err != nil {
 		return err
 	}
+
 	defer resp.Body.Close()
+
 	requestId := resp.Header.Get("X-Amzn-Trace-Id")
 	logger = logger.WithField("request-id", requestId)
 	if resp.StatusCode != http.StatusOK {
@@ -142,7 +155,7 @@ func (c *LongPollClient) pollEvents() error {
 			return nil
 		default:
 			c.logger.Debug("Polling PAPI")
-			err := c.doQuery()
+			err := c.poll()
 			if err != nil {
 				c.logger.Errorf("failed to poll: %s", err)
 				if err == errUnauthorized {
@@ -160,6 +173,7 @@ func (c *LongPollClient) Start(since time.Time) chan Event {
 	c.logger.Infof("starting polling client")
 	c.c = make(chan Event)
 	c.since = since.Unix() * 1000
+	c.timeout = "45"
 	c.t.Go(c.pollEvents)
 	return c.c
 }
@@ -169,6 +183,38 @@ func (c *LongPollClient) Stop() error {
 	return nil
 }
 
+func (c *LongPollClient) PullOnce(since time.Time) ([]Event, error) {
+	c.logger.Debug("Pulling PAPI once")
+	c.since = since.Unix() * 1000
+	c.timeout = "1"
+	resp, err := c.doQuery()
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	decoder := json.NewDecoder(resp.Body)
+	var pollResp pollResponse
+	err = decoder.Decode(&pollResp)
+	if err != nil {
+		if err == io.EOF {
+			c.logger.Debugf("server closed connection")
+			return nil, nil
+		}
+		return nil, fmt.Errorf("error decoding poll response: %v", err)
+	}
+
+	c.logger.Tracef("got response: %+v", pollResp)
+
+	if len(pollResp.ErrorMessage) > 0 {
+		if pollResp.ErrorMessage == timeoutMessage {
+			c.logger.Debugf("got timeout message")
+			return nil, nil
+		}
+		return nil, fmt.Errorf("longpoll API error message: %s", pollResp.ErrorMessage)
+	}
+	return pollResp.Events, nil
+}
+
 func NewLongPollClient(config LongPollClientConfig) (*LongPollClient, error) {
 	var logger *log.Entry
 	if config.Url == (url.URL{}) {

+ 5 - 1
pkg/types/constants.go

@@ -4,7 +4,11 @@ const ApiKeyAuthType = "api-key"
 const TlsAuthType = "tls"
 const PasswordAuthType = "password"
 
-const PAPIBaseURL = "https://papi.api.crowdsec.net/v1/decisions/stream/poll"
+const PAPIBaseURL = "https://papi.api.crowdsec.net/"
+const PAPIVersion = "v1"
+const PAPIPollUrl = "/decisions/stream/poll"
+const PAPIPermissionsUrl = "/permissions"
+
 const CAPIBaseURL = "https://api.crowdsec.net/"
 
 const CscliOrigin = "cscli"