broker.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. package csplugin
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "text/template"
  11. "time"
  12. "github.com/Masterminds/sprig"
  13. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  14. "github.com/crowdsecurity/crowdsec/pkg/models"
  15. "github.com/crowdsecurity/crowdsec/pkg/protobufs"
  16. "github.com/crowdsecurity/crowdsec/pkg/types"
  17. "github.com/google/uuid"
  18. plugin "github.com/hashicorp/go-plugin"
  19. "github.com/pkg/errors"
  20. log "github.com/sirupsen/logrus"
  21. "gopkg.in/tomb.v2"
  22. "gopkg.in/yaml.v2"
  23. )
// pluginMutex is locked in this file around every read and write of
// PluginBroker.alertsByPluginName (see Run and addProfileAlert).
var pluginMutex sync.Mutex

const (
	// PluginProtocolVersion is the ProtocolVersion placed in the go-plugin
	// handshake built by getHandshake.
	PluginProtocolVersion uint = 1
	// CrowdsecPluginKey is the MagicCookieKey placed in the go-plugin
	// handshake built by getHandshake.
	CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY"
)
// PluginBroker is responsible for running the plugins and dispatching events.
// It receives all the events from the main process and stacks them up.
// It is also notified by the watcher when it needs to deliver events to
// plugins (based on time or count threshold).
type PluginBroker struct {
	// PluginChannel carries incoming alerts; Run reads from it.
	PluginChannel chan ProfileAlert
	// alertsByPluginName holds the alerts queued for each notification name
	// until Run hands them to the plugin. Accessed under pluginMutex.
	alertsByPluginName map[string][]*models.Alert
	// profileConfigs are the profiles given to Init; ProfileAlert.ProfileID
	// is used as an index into this slice.
	profileConfigs []*csconfig.ProfileCfg
	// pluginConfigByName maps a notification name to its parsed config
	// (filled by loadConfig).
	pluginConfigByName map[string]PluginConfig
	// pluginMap is the plugin set handed to plugin.NewClient, keyed by the
	// plugin subtype (filled by loadNotificationPlugin).
	pluginMap map[string]plugin.Plugin
	// notificationConfigsByPluginType: "slack" -> []{config1, config2}.
	// Only allocated in Init within this file.
	notificationConfigsByPluginType map[string][][]byte
	// notificationPluginByName maps a notification name to the plugin
	// client that was configured for it (filled by loadPlugins).
	notificationPluginByName map[string]Notifier
	// watcher tells Run, through its PluginEvents channel, which
	// notification should be delivered.
	watcher PluginWatcher
	// pluginKillMethods collects the Kill function of every plugin client
	// that was started; Kill invokes them all.
	pluginKillMethods []func()
	// pluginProcConfig is the plugin process configuration given to Init.
	// NOTE(review): presumably consumed by CreateCmd, which is not visible here.
	pluginProcConfig *csconfig.PluginCfg
	// pluginsTypesToDispatch is the set of plugin types referenced by at
	// least one profile (filled by verifyPluginConfigsWithProfile).
	pluginsTypesToDispatch map[string]struct{}
}
// PluginConfig is a holder used to determine where to dispatch config and how
// to format messages. One value is decoded per YAML document by
// ParsePluginConfigFile, and it is marshalled back to YAML in loadPlugins
// before being sent to the plugin.
type PluginConfig struct {
	// Type is the plugin subtype; loadPlugins matches it against the
	// subtype derived from the plugin binary path.
	Type string `yaml:"type"`
	// Name is the notification name that profiles reference.
	Name string `yaml:"name"`
	// GroupWait is not read in this file.
	// NOTE(review): presumably the time threshold used by PluginWatcher — confirm.
	GroupWait time.Duration `yaml:"group_wait,omitempty"`
	// GroupThreshold is not read in this file.
	// NOTE(review): presumably the count threshold used by PluginWatcher — confirm.
	GroupThreshold int `yaml:"group_threshold,omitempty"`
	// MaxRetry bounds the number of Notify attempts in
	// pushNotificationsToPlugin; setRequiredFields turns 0 into 1.
	MaxRetry int `yaml:"max_retry,omitempty"`
	// TimeOut is the per-attempt context timeout in
	// pushNotificationsToPlugin; setRequiredFields turns 0 into 5 seconds.
	TimeOut time.Duration `yaml:"timeout,omitempty"`
	// Format is the text/template source used by formatAlerts; specific to
	// notification plugins.
	Format string `yaml:"format,omitempty"`
	// Config keeps the plugin-specific config (all remaining inline keys).
	Config map[string]interface{} `yaml:",inline"`
}
// ProfileAlert pairs an alert with the profile it matched. It is the message
// type carried by PluginBroker.PluginChannel.
type ProfileAlert struct {
	// ProfileID is used by addProfileAlert as an index into the broker's
	// profileConfigs slice.
	ProfileID uint
	// Alert is the alert queued for every notification listed in that profile.
	Alert *models.Alert
}
  60. func (pb *PluginBroker) Init(pluginCfg *csconfig.PluginCfg, profileConfigs []*csconfig.ProfileCfg, configPaths *csconfig.ConfigurationPaths) error {
  61. pb.PluginChannel = make(chan ProfileAlert)
  62. pb.notificationConfigsByPluginType = make(map[string][][]byte)
  63. pb.notificationPluginByName = make(map[string]Notifier)
  64. pb.pluginMap = make(map[string]plugin.Plugin)
  65. pb.pluginConfigByName = make(map[string]PluginConfig)
  66. pb.alertsByPluginName = make(map[string][]*models.Alert)
  67. pb.profileConfigs = profileConfigs
  68. pb.pluginProcConfig = pluginCfg
  69. pb.pluginsTypesToDispatch = make(map[string]struct{})
  70. if err := pb.loadConfig(configPaths.NotificationDir); err != nil {
  71. return errors.Wrap(err, "while loading plugin config")
  72. }
  73. if err := pb.loadPlugins(configPaths.PluginDir); err != nil {
  74. return errors.Wrap(err, "while loading plugin")
  75. }
  76. pb.watcher = PluginWatcher{}
  77. pb.watcher.Init(pb.pluginConfigByName, pb.alertsByPluginName)
  78. return nil
  79. }
  80. func (pb *PluginBroker) Kill() {
  81. for _, kill := range pb.pluginKillMethods {
  82. kill()
  83. }
  84. }
  85. func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
  86. //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
  87. pb.watcher.Start(tomb)
  88. for {
  89. select {
  90. case profileAlert := <-pb.PluginChannel:
  91. pb.addProfileAlert(profileAlert)
  92. case pluginName := <-pb.watcher.PluginEvents:
  93. // this can be ran in goroutine, but then locks will be needed
  94. pluginMutex.Lock()
  95. log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
  96. tmpAlerts := pb.alertsByPluginName[pluginName]
  97. pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
  98. pluginMutex.Unlock()
  99. go func() {
  100. if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil {
  101. log.WithField("plugin:", pluginName).Error(err)
  102. }
  103. }()
  104. case <-tomb.Dying():
  105. log.Info("killing all plugins")
  106. pb.Kill()
  107. return
  108. }
  109. }
  110. }
  111. func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) {
  112. for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications {
  113. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  114. log.Errorf("plugin %s is not configured properly.", pluginName)
  115. continue
  116. }
  117. pluginMutex.Lock()
  118. pb.alertsByPluginName[pluginName] = append(pb.alertsByPluginName[pluginName], profileAlert.Alert)
  119. pluginMutex.Unlock()
  120. pb.watcher.Inserts <- pluginName
  121. }
  122. }
  123. func (pb *PluginBroker) profilesContainPlugin(pluginName string) bool {
  124. for _, profileCfg := range pb.profileConfigs {
  125. for _, name := range profileCfg.Notifications {
  126. if pluginName == name {
  127. return true
  128. }
  129. }
  130. }
  131. return false
  132. }
  133. func (pb *PluginBroker) loadConfig(path string) error {
  134. files, err := listFilesAtPath(path)
  135. if err != nil {
  136. return err
  137. }
  138. for _, configFilePath := range files {
  139. if !strings.HasSuffix(configFilePath, ".yaml") && !strings.HasSuffix(configFilePath, ".yml") {
  140. continue
  141. }
  142. pluginConfigs, err := ParsePluginConfigFile(configFilePath)
  143. if err != nil {
  144. return err
  145. }
  146. for _, pluginConfig := range pluginConfigs {
  147. if !pb.profilesContainPlugin(pluginConfig.Name) {
  148. continue
  149. }
  150. setRequiredFields(&pluginConfig)
  151. if _, ok := pb.pluginConfigByName[pluginConfig.Name]; ok {
  152. log.Warnf("several configs for notification %s found ", pluginConfig.Name)
  153. }
  154. pb.pluginConfigByName[pluginConfig.Name] = pluginConfig
  155. }
  156. }
  157. err = pb.verifyPluginConfigsWithProfile()
  158. return err
  159. }
  160. // checks whether every notification in profile has it's own config file
  161. func (pb *PluginBroker) verifyPluginConfigsWithProfile() error {
  162. for _, profileCfg := range pb.profileConfigs {
  163. for _, pluginName := range profileCfg.Notifications {
  164. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  165. return fmt.Errorf("config file for plugin %s not found", pluginName)
  166. }
  167. pb.pluginsTypesToDispatch[pb.pluginConfigByName[pluginName].Type] = struct{}{}
  168. }
  169. }
  170. return nil
  171. }
  172. // check whether each plugin in profile has it's own binary
  173. func (pb *PluginBroker) verifyPluginBinaryWithProfile() error {
  174. for _, profileCfg := range pb.profileConfigs {
  175. for _, pluginName := range profileCfg.Notifications {
  176. if _, ok := pb.notificationPluginByName[pluginName]; !ok {
  177. return fmt.Errorf("binary for plugin %s not found", pluginName)
  178. }
  179. }
  180. }
  181. return nil
  182. }
  183. func (pb *PluginBroker) loadPlugins(path string) error {
  184. binaryPaths, err := listFilesAtPath(path)
  185. if err != nil {
  186. return err
  187. }
  188. for _, binaryPath := range binaryPaths {
  189. if err := pluginIsValid(binaryPath); err != nil {
  190. return err
  191. }
  192. pType, pSubtype, err := getPluginTypeAndSubtypeFromPath(binaryPath) // eg pType="notification" , pSubtype="slack"
  193. if err != nil {
  194. return err
  195. }
  196. if pType != "notification" {
  197. continue
  198. }
  199. if _, ok := pb.pluginsTypesToDispatch[pSubtype]; !ok {
  200. continue
  201. }
  202. pluginClient, err := pb.loadNotificationPlugin(pSubtype, binaryPath)
  203. if err != nil {
  204. return err
  205. }
  206. for _, pc := range pb.pluginConfigByName {
  207. if pc.Type != pSubtype {
  208. continue
  209. }
  210. data, err := yaml.Marshal(pc)
  211. if err != nil {
  212. return err
  213. }
  214. _, err = pluginClient.Configure(context.Background(), &protobufs.Config{Config: data})
  215. if err != nil {
  216. return errors.Wrapf(err, "while configuring %s", pc.Name)
  217. }
  218. log.Infof("registered plugin %s", pc.Name)
  219. pb.notificationPluginByName[pc.Name] = pluginClient
  220. }
  221. }
  222. return pb.verifyPluginBinaryWithProfile()
  223. }
  224. func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) {
  225. handshake, err := getHandshake()
  226. if err != nil {
  227. return nil, err
  228. }
  229. log.Debugf("Executing plugin %s", binaryPath)
  230. cmd, err := pb.CreateCmd(binaryPath)
  231. if err != nil {
  232. return nil, err
  233. }
  234. pb.pluginMap[name] = &NotifierPlugin{}
  235. l := log.New()
  236. err = types.ConfigureLogger(l)
  237. if err != nil {
  238. return nil, err
  239. }
  240. // We set the highest level to permit plugins to set their own log level
  241. // without that, crowdsec log level is controlling plugins level
  242. l.SetLevel(log.TraceLevel)
  243. logger := NewHCLogAdapter(l, "")
  244. c := plugin.NewClient(&plugin.ClientConfig{
  245. HandshakeConfig: handshake,
  246. Plugins: pb.pluginMap,
  247. Cmd: cmd,
  248. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  249. Logger: logger,
  250. })
  251. client, err := c.Client()
  252. if err != nil {
  253. return nil, err
  254. }
  255. raw, err := client.Dispense(name)
  256. if err != nil {
  257. return nil, err
  258. }
  259. pb.pluginKillMethods = append(pb.pluginKillMethods, c.Kill)
  260. return raw.(Notifier), nil
  261. }
  262. func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error {
  263. log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
  264. if len(alerts) == 0 {
  265. return nil
  266. }
  267. message, err := formatAlerts(pb.pluginConfigByName[pluginName].Format, alerts)
  268. if err != nil {
  269. return err
  270. }
  271. plugin := pb.notificationPluginByName[pluginName]
  272. backoffDuration := time.Second
  273. for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ {
  274. ctx, cancel := context.WithTimeout(context.Background(), pb.pluginConfigByName[pluginName].TimeOut)
  275. defer cancel()
  276. _, err = plugin.Notify(
  277. ctx,
  278. &protobufs.Notification{
  279. Text: message,
  280. Name: pluginName,
  281. },
  282. )
  283. if err == nil {
  284. return err
  285. }
  286. log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err.Error(), i)
  287. time.Sleep(backoffDuration)
  288. backoffDuration *= 2
  289. }
  290. return err
  291. }
  292. func ParsePluginConfigFile(path string) ([]PluginConfig, error) {
  293. parsedConfigs := make([]PluginConfig, 0)
  294. yamlFile, err := os.Open(path)
  295. if err != nil {
  296. return parsedConfigs, errors.Wrapf(err, "while opening %s", path)
  297. }
  298. dec := yaml.NewDecoder(yamlFile)
  299. dec.SetStrict(true)
  300. for {
  301. pc := PluginConfig{}
  302. err = dec.Decode(&pc)
  303. if err != nil {
  304. if err == io.EOF {
  305. break
  306. }
  307. return []PluginConfig{}, fmt.Errorf("while decoding %s got error %s", path, err.Error())
  308. }
  309. parsedConfigs = append(parsedConfigs, pc)
  310. }
  311. return parsedConfigs, nil
  312. }
  313. func setRequiredFields(pluginCfg *PluginConfig) {
  314. if pluginCfg.MaxRetry == 0 {
  315. pluginCfg.MaxRetry++
  316. }
  317. if pluginCfg.TimeOut == time.Second*0 {
  318. pluginCfg.TimeOut = time.Second * 5
  319. }
  320. }
  321. // helper which gives paths to all files in the given directory non-recursively
  322. func listFilesAtPath(path string) ([]string, error) {
  323. filePaths := make([]string, 0)
  324. files, err := os.ReadDir(path)
  325. if err != nil {
  326. return nil, err
  327. }
  328. for _, file := range files {
  329. if file.IsDir() {
  330. continue
  331. }
  332. filePaths = append(filePaths, filepath.Join(path, file.Name()))
  333. }
  334. return filePaths, nil
  335. }
  336. func getUUID() (string, error) {
  337. uuidv4, err := uuid.NewRandom()
  338. if err != nil {
  339. return "", err
  340. }
  341. return uuidv4.String(), nil
  342. }
  343. func getHandshake() (plugin.HandshakeConfig, error) {
  344. uuid, err := getUUID()
  345. if err != nil {
  346. return plugin.HandshakeConfig{}, err
  347. }
  348. handshake := plugin.HandshakeConfig{
  349. ProtocolVersion: PluginProtocolVersion,
  350. MagicCookieKey: CrowdsecPluginKey,
  351. MagicCookieValue: uuid,
  352. }
  353. return handshake, nil
  354. }
  355. func formatAlerts(format string, alerts []*models.Alert) (string, error) {
  356. template, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(format)
  357. if err != nil {
  358. return "", err
  359. }
  360. b := new(strings.Builder)
  361. err = template.Execute(b, alerts)
  362. if err != nil {
  363. return "", err
  364. }
  365. return b.String(), nil
  366. }