123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- package csplugin
- import (
- "context"
- "fmt"
- "io"
- "io/fs"
- "math"
- "os"
- "os/exec"
- "os/user"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "syscall"
- "text/template"
- "time"
- "github.com/Masterminds/sprig"
- "github.com/crowdsecurity/crowdsec/pkg/csconfig"
- "github.com/crowdsecurity/crowdsec/pkg/models"
- "github.com/crowdsecurity/crowdsec/pkg/protobufs"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- "github.com/google/uuid"
- plugin "github.com/hashicorp/go-plugin"
- "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
- "gopkg.in/yaml.v2"
- )
- var pluginMutex sync.Mutex
- const (
- PluginProtocolVersion uint = 1
- CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY"
- )
- //The broker is responsible for running the plugins and dispatching events
- //It receives all the events from the main process and stacks them up
- //It is as well notified by the watcher when it needs to deliver events to plugins (based on time or count threshold)
- type PluginBroker struct {
- PluginChannel chan ProfileAlert
- alertsByPluginName map[string][]*models.Alert
- profileConfigs []*csconfig.ProfileCfg
- pluginConfigByName map[string]PluginConfig
- pluginMap map[string]plugin.Plugin
- notificationConfigsByPluginType map[string][][]byte // "slack" -> []{config1, config2}
- notificationPluginByName map[string]Notifier
- watcher PluginWatcher
- pluginKillMethods []func()
- pluginProcConfig *csconfig.PluginCfg
- pluginsTypesToDispatch map[string]struct{}
- }
- // holder to determine where to dispatch config and how to format messages
- type PluginConfig struct {
- Type string `yaml:"type"`
- Name string `yaml:"name"`
- GroupWait time.Duration `yaml:"group_wait,omitempty"`
- GroupThreshold int `yaml:"group_threshold,omitempty"`
- MaxRetry int `yaml:"max_retry,omitempty"`
- TimeOut time.Duration `yaml:"timeout,omitempty"`
- Format string `yaml:"format,omitempty"` // specific to notification plugins
- Config map[string]interface{} `yaml:",inline"` //to keep the plugin-specific config
- }
- type ProfileAlert struct {
- ProfileID uint
- Alert *models.Alert
- }
- func (pb *PluginBroker) Init(pluginCfg *csconfig.PluginCfg, profileConfigs []*csconfig.ProfileCfg, configPaths *csconfig.ConfigurationPaths) error {
- pb.PluginChannel = make(chan ProfileAlert)
- pb.notificationConfigsByPluginType = make(map[string][][]byte)
- pb.notificationPluginByName = make(map[string]Notifier)
- pb.pluginMap = make(map[string]plugin.Plugin)
- pb.pluginConfigByName = make(map[string]PluginConfig)
- pb.alertsByPluginName = make(map[string][]*models.Alert)
- pb.profileConfigs = profileConfigs
- pb.pluginProcConfig = pluginCfg
- pb.pluginsTypesToDispatch = make(map[string]struct{})
- if err := pb.loadConfig(configPaths.NotificationDir); err != nil {
- return errors.Wrap(err, "while loading plugin config")
- }
- if err := pb.loadPlugins(configPaths.PluginDir); err != nil {
- return errors.Wrap(err, "while loading plugin")
- }
- pb.watcher = PluginWatcher{}
- pb.watcher.Init(pb.pluginConfigByName, pb.alertsByPluginName)
- return nil
- }
- func (pb *PluginBroker) Kill() {
- for _, kill := range pb.pluginKillMethods {
- kill()
- }
- }
- func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
- //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
- pb.watcher.Start(tomb)
- for {
- select {
- case profileAlert := <-pb.PluginChannel:
- pb.addProfileAlert(profileAlert)
- case pluginName := <-pb.watcher.PluginEvents:
- // this can be ran in goroutine, but then locks will be needed
- pluginMutex.Lock()
- log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
- tmpAlerts := pb.alertsByPluginName[pluginName]
- pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
- pluginMutex.Unlock()
- go func() {
- if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil {
- log.WithField("plugin:", pluginName).Error(err)
- }
- }()
- case <-tomb.Dying():
- log.Info("killing all plugins")
- pb.Kill()
- return
- }
- }
- }
- func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) {
- for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications {
- if _, ok := pb.pluginConfigByName[pluginName]; !ok {
- log.Errorf("plugin %s is not configured properly.", pluginName)
- continue
- }
- pluginMutex.Lock()
- pb.alertsByPluginName[pluginName] = append(pb.alertsByPluginName[pluginName], profileAlert.Alert)
- pluginMutex.Unlock()
- pb.watcher.Inserts <- pluginName
- }
- }
- func (pb *PluginBroker) profilesContainPlugin(pluginName string) bool {
- for _, profileCfg := range pb.profileConfigs {
- for _, name := range profileCfg.Notifications {
- if pluginName == name {
- return true
- }
- }
- }
- return false
- }
- func (pb *PluginBroker) loadConfig(path string) error {
- files, err := listFilesAtPath(path)
- if err != nil {
- return err
- }
- for _, configFilePath := range files {
- if !strings.HasSuffix(configFilePath, ".yaml") && !strings.HasSuffix(configFilePath, ".yml") {
- continue
- }
- pluginConfigs, err := parsePluginConfigFile(configFilePath)
- if err != nil {
- return err
- }
- for _, pluginConfig := range pluginConfigs {
- if !pb.profilesContainPlugin(pluginConfig.Name) {
- continue
- }
- setRequiredFields(&pluginConfig)
- if _, ok := pb.pluginConfigByName[pluginConfig.Name]; ok {
- log.Warnf("several configs for notification %s found ", pluginConfig.Name)
- }
- pb.pluginConfigByName[pluginConfig.Name] = pluginConfig
- }
- }
- err = pb.verifyPluginConfigsWithProfile()
- return err
- }
- // checks whether every notification in profile has it's own config file
- func (pb *PluginBroker) verifyPluginConfigsWithProfile() error {
- for _, profileCfg := range pb.profileConfigs {
- for _, pluginName := range profileCfg.Notifications {
- if _, ok := pb.pluginConfigByName[pluginName]; !ok {
- return fmt.Errorf("config file for plugin %s not found", pluginName)
- }
- pb.pluginsTypesToDispatch[pb.pluginConfigByName[pluginName].Type] = struct{}{}
- }
- }
- return nil
- }
- // check whether each plugin in profile has it's own binary
- func (pb *PluginBroker) verifyPluginBinaryWithProfile() error {
- for _, profileCfg := range pb.profileConfigs {
- for _, pluginName := range profileCfg.Notifications {
- if _, ok := pb.notificationPluginByName[pluginName]; !ok {
- return fmt.Errorf("binary for plugin %s not found", pluginName)
- }
- }
- }
- return nil
- }
- func (pb *PluginBroker) loadPlugins(path string) error {
- binaryPaths, err := listFilesAtPath(path)
- if err != nil {
- return err
- }
- for _, binaryPath := range binaryPaths {
- if err := pluginIsValid(binaryPath); err != nil {
- return err
- }
- pType, pSubtype, err := getPluginTypeAndSubtypeFromPath(binaryPath) // eg pType="notification" , pSubtype="slack"
- if err != nil {
- return err
- }
- if pType != "notification" {
- continue
- }
- if _, ok := pb.pluginsTypesToDispatch[pSubtype]; !ok {
- continue
- }
- pluginClient, err := pb.loadNotificationPlugin(pSubtype, binaryPath)
- if err != nil {
- return err
- }
- for _, pc := range pb.pluginConfigByName {
- if pc.Type != pSubtype {
- continue
- }
- data, err := yaml.Marshal(pc)
- if err != nil {
- return err
- }
- _, err = pluginClient.Configure(context.Background(), &protobufs.Config{Config: data})
- if err != nil {
- return errors.Wrapf(err, "while configuring %s", pc.Name)
- }
- log.Infof("registered plugin %s", pc.Name)
- pb.notificationPluginByName[pc.Name] = pluginClient
- }
- }
- return pb.verifyPluginBinaryWithProfile()
- }
- func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) {
- handshake, err := getHandshake()
- if err != nil {
- return nil, err
- }
- log.Debugf("Executing plugin %s", binaryPath)
- cmd := exec.Command(binaryPath)
- if pb.pluginProcConfig.User != "" || pb.pluginProcConfig.Group != "" {
- if !(pb.pluginProcConfig.User != "" && pb.pluginProcConfig.Group != "") {
- return nil, errors.New("while getting process attributes: both plugin user and group must be set")
- }
- cmd.SysProcAttr, err = getProcessAttr(pb.pluginProcConfig.User, pb.pluginProcConfig.Group)
- if err != nil {
- return nil, errors.Wrap(err, "while getting process attributes")
- }
- cmd.SysProcAttr.Credential.NoSetGroups = true
- }
- pb.pluginMap[name] = &NotifierPlugin{}
- l := log.New()
- err = types.ConfigureLogger(l)
- if err != nil {
- return nil, err
- }
- // We set the highest level to permit plugins to set their own log level
- // without that, crowdsec log level is controlling plugins level
- l.SetLevel(log.TraceLevel)
- logger := NewHCLogAdapter(l, "")
- c := plugin.NewClient(&plugin.ClientConfig{
- HandshakeConfig: handshake,
- Plugins: pb.pluginMap,
- Cmd: cmd,
- AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
- Logger: logger,
- })
- client, err := c.Client()
- if err != nil {
- return nil, err
- }
- raw, err := client.Dispense(name)
- if err != nil {
- return nil, err
- }
- pb.pluginKillMethods = append(pb.pluginKillMethods, c.Kill)
- return raw.(Notifier), nil
- }
- func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error {
- log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
- if len(alerts) == 0 {
- return nil
- }
- message, err := formatAlerts(pb.pluginConfigByName[pluginName].Format, alerts)
- if err != nil {
- return err
- }
- plugin := pb.notificationPluginByName[pluginName]
- backoffDuration := time.Second
- for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ {
- ctx, cancel := context.WithTimeout(context.Background(), pb.pluginConfigByName[pluginName].TimeOut)
- defer cancel()
- _, err = plugin.Notify(
- ctx,
- &protobufs.Notification{
- Text: message,
- Name: pluginName,
- },
- )
- if err == nil {
- return err
- }
- log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err.Error(), i)
- time.Sleep(backoffDuration)
- backoffDuration *= 2
- }
- return err
- }
- func parsePluginConfigFile(path string) ([]PluginConfig, error) {
- parsedConfigs := make([]PluginConfig, 0)
- yamlFile, err := os.Open(path)
- if err != nil {
- return parsedConfigs, errors.Wrapf(err, "while opening %s", path)
- }
- dec := yaml.NewDecoder(yamlFile)
- dec.SetStrict(true)
- for {
- pc := PluginConfig{}
- err = dec.Decode(&pc)
- if err != nil {
- if err == io.EOF {
- break
- }
- return []PluginConfig{}, fmt.Errorf("while decoding %s got error %s", path, err.Error())
- }
- parsedConfigs = append(parsedConfigs, pc)
- }
- return parsedConfigs, nil
- }
- func setRequiredFields(pluginCfg *PluginConfig) {
- if pluginCfg.MaxRetry == 0 {
- pluginCfg.MaxRetry++
- }
- if pluginCfg.TimeOut == time.Second*0 {
- pluginCfg.TimeOut = time.Second * 5
- }
- }
- func pluginIsValid(path string) error {
- var details fs.FileInfo
- var err error
- // check if it exists
- if details, err = os.Stat(path); err != nil {
- return errors.Wrap(err, fmt.Sprintf("plugin at %s does not exist", path))
- }
- // check if it is owned by current user
- currentUser, err := user.Current()
- if err != nil {
- return errors.Wrap(err, "while getting current user")
- }
- currentUID, err := getUID(currentUser.Username)
- if err != nil {
- return errors.Wrap(err, "while looking up the current uid")
- }
- stat := details.Sys().(*syscall.Stat_t)
- if stat.Uid != currentUID {
- return fmt.Errorf("plugin at %s is not owned by user '%s'", path, currentUser.Username)
- }
- mode := details.Mode()
- perm := uint32(mode)
- if (perm & 00002) != 0 {
- return fmt.Errorf("plugin at %s is world writable, world writable plugins are invalid", path)
- }
- if (perm & 00020) != 0 {
- return fmt.Errorf("plugin at %s is group writable, group writable plugins are invalid", path)
- }
- if (mode & os.ModeSetgid) != 0 {
- return fmt.Errorf("plugin at %s has setgid permission, which is not allowed", path)
- }
- return nil
- }
- // helper which gives paths to all files in the given directory non-recursively
- func listFilesAtPath(path string) ([]string, error) {
- filePaths := make([]string, 0)
- files, err := os.ReadDir(path)
- if err != nil {
- return nil, err
- }
- for _, file := range files {
- if file.IsDir() {
- continue
- }
- filePaths = append(filePaths, filepath.Join(path, file.Name()))
- }
- return filePaths, nil
- }
- func getPluginTypeAndSubtypeFromPath(path string) (string, string, error) {
- pluginFileName := filepath.Base(path)
- parts := strings.Split(pluginFileName, "-")
- if len(parts) < 2 {
- return "", "", fmt.Errorf("plugin name %s is invalid. Name should be like {type-name}", path)
- }
- return strings.Join(parts[:len(parts)-1], "-"), parts[len(parts)-1], nil
- }
- func getUID(username string) (uint32, error) {
- u, err := user.Lookup(username)
- if err != nil {
- return 0, err
- }
- uid, err := strconv.ParseInt(u.Uid, 10, 32)
- if err != nil {
- return 0, err
- }
- if uid < 0 || uid > math.MaxInt32 {
- return 0, fmt.Errorf("out of bound uid")
- }
- return uint32(uid), nil
- }
- func getGID(groupname string) (uint32, error) {
- g, err := user.LookupGroup(groupname)
- if err != nil {
- return 0, err
- }
- gid, err := strconv.ParseInt(g.Gid, 10, 32)
- if err != nil {
- return 0, err
- }
- if gid < 0 || gid > math.MaxInt32 {
- return 0, fmt.Errorf("out of bound gid")
- }
- return uint32(gid), nil
- }
- func getProcessAttr(username string, groupname string) (*syscall.SysProcAttr, error) {
- uid, err := getUID(username)
- if err != nil {
- return nil, err
- }
- gid, err := getGID(groupname)
- if err != nil {
- return nil, err
- }
- return &syscall.SysProcAttr{
- Credential: &syscall.Credential{
- Uid: uid,
- Gid: gid,
- },
- }, nil
- }
- func getUUID() (string, error) {
- uuidv4, err := uuid.NewRandom()
- if err != nil {
- return "", err
- }
- return uuidv4.String(), nil
- }
- func getHandshake() (plugin.HandshakeConfig, error) {
- uuid, err := getUUID()
- if err != nil {
- return plugin.HandshakeConfig{}, err
- }
- handshake := plugin.HandshakeConfig{
- ProtocolVersion: PluginProtocolVersion,
- MagicCookieKey: CrowdsecPluginKey,
- MagicCookieValue: uuid,
- }
- return handshake, nil
- }
- func formatAlerts(format string, alerts []*models.Alert) (string, error) {
- template, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(format)
- if err != nil {
- return "", err
- }
- b := new(strings.Builder)
- err = template.Execute(b, alerts)
- if err != nil {
- return "", err
- }
- return b.String(), nil
- }
|