Przeglądaj źródła

Implement reinject command to send notifications of alerts (#1638)

* implement reinject command to send notifications of alerts using a profile

Co-authored-by: sabban <15465465+sabban@users.noreply.github.com>
Manuel Sabban 2 lat temu
rodzic
commit
7d0f89df29

+ 166 - 18
cmd/crowdsec-cli/notifications.go

@@ -1,25 +1,35 @@
 package main
 
 import (
+	"context"
 	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io/fs"
+	"net/url"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
+	"time"
 
+	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/csplugin"
+	"github.com/crowdsecurity/crowdsec/pkg/csprofiles"
+	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/go-openapi/strfmt"
 	"github.com/olekukonko/tablewriter"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+	"gopkg.in/tomb.v2"
 )
 
 type NotificationsCfg struct {
 	Config   csplugin.PluginConfig  `json:"plugin_config"`
 	Profiles []*csconfig.ProfileCfg `json:"associated_profiles"`
+	ids      []uint
 }
 
 func NewNotificationsCmd() *cobra.Command {
@@ -50,8 +60,12 @@ func NewNotificationsCmd() *cobra.Command {
 		Example:           `cscli notifications list`,
 		Args:              cobra.ExactArgs(0),
 		DisableAutoGenTag: true,
-		Run: func(cmd *cobra.Command, arg []string) {
-			ncfgs := getNotificationsConfiguration()
+		RunE: func(cmd *cobra.Command, arg []string) error {
+			ncfgs, err := getNotificationsConfiguration()
+			if err != nil {
+				return errors.Wrap(err, "Can't build profiles configuration")
+			}
+
 			if csConfig.Cscli.Output == "human" {
 				table := tablewriter.NewWriter(os.Stdout)
 				table.SetCenterSeparator("")
@@ -72,14 +86,14 @@ func NewNotificationsCmd() *cobra.Command {
 			} else if csConfig.Cscli.Output == "json" {
 				x, err := json.MarshalIndent(ncfgs, "", " ")
 				if err != nil {
-					log.Fatalf("failed to marshal notification configuration")
+					return errors.New("failed to marshal notification configuration")
 				}
 				fmt.Printf("%s", string(x))
 			} else if csConfig.Cscli.Output == "raw" {
 				csvwriter := csv.NewWriter(os.Stdout)
 				err := csvwriter.Write([]string{"Name", "Type", "Profile name"})
 				if err != nil {
-					log.Fatalf("failed to write raw header: %s", err)
+					return errors.Wrap(err, "failed to write raw header")
 				}
 				for _, b := range ncfgs {
 					profilesList := []string{}
@@ -88,11 +102,12 @@ func NewNotificationsCmd() *cobra.Command {
 					}
 					err := csvwriter.Write([]string{b.Config.Name, b.Config.Type, strings.Join(profilesList, ", ")})
 					if err != nil {
-						log.Fatalf("failed to write raw content: %s", err)
+						return errors.Wrap(err, "failed to write raw content")
 					}
 				}
 				csvwriter.Flush()
 			}
+			return nil
 		},
 	}
 	cmdNotifications.AddCommand(cmdNotificationsList)
@@ -104,7 +119,7 @@ func NewNotificationsCmd() *cobra.Command {
 		Example:           `cscli notifications inspect <plugin_name>`,
 		Args:              cobra.ExactArgs(1),
 		DisableAutoGenTag: true,
-		Run: func(cmd *cobra.Command, arg []string) {
+		RunE: func(cmd *cobra.Command, arg []string) error {
 			var (
 				cfg NotificationsCfg
 				ok  bool
@@ -113,11 +128,14 @@ func NewNotificationsCmd() *cobra.Command {
 			pluginName := arg[0]
 
 			if pluginName == "" {
-				log.Fatalf("Please provide a plugin name to inspect")
+				errors.New("Please provide a plugin name to inspect")
+			}
+			ncfgs, err := getNotificationsConfiguration()
+			if err != nil {
+				return errors.Wrap(err, "Can't build profiles configuration")
 			}
-			ncfgs := getNotificationsConfiguration()
 			if cfg, ok = ncfgs[pluginName]; !ok {
-				log.Fatalf("The provided plugin name doesn't exist or isn't active")
+				return errors.New("The provided plugin name doesn't exist or isn't active")
 			}
 
 			if csConfig.Cscli.Output == "human" || csConfig.Cscli.Output == "raw" {
@@ -131,17 +149,140 @@ func NewNotificationsCmd() *cobra.Command {
 			} else if csConfig.Cscli.Output == "json" {
 				x, err := json.MarshalIndent(cfg, "", " ")
 				if err != nil {
-					log.Fatalf("failed to marshal notification configuration")
+					return errors.New("failed to marshal notification configuration")
 				}
 				fmt.Printf("%s", string(x))
 			}
+			return nil
 		},
 	}
 	cmdNotifications.AddCommand(cmdNotificationsInspect)
+	var remediation bool
+	var alertOverride string
+	var cmdNotificationsReinject = &cobra.Command{
+		Use:   "reinject",
+		Short: "reinject alert into notifications system",
+		Long:  `Reinject alert into notifications system`,
+		Example: `
+cscli notifications reinject <alert_id>
+cscli notifications reinject <alert_id> --remediation
+cscli notifications reinject <alert_id> -a '{"remediation": true,"scenario":"notification/test"}'
+`,
+		Args:              cobra.ExactArgs(1),
+		DisableAutoGenTag: true,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			var (
+				pluginBroker csplugin.PluginBroker
+				pluginTomb   tomb.Tomb
+			)
+			if len(args) != 1 {
+				printHelp(cmd)
+				return errors.New("Wrong number of argument: there should be one argument")
+			}
+
+			//first: get the alert
+			id, err := strconv.Atoi(args[0])
+			if err != nil {
+				return errors.New(fmt.Sprintf("bad alert id %s", args[0]))
+			}
+			if err := csConfig.LoadAPIClient(); err != nil {
+				return errors.Wrapf(err, "loading api client")
+			}
+			if csConfig.API.Client == nil {
+				return errors.New("There is no configuration on 'api_client:'")
+			}
+			if csConfig.API.Client.Credentials == nil {
+				return errors.New(fmt.Sprintf("Please provide credentials for the API in '%s'", csConfig.API.Client.CredentialsFilePath))
+			}
+			apiURL, err := url.Parse(csConfig.API.Client.Credentials.URL)
+			if err != nil {
+				return errors.Wrapf(err, "error parsing the URL of the API")
+			}
+			client, err := apiclient.NewClient(&apiclient.Config{
+				MachineID:     csConfig.API.Client.Credentials.Login,
+				Password:      strfmt.Password(csConfig.API.Client.Credentials.Password),
+				UserAgent:     fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()),
+				URL:           apiURL,
+				VersionPrefix: "v1",
+			})
+			if err != nil {
+				return errors.Wrapf(err, "error creating the client for the API")
+			}
+			alert, _, err := client.Alerts.GetByID(context.Background(), id)
+			if err != nil {
+				return errors.Wrapf(err, fmt.Sprintf("can't find alert with id %s", args[0]))
+			}
+
+			if alertOverride != "" {
+				if err = json.Unmarshal([]byte(alertOverride), alert); err != nil {
+					return errors.Wrapf(err, "Can't unmarshal the data given in the alert flag")
+				}
+			}
+			if !remediation {
+				alert.Remediation = true
+			}
+
+			// second we start plugins
+			err = pluginBroker.Init(csConfig.PluginConfig, csConfig.API.Server.Profiles, csConfig.ConfigPaths)
+			if err != nil {
+				return errors.Wrapf(err, "Can't initialize plugins")
+			}
+
+			pluginTomb.Go(func() error {
+				pluginBroker.Run(&pluginTomb)
+				return nil
+			})
+
+			//third: get the profile(s), and process the whole stuff
+
+			profiles, err := csprofiles.NewProfile(csConfig.API.Server.Profiles)
+			if err != nil {
+				return errors.Wrap(err, "Cannot extract profiles from configuration")
+			}
+
+			for id, profile := range profiles {
+				_, matched, err := profile.EvaluateProfile(alert)
+				if err != nil {
+					return errors.Wrapf(err, "can't evaluate profile %s", profile.Cfg.Name)
+				}
+				if !matched {
+					log.Infof("The profile %s didn't match", profile.Cfg.Name)
+					continue
+				}
+				log.Infof("The profile %s matched, sending to its configured notification plugins", profile.Cfg.Name)
+			loop:
+				for {
+					select {
+					case pluginBroker.PluginChannel <- csplugin.ProfileAlert{
+						ProfileID: uint(id),
+						Alert:     alert,
+					}:
+						break loop
+					default:
+						time.Sleep(50 * time.Millisecond)
+						log.Info("sleeping\n")
+
+					}
+				}
+				if profile.Cfg.OnSuccess == "break" {
+					log.Infof("The profile %s contains a 'on_success: break' so bailing out", profile.Cfg.Name)
+					break
+				}
+			}
+
+			//			time.Sleep(2 * time.Second) // There's no mechanism to ensure notification has been sent
+			pluginTomb.Kill(errors.New("terminating"))
+			pluginTomb.Wait()
+			return nil
+		},
+	}
+	cmdNotificationsReinject.Flags().BoolVarP(&remediation, "remediation", "r", false, "Set Alert.Remediation to false in the reinjected alert (see your profile filter configuration)")
+	cmdNotificationsReinject.Flags().StringVarP(&alertOverride, "alert", "a", "", "JSON string used to override alert fields in the reinjected alert (see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)")
+	cmdNotifications.AddCommand(cmdNotificationsReinject)
 	return cmdNotifications
 }
 
-func getNotificationsConfiguration() map[string]NotificationsCfg {
+func getNotificationsConfiguration() (map[string]NotificationsCfg, error) {
 	pcfgs := map[string]csplugin.PluginConfig{}
 	wf := func(path string, info fs.FileInfo, err error) error {
 		if info == nil {
@@ -161,38 +302,45 @@ func getNotificationsConfiguration() map[string]NotificationsCfg {
 	}
 
 	if err := filepath.Walk(csConfig.ConfigPaths.NotificationDir, wf); err != nil {
-		log.Fatalf("Loading notifification plugin configuration: %s", err)
+		return nil, errors.Wrap(err, "Loading notifification plugin configuration")
 	}
 
 	// A bit of a tricky stuf now: reconcile profiles and notification plugins
 	ncfgs := map[string]NotificationsCfg{}
-	for _, profile := range csConfig.API.Server.Profiles {
+	profiles, err := csprofiles.NewProfile(csConfig.API.Server.Profiles)
+	if err != nil {
+		return nil, errors.Wrap(err, "Cannot extract profiles from configuration")
+	}
+	for profileID, profile := range profiles {
 	loop:
-		for _, notif := range profile.Notifications {
+		for _, notif := range profile.Cfg.Notifications {
 			for name, pc := range pcfgs {
 				if notif == name {
 					if _, ok := ncfgs[pc.Name]; !ok {
 						ncfgs[pc.Name] = NotificationsCfg{
 							Config:   pc,
-							Profiles: []*csconfig.ProfileCfg{profile},
+							Profiles: []*csconfig.ProfileCfg{profile.Cfg},
+							ids:      []uint{uint(profileID)},
 						}
 						continue loop
 					}
 					tmp := ncfgs[pc.Name]
 					for _, pr := range tmp.Profiles {
 						var profiles []*csconfig.ProfileCfg
-						if pr.Name == profile.Name {
+						if pr.Name == profile.Cfg.Name {
 							continue
 						}
-						profiles = append(tmp.Profiles, profile)
+						profiles = append(tmp.Profiles, profile.Cfg)
+						ids := append(tmp.ids, uint(profileID))
 						ncfgs[pc.Name] = NotificationsCfg{
 							Config:   tmp.Config,
 							Profiles: profiles,
+							ids:      ids,
 						}
 					}
 				}
 			}
 		}
 	}
-	return ncfgs
+	return ncfgs, nil
 }

+ 26 - 6
pkg/csplugin/broker.go

@@ -96,9 +96,10 @@ func (pb *PluginBroker) Kill() {
 	}
 }
 
-func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
+func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {
 	//we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
-	pb.watcher.Start(tomb)
+	pb.watcher.Start(&tomb.Tomb{})
+loop:
 	for {
 		select {
 		case profileAlert := <-pb.PluginChannel:
@@ -117,13 +118,32 @@ func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
 				}
 			}()
 
-		case <-tomb.Dying():
-			log.Info("killing all plugins")
-			pb.Kill()
-			return
+		case <-pluginTomb.Dying():
+			log.Infof("plugingTomb dying")
+			pb.watcher.tomb.Kill(errors.New("Terminating"))
+			for {
+				select {
+				case <-pb.watcher.tomb.Dead():
+					log.Info("killing all plugins")
+					pb.Kill()
+					break loop
+				case pluginName := <-pb.watcher.PluginEvents:
+					// this can be ran in goroutine, but then locks will be needed
+					pluginMutex.Lock()
+					log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
+					tmpAlerts := pb.alertsByPluginName[pluginName]
+					pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
+					pluginMutex.Unlock()
+
+					if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil {
+						log.WithField("plugin:", pluginName).Error(err)
+					}
+				}
+			}
 		}
 	}
 }
+
 func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) {
 	for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications {
 		if _, ok := pb.pluginConfigByName[pluginName]; !ok {

+ 3 - 0
pkg/csplugin/watcher.go

@@ -139,6 +139,9 @@ func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
 			}
 		case <-pw.tomb.Dying():
 			ticker.Stop()
+			// emptying
+			// no lock here because we have the broker still listening even in dying state before killing us
+			pw.PluginEvents <- pluginName
 			return
 		}
 	}

+ 5 - 5
pkg/csplugin/watcher_test.go

@@ -14,8 +14,9 @@ import (
 
 var ctx = context.Background()
 
-func resetTestTomb(testTomb *tomb.Tomb) {
+func resetTestTomb(testTomb *tomb.Tomb, pw *PluginWatcher) {
 	testTomb.Kill(nil)
+	<-pw.PluginEvents
 	if err := testTomb.Wait(); err != nil {
 		log.Fatal(err)
 	}
@@ -64,8 +65,7 @@ func TestPluginWatcherInterval(t *testing.T) {
 	defer cancel()
 	err := listenChannelWithTimeout(ct, pw.PluginEvents)
 	assert.ErrorContains(t, err, "context deadline exceeded")
-
-	resetTestTomb(&testTomb)
+	resetTestTomb(&testTomb, &pw)
 	testTomb = tomb.Tomb{}
 	pw.Start(&testTomb)
 
@@ -73,7 +73,7 @@ func TestPluginWatcherInterval(t *testing.T) {
 	defer cancel()
 	err = listenChannelWithTimeout(ct, pw.PluginEvents)
 	assert.NilError(t, err)
-	resetTestTomb(&testTomb)
+	resetTestTomb(&testTomb, &pw)
 	// This is to avoid the int complaining
 }
 
@@ -113,5 +113,5 @@ func TestPluginAlertCountWatcher(t *testing.T) {
 	defer cancel()
 	err = listenChannelWithTimeout(ct, pw.PluginEvents)
 	assert.NilError(t, err)
-	resetTestTomb(&testTomb)
+	resetTestTomb(&testTomb, &pw)
 }