broker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. package csplugin
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/fs"
  7. "math"
  8. "os"
  9. "os/exec"
  10. "os/user"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "text/template"
  17. "time"
  18. "github.com/Masterminds/sprig"
  19. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  20. "github.com/crowdsecurity/crowdsec/pkg/models"
  21. "github.com/crowdsecurity/crowdsec/pkg/protobufs"
  22. "github.com/crowdsecurity/crowdsec/pkg/types"
  23. "github.com/google/uuid"
  24. plugin "github.com/hashicorp/go-plugin"
  25. "github.com/pkg/errors"
  26. log "github.com/sirupsen/logrus"
  27. "gopkg.in/tomb.v2"
  28. "gopkg.in/yaml.v2"
  29. )
  // pluginMutex guards PluginBroker.alertsByPluginName: addProfileAlert appends
  // to it and Run swaps out the pending batch while holding this lock.
  var pluginMutex sync.Mutex

  const (
  	// PluginProtocolVersion is the ProtocolVersion sent in the go-plugin handshake.
  	PluginProtocolVersion uint = 1
  	// CrowdsecPluginKey is the MagicCookieKey sent in the go-plugin handshake.
  	CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY"
  )
  35. //The broker is responsible for running the plugins and dispatching events
  36. //It receives all the events from the main process and stacks them up
  37. //It is as well notified by the watcher when it needs to deliver events to plugins (based on time or count threshold)
  38. type PluginBroker struct {
  39. PluginChannel chan ProfileAlert
  40. alertsByPluginName map[string][]*models.Alert
  41. profileConfigs []*csconfig.ProfileCfg
  42. pluginConfigByName map[string]PluginConfig
  43. pluginMap map[string]plugin.Plugin
  44. notificationConfigsByPluginType map[string][][]byte // "slack" -> []{config1, config2}
  45. notificationPluginByName map[string]Notifier
  46. watcher PluginWatcher
  47. pluginKillMethods []func()
  48. pluginProcConfig *csconfig.PluginCfg
  49. pluginsTypesToDispatch map[string]struct{}
  50. }
  // PluginConfig is the per-notification configuration. It determines which
  // plugin a notification is dispatched to and how its messages are formatted.
  type PluginConfig struct {
  	// Type selects the plugin binary subtype that serves this notification.
  	Type string `yaml:"type"`
  	// Name is the notification name referenced from the profiles.
  	Name string `yaml:"name"`
  	// GroupWait and GroupThreshold are handed to the watcher; presumably
  	// they are the time and count thresholds that trigger a delivery.
  	GroupWait      time.Duration `yaml:"group_wait,omitempty"`
  	GroupThreshold int           `yaml:"group_threshold,omitempty"`
  	// MaxRetry is the number of delivery attempts for a batch.
  	MaxRetry int `yaml:"max_retry,omitempty"`
  	// TimeOut bounds a single delivery attempt.
  	TimeOut time.Duration `yaml:"timeout,omitempty"`
  	// Format is the text/template source used to render the alerts;
  	// specific to notification plugins.
  	Format string `yaml:"format,omitempty"`
  	// Config keeps the plugin-specific keys that are not listed above.
  	Config map[string]interface{} `yaml:",inline"`
  }
  62. type ProfileAlert struct {
  63. ProfileID uint
  64. Alert *models.Alert
  65. }
  66. func (pb *PluginBroker) Init(pluginCfg *csconfig.PluginCfg, profileConfigs []*csconfig.ProfileCfg, configPaths *csconfig.ConfigurationPaths) error {
  67. pb.PluginChannel = make(chan ProfileAlert)
  68. pb.notificationConfigsByPluginType = make(map[string][][]byte)
  69. pb.notificationPluginByName = make(map[string]Notifier)
  70. pb.pluginMap = make(map[string]plugin.Plugin)
  71. pb.pluginConfigByName = make(map[string]PluginConfig)
  72. pb.alertsByPluginName = make(map[string][]*models.Alert)
  73. pb.profileConfigs = profileConfigs
  74. pb.pluginProcConfig = pluginCfg
  75. pb.pluginsTypesToDispatch = make(map[string]struct{})
  76. if err := pb.loadConfig(configPaths.NotificationDir); err != nil {
  77. return errors.Wrap(err, "while loading plugin config")
  78. }
  79. if err := pb.loadPlugins(configPaths.PluginDir); err != nil {
  80. return errors.Wrap(err, "while loading plugin")
  81. }
  82. pb.watcher = PluginWatcher{}
  83. pb.watcher.Init(pb.pluginConfigByName, pb.alertsByPluginName)
  84. return nil
  85. }
  86. func (pb *PluginBroker) Kill() {
  87. for _, kill := range pb.pluginKillMethods {
  88. kill()
  89. }
  90. }
  91. func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
  92. //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
  93. pb.watcher.Start(tomb)
  94. for {
  95. select {
  96. case profileAlert := <-pb.PluginChannel:
  97. pb.addProfileAlert(profileAlert)
  98. case pluginName := <-pb.watcher.PluginEvents:
  99. // this can be ran in goroutine, but then locks will be needed
  100. pluginMutex.Lock()
  101. log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
  102. tmpAlerts := pb.alertsByPluginName[pluginName]
  103. pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
  104. pluginMutex.Unlock()
  105. go func() {
  106. if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil {
  107. log.WithField("plugin:", pluginName).Error(err)
  108. }
  109. }()
  110. case <-tomb.Dying():
  111. log.Info("killing all plugins")
  112. pb.Kill()
  113. return
  114. }
  115. }
  116. }
  117. func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) {
  118. for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications {
  119. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  120. log.Errorf("plugin %s is not configured properly.", pluginName)
  121. continue
  122. }
  123. pluginMutex.Lock()
  124. pb.alertsByPluginName[pluginName] = append(pb.alertsByPluginName[pluginName], profileAlert.Alert)
  125. pluginMutex.Unlock()
  126. pb.watcher.Inserts <- pluginName
  127. }
  128. }
  129. func (pb *PluginBroker) profilesContainPlugin(pluginName string) bool {
  130. for _, profileCfg := range pb.profileConfigs {
  131. for _, name := range profileCfg.Notifications {
  132. if pluginName == name {
  133. return true
  134. }
  135. }
  136. }
  137. return false
  138. }
  139. func (pb *PluginBroker) loadConfig(path string) error {
  140. files, err := listFilesAtPath(path)
  141. if err != nil {
  142. return err
  143. }
  144. for _, configFilePath := range files {
  145. if !strings.HasSuffix(configFilePath, ".yaml") && !strings.HasSuffix(configFilePath, ".yml") {
  146. continue
  147. }
  148. pluginConfigs, err := parsePluginConfigFile(configFilePath)
  149. if err != nil {
  150. return err
  151. }
  152. for _, pluginConfig := range pluginConfigs {
  153. if !pb.profilesContainPlugin(pluginConfig.Name) {
  154. continue
  155. }
  156. setRequiredFields(&pluginConfig)
  157. if _, ok := pb.pluginConfigByName[pluginConfig.Name]; ok {
  158. log.Warnf("several configs for notification %s found ", pluginConfig.Name)
  159. }
  160. pb.pluginConfigByName[pluginConfig.Name] = pluginConfig
  161. }
  162. }
  163. err = pb.verifyPluginConfigsWithProfile()
  164. return err
  165. }
  166. // checks whether every notification in profile has it's own config file
  167. func (pb *PluginBroker) verifyPluginConfigsWithProfile() error {
  168. for _, profileCfg := range pb.profileConfigs {
  169. for _, pluginName := range profileCfg.Notifications {
  170. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  171. return fmt.Errorf("config file for plugin %s not found", pluginName)
  172. }
  173. pb.pluginsTypesToDispatch[pb.pluginConfigByName[pluginName].Type] = struct{}{}
  174. }
  175. }
  176. return nil
  177. }
  178. // check whether each plugin in profile has it's own binary
  179. func (pb *PluginBroker) verifyPluginBinaryWithProfile() error {
  180. for _, profileCfg := range pb.profileConfigs {
  181. for _, pluginName := range profileCfg.Notifications {
  182. if _, ok := pb.notificationPluginByName[pluginName]; !ok {
  183. return fmt.Errorf("binary for plugin %s not found", pluginName)
  184. }
  185. }
  186. }
  187. return nil
  188. }
  189. func (pb *PluginBroker) loadPlugins(path string) error {
  190. binaryPaths, err := listFilesAtPath(path)
  191. if err != nil {
  192. return err
  193. }
  194. for _, binaryPath := range binaryPaths {
  195. if err := pluginIsValid(binaryPath); err != nil {
  196. return err
  197. }
  198. pType, pSubtype, err := getPluginTypeAndSubtypeFromPath(binaryPath) // eg pType="notification" , pSubtype="slack"
  199. if err != nil {
  200. return err
  201. }
  202. if pType != "notification" {
  203. continue
  204. }
  205. if _, ok := pb.pluginsTypesToDispatch[pSubtype]; !ok {
  206. continue
  207. }
  208. pluginClient, err := pb.loadNotificationPlugin(pSubtype, binaryPath)
  209. if err != nil {
  210. return err
  211. }
  212. for _, pc := range pb.pluginConfigByName {
  213. if pc.Type != pSubtype {
  214. continue
  215. }
  216. data, err := yaml.Marshal(pc)
  217. if err != nil {
  218. return err
  219. }
  220. _, err = pluginClient.Configure(context.Background(), &protobufs.Config{Config: data})
  221. if err != nil {
  222. return errors.Wrapf(err, "while configuring %s", pc.Name)
  223. }
  224. log.Infof("registered plugin %s", pc.Name)
  225. pb.notificationPluginByName[pc.Name] = pluginClient
  226. }
  227. }
  228. return pb.verifyPluginBinaryWithProfile()
  229. }
  230. func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) {
  231. handshake, err := getHandshake()
  232. if err != nil {
  233. return nil, err
  234. }
  235. log.Debugf("Executing plugin %s", binaryPath)
  236. cmd := exec.Command(binaryPath)
  237. if pb.pluginProcConfig.User != "" || pb.pluginProcConfig.Group != "" {
  238. if !(pb.pluginProcConfig.User != "" && pb.pluginProcConfig.Group != "") {
  239. return nil, errors.New("while getting process attributes: both plugin user and group must be set")
  240. }
  241. cmd.SysProcAttr, err = getProcessAttr(pb.pluginProcConfig.User, pb.pluginProcConfig.Group)
  242. if err != nil {
  243. return nil, errors.Wrap(err, "while getting process attributes")
  244. }
  245. cmd.SysProcAttr.Credential.NoSetGroups = true
  246. }
  247. pb.pluginMap[name] = &NotifierPlugin{}
  248. l := log.New()
  249. err = types.ConfigureLogger(l)
  250. if err != nil {
  251. return nil, err
  252. }
  253. // We set the highest level to permit plugins to set their own log level
  254. // without that, crowdsec log level is controlling plugins level
  255. l.SetLevel(log.TraceLevel)
  256. logger := NewHCLogAdapter(l, "")
  257. c := plugin.NewClient(&plugin.ClientConfig{
  258. HandshakeConfig: handshake,
  259. Plugins: pb.pluginMap,
  260. Cmd: cmd,
  261. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  262. Logger: logger,
  263. })
  264. client, err := c.Client()
  265. if err != nil {
  266. return nil, err
  267. }
  268. raw, err := client.Dispense(name)
  269. if err != nil {
  270. return nil, err
  271. }
  272. pb.pluginKillMethods = append(pb.pluginKillMethods, c.Kill)
  273. return raw.(Notifier), nil
  274. }
  275. func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error {
  276. log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
  277. if len(alerts) == 0 {
  278. return nil
  279. }
  280. message, err := formatAlerts(pb.pluginConfigByName[pluginName].Format, alerts)
  281. if err != nil {
  282. return err
  283. }
  284. plugin := pb.notificationPluginByName[pluginName]
  285. backoffDuration := time.Second
  286. for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ {
  287. ctx, cancel := context.WithTimeout(context.Background(), pb.pluginConfigByName[pluginName].TimeOut)
  288. defer cancel()
  289. _, err = plugin.Notify(
  290. ctx,
  291. &protobufs.Notification{
  292. Text: message,
  293. Name: pluginName,
  294. },
  295. )
  296. if err == nil {
  297. return err
  298. }
  299. log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err.Error(), i)
  300. time.Sleep(backoffDuration)
  301. backoffDuration *= 2
  302. }
  303. return err
  304. }
  305. func parsePluginConfigFile(path string) ([]PluginConfig, error) {
  306. parsedConfigs := make([]PluginConfig, 0)
  307. yamlFile, err := os.Open(path)
  308. if err != nil {
  309. return parsedConfigs, errors.Wrapf(err, "while opening %s", path)
  310. }
  311. dec := yaml.NewDecoder(yamlFile)
  312. dec.SetStrict(true)
  313. for {
  314. pc := PluginConfig{}
  315. err = dec.Decode(&pc)
  316. if err != nil {
  317. if err == io.EOF {
  318. break
  319. }
  320. return []PluginConfig{}, fmt.Errorf("while decoding %s got error %s", path, err.Error())
  321. }
  322. parsedConfigs = append(parsedConfigs, pc)
  323. }
  324. return parsedConfigs, nil
  325. }
  326. func setRequiredFields(pluginCfg *PluginConfig) {
  327. if pluginCfg.MaxRetry == 0 {
  328. pluginCfg.MaxRetry++
  329. }
  330. if pluginCfg.TimeOut == time.Second*0 {
  331. pluginCfg.TimeOut = time.Second * 5
  332. }
  333. }
  334. func pluginIsValid(path string) error {
  335. var details fs.FileInfo
  336. var err error
  337. // check if it exists
  338. if details, err = os.Stat(path); err != nil {
  339. return errors.Wrap(err, fmt.Sprintf("plugin at %s does not exist", path))
  340. }
  341. // check if it is owned by current user
  342. currentUser, err := user.Current()
  343. if err != nil {
  344. return errors.Wrap(err, "while getting current user")
  345. }
  346. currentUID, err := getUID(currentUser.Username)
  347. if err != nil {
  348. return errors.Wrap(err, "while looking up the current uid")
  349. }
  350. stat := details.Sys().(*syscall.Stat_t)
  351. if stat.Uid != currentUID {
  352. return fmt.Errorf("plugin at %s is not owned by user '%s'", path, currentUser.Username)
  353. }
  354. mode := details.Mode()
  355. perm := uint32(mode)
  356. if (perm & 00002) != 0 {
  357. return fmt.Errorf("plugin at %s is world writable, world writable plugins are invalid", path)
  358. }
  359. if (perm & 00020) != 0 {
  360. return fmt.Errorf("plugin at %s is group writable, group writable plugins are invalid", path)
  361. }
  362. if (mode & os.ModeSetgid) != 0 {
  363. return fmt.Errorf("plugin at %s has setgid permission, which is not allowed", path)
  364. }
  365. return nil
  366. }
  // listFilesAtPath returns the paths of all the entries directly under path
  // that are not directories (non-recursive). Each returned path is path
  // joined with the entry name. The result is an empty, non-nil slice when
  // there is no such entry, and nil along with the error when the directory
  // cannot be read.
  func listFilesAtPath(path string) ([]string, error) {
  	entries, err := os.ReadDir(path)
  	if err != nil {
  		return nil, err
  	}

  	filePaths := make([]string, 0)
  	for _, entry := range entries {
  		if !entry.IsDir() {
  			filePaths = append(filePaths, filepath.Join(path, entry.Name()))
  		}
  	}
  	return filePaths, nil
  }
  // getPluginTypeAndSubtypeFromPath splits the file name of path at its last
  // "-": what precedes it is the plugin type, what follows it is the subtype
  // (eg "notification-slack" gives "notification" and "slack"). It returns an
  // error when the file name contains no "-".
  func getPluginTypeAndSubtypeFromPath(path string) (string, string, error) {
  	fileName := filepath.Base(path)
  	sep := strings.LastIndex(fileName, "-")
  	if sep < 0 {
  		return "", "", fmt.Errorf("plugin name %s is invalid. Name should be like {type-name}", path)
  	}
  	return fileName[:sep], fileName[sep+1:], nil
  }
  // getUID returns the numeric user id of username. It returns an error when
  // the user cannot be looked up, or when its id is not a decimal number that
  // fits in the range [0, math.MaxInt32].
  func getUID(username string) (uint32, error) {
  	account, err := user.Lookup(username)
  	if err != nil {
  		return 0, err
  	}

  	parsed, err := strconv.ParseInt(account.Uid, 10, 32)
  	switch {
  	case err != nil:
  		return 0, err
  	case parsed < 0 || parsed > math.MaxInt32:
  		return 0, fmt.Errorf("out of bound uid")
  	}
  	return uint32(parsed), nil
  }
  // getGID returns the numeric group id of groupname. It returns an error when
  // the group cannot be looked up, or when its id is not a decimal number that
  // fits in the range [0, math.MaxInt32].
  func getGID(groupname string) (uint32, error) {
  	group, err := user.LookupGroup(groupname)
  	if err != nil {
  		return 0, err
  	}

  	parsed, err := strconv.ParseInt(group.Gid, 10, 32)
  	switch {
  	case err != nil:
  		return 0, err
  	case parsed < 0 || parsed > math.MaxInt32:
  		return 0, fmt.Errorf("out of bound gid")
  	}
  	return uint32(parsed), nil
  }
  418. func getProcessAttr(username string, groupname string) (*syscall.SysProcAttr, error) {
  419. uid, err := getUID(username)
  420. if err != nil {
  421. return nil, err
  422. }
  423. gid, err := getGID(groupname)
  424. if err != nil {
  425. return nil, err
  426. }
  427. return &syscall.SysProcAttr{
  428. Credential: &syscall.Credential{
  429. Uid: uid,
  430. Gid: gid,
  431. },
  432. }, nil
  433. }
  434. func getUUID() (string, error) {
  435. uuidv4, err := uuid.NewRandom()
  436. if err != nil {
  437. return "", err
  438. }
  439. return uuidv4.String(), nil
  440. }
  441. func getHandshake() (plugin.HandshakeConfig, error) {
  442. uuid, err := getUUID()
  443. if err != nil {
  444. return plugin.HandshakeConfig{}, err
  445. }
  446. handshake := plugin.HandshakeConfig{
  447. ProtocolVersion: PluginProtocolVersion,
  448. MagicCookieKey: CrowdsecPluginKey,
  449. MagicCookieValue: uuid,
  450. }
  451. return handshake, nil
  452. }
  453. func formatAlerts(format string, alerts []*models.Alert) (string, error) {
  454. template, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(format)
  455. if err != nil {
  456. return "", err
  457. }
  458. b := new(strings.Builder)
  459. err = template.Execute(b, alerts)
  460. if err != nil {
  461. return "", err
  462. }
  463. return b.String(), nil
  464. }