broker.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. package csplugin
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "reflect"
  9. "strings"
  10. "sync"
  11. "text/template"
  12. "time"
  13. "github.com/Masterminds/sprig/v3"
  14. "github.com/google/uuid"
  15. plugin "github.com/hashicorp/go-plugin"
  16. log "github.com/sirupsen/logrus"
  17. "gopkg.in/tomb.v2"
  18. "gopkg.in/yaml.v2"
  19. "github.com/crowdsecurity/go-cs-lib/csstring"
  20. "github.com/crowdsecurity/go-cs-lib/slicetools"
  21. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  22. "github.com/crowdsecurity/crowdsec/pkg/models"
  23. "github.com/crowdsecurity/crowdsec/pkg/protobufs"
  24. "github.com/crowdsecurity/crowdsec/pkg/types"
  25. )
// pluginMutex guards the broker's alertsByPluginName map: it is held while Run
// swaps out a plugin's pending alerts and while addProfileAlert appends to them.
var pluginMutex sync.Mutex
const (
	// PluginProtocolVersion is the protocol version placed in the go-plugin
	// handshake built by getHandshake.
	PluginProtocolVersion uint = 1

	// CrowdsecPluginKey is the magic cookie key placed in the go-plugin
	// handshake built by getHandshake.
	CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY"
)
// The broker is responsible for running the plugins and dispatching events.
// It receives all the events from the main process and stacks them up.
// It is also notified by the watcher when it needs to deliver events to plugins (based on time or count threshold).
type PluginBroker struct {
	// PluginChannel is the (unbuffered) channel on which Run receives alerts to queue.
	PluginChannel chan ProfileAlert

	// alertsByPluginName holds the alerts waiting to be delivered, keyed by notification name.
	// Accessed under pluginMutex in Run and addProfileAlert.
	alertsByPluginName map[string][]*models.Alert

	// profileConfigs are the profiles given to Init; ProfileAlert.ProfileID indexes into this slice.
	profileConfigs []*csconfig.ProfileCfg

	// pluginConfigByName maps a notification name to the configuration read by loadConfig.
	pluginConfigByName map[string]PluginConfig

	// pluginMap is the plugin set handed to every go-plugin client; loadNotificationPlugin
	// registers one entry per plugin subtype.
	pluginMap map[string]plugin.Plugin

	// NOTE(review): allocated in Init but not read or written anywhere else in the code shown here.
	notificationConfigsByPluginType map[string][][]byte // "slack" -> []{config1, config2}

	// notificationPluginByName maps a notification name to the plugin client that serves it.
	notificationPluginByName map[string]Notifier

	// watcher tells Run (through its PluginEvents channel) when a plugin's alerts must be delivered.
	watcher PluginWatcher

	// pluginKillMethods collects the Kill function of every started plugin client; see Kill.
	pluginKillMethods []func()

	// pluginProcConfig is the plugin process configuration given to Init.
	// NOTE(review): presumably consumed by CreateCmd, which is not visible here — confirm.
	pluginProcConfig *csconfig.PluginCfg

	// pluginsTypesToDispatch is the set of plugin types referenced by at least one profile;
	// loadPlugins skips binaries whose subtype is not in this set.
	pluginsTypesToDispatch map[string]struct{}
}
// PluginConfig is the holder used to determine where to dispatch a
// configuration and how to format messages. One value is decoded per YAML
// document by ParsePluginConfigFile.
type PluginConfig struct {
	// Type is the plugin subtype (e.g. "slack"); it is matched against the subtype of the plugin binary.
	Type string `yaml:"type"`

	// Name is the notification name referenced from the profiles.
	Name string `yaml:"name"`

	// NOTE(review): not read in the code shown here — presumably the watcher's time threshold; confirm.
	GroupWait time.Duration `yaml:"group_wait,omitempty"`

	// GroupThreshold is the chunk size Run uses when delivering alerts (0 is treated as 1 there).
	GroupThreshold int `yaml:"group_threshold,omitempty"`

	// MaxRetry is the number of delivery attempts; SetRequiredFields turns 0 into 1.
	MaxRetry int `yaml:"max_retry,omitempty"`

	// TimeOut bounds each delivery attempt; SetRequiredFields turns 0 into 5 seconds.
	TimeOut time.Duration `yaml:"timeout,omitempty"`

	// Format is the text/template source passed to FormatAlerts.
	Format string `yaml:"format,omitempty"` // specific to notification plugins

	// Config receives every remaining key of the YAML document.
	Config map[string]interface{} `yaml:",inline"` // to keep the plugin-specific config
}
// ProfileAlert is the message sent on PluginBroker.PluginChannel: an alert
// together with the index of the profile whose notifications should receive it.
type ProfileAlert struct {
	// ProfileID is used as an index into the broker's profileConfigs slice.
	ProfileID uint

	// Alert is the alert to queue for each notification of that profile.
	Alert *models.Alert
}
  62. func (pb *PluginBroker) Init(pluginCfg *csconfig.PluginCfg, profileConfigs []*csconfig.ProfileCfg, configPaths *csconfig.ConfigurationPaths) error {
  63. pb.PluginChannel = make(chan ProfileAlert)
  64. pb.notificationConfigsByPluginType = make(map[string][][]byte)
  65. pb.notificationPluginByName = make(map[string]Notifier)
  66. pb.pluginMap = make(map[string]plugin.Plugin)
  67. pb.pluginConfigByName = make(map[string]PluginConfig)
  68. pb.alertsByPluginName = make(map[string][]*models.Alert)
  69. pb.profileConfigs = profileConfigs
  70. pb.pluginProcConfig = pluginCfg
  71. pb.pluginsTypesToDispatch = make(map[string]struct{})
  72. if err := pb.loadConfig(configPaths.NotificationDir); err != nil {
  73. return fmt.Errorf("while loading plugin config: %w", err)
  74. }
  75. if err := pb.loadPlugins(configPaths.PluginDir); err != nil {
  76. return fmt.Errorf("while loading plugin: %w", err)
  77. }
  78. pb.watcher = PluginWatcher{}
  79. pb.watcher.Init(pb.pluginConfigByName, pb.alertsByPluginName)
  80. return nil
  81. }
  82. func (pb *PluginBroker) Kill() {
  83. for _, kill := range pb.pluginKillMethods {
  84. kill()
  85. }
  86. }
  87. func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {
  88. //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
  89. pb.watcher.Start(&tomb.Tomb{})
  90. loop:
  91. for {
  92. select {
  93. case profileAlert := <-pb.PluginChannel:
  94. pb.addProfileAlert(profileAlert)
  95. case pluginName := <-pb.watcher.PluginEvents:
  96. // this can be run in goroutine, but then locks will be needed
  97. pluginMutex.Lock()
  98. log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
  99. tmpAlerts := pb.alertsByPluginName[pluginName]
  100. pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
  101. pluginMutex.Unlock()
  102. go func() {
  103. //Chunk alerts to respect group_threshold
  104. threshold := pb.pluginConfigByName[pluginName].GroupThreshold
  105. if threshold == 0 {
  106. threshold = 1
  107. }
  108. for _, chunk := range slicetools.Chunks(tmpAlerts, threshold) {
  109. if err := pb.pushNotificationsToPlugin(pluginName, chunk); err != nil {
  110. log.WithField("plugin:", pluginName).Error(err)
  111. }
  112. }
  113. }()
  114. case <-pluginTomb.Dying():
  115. log.Infof("pluginTomb dying")
  116. pb.watcher.tomb.Kill(errors.New("Terminating"))
  117. for {
  118. select {
  119. case <-pb.watcher.tomb.Dead():
  120. log.Info("killing all plugins")
  121. pb.Kill()
  122. break loop
  123. case pluginName := <-pb.watcher.PluginEvents:
  124. // this can be run in goroutine, but then locks will be needed
  125. pluginMutex.Lock()
  126. log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
  127. tmpAlerts := pb.alertsByPluginName[pluginName]
  128. pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
  129. pluginMutex.Unlock()
  130. if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil {
  131. log.WithField("plugin:", pluginName).Error(err)
  132. }
  133. }
  134. }
  135. }
  136. }
  137. }
  138. func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) {
  139. for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications {
  140. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  141. log.Errorf("plugin %s is not configured properly.", pluginName)
  142. continue
  143. }
  144. pluginMutex.Lock()
  145. pb.alertsByPluginName[pluginName] = append(pb.alertsByPluginName[pluginName], profileAlert.Alert)
  146. pluginMutex.Unlock()
  147. pb.watcher.Inserts <- pluginName
  148. }
  149. }
  150. func (pb *PluginBroker) profilesContainPlugin(pluginName string) bool {
  151. for _, profileCfg := range pb.profileConfigs {
  152. for _, name := range profileCfg.Notifications {
  153. if pluginName == name {
  154. return true
  155. }
  156. }
  157. }
  158. return false
  159. }
  160. func (pb *PluginBroker) loadConfig(path string) error {
  161. files, err := listFilesAtPath(path)
  162. if err != nil {
  163. return err
  164. }
  165. for _, configFilePath := range files {
  166. if !strings.HasSuffix(configFilePath, ".yaml") && !strings.HasSuffix(configFilePath, ".yml") {
  167. continue
  168. }
  169. pluginConfigs, err := ParsePluginConfigFile(configFilePath)
  170. if err != nil {
  171. return err
  172. }
  173. for _, pluginConfig := range pluginConfigs {
  174. SetRequiredFields(&pluginConfig)
  175. if _, ok := pb.pluginConfigByName[pluginConfig.Name]; ok {
  176. log.Warningf("notification '%s' is defined multiple times", pluginConfig.Name)
  177. }
  178. pb.pluginConfigByName[pluginConfig.Name] = pluginConfig
  179. if !pb.profilesContainPlugin(pluginConfig.Name) {
  180. continue
  181. }
  182. }
  183. }
  184. err = pb.verifyPluginConfigsWithProfile()
  185. return err
  186. }
  187. // checks whether every notification in profile has its own config file
  188. func (pb *PluginBroker) verifyPluginConfigsWithProfile() error {
  189. for _, profileCfg := range pb.profileConfigs {
  190. for _, pluginName := range profileCfg.Notifications {
  191. if _, ok := pb.pluginConfigByName[pluginName]; !ok {
  192. return fmt.Errorf("config file for plugin %s not found", pluginName)
  193. }
  194. pb.pluginsTypesToDispatch[pb.pluginConfigByName[pluginName].Type] = struct{}{}
  195. }
  196. }
  197. return nil
  198. }
  199. // check whether each plugin in profile has its own binary
  200. func (pb *PluginBroker) verifyPluginBinaryWithProfile() error {
  201. for _, profileCfg := range pb.profileConfigs {
  202. for _, pluginName := range profileCfg.Notifications {
  203. if _, ok := pb.notificationPluginByName[pluginName]; !ok {
  204. return fmt.Errorf("binary for plugin %s not found", pluginName)
  205. }
  206. }
  207. }
  208. return nil
  209. }
  210. func (pb *PluginBroker) loadPlugins(path string) error {
  211. binaryPaths, err := listFilesAtPath(path)
  212. if err != nil {
  213. return err
  214. }
  215. for _, binaryPath := range binaryPaths {
  216. if err := pluginIsValid(binaryPath); err != nil {
  217. return err
  218. }
  219. pType, pSubtype, err := getPluginTypeAndSubtypeFromPath(binaryPath) // eg pType="notification" , pSubtype="slack"
  220. if err != nil {
  221. return err
  222. }
  223. if pType != "notification" {
  224. continue
  225. }
  226. if _, ok := pb.pluginsTypesToDispatch[pSubtype]; !ok {
  227. continue
  228. }
  229. pluginClient, err := pb.loadNotificationPlugin(pSubtype, binaryPath)
  230. if err != nil {
  231. return err
  232. }
  233. for _, pc := range pb.pluginConfigByName {
  234. if pc.Type != pSubtype {
  235. continue
  236. }
  237. data, err := yaml.Marshal(pc)
  238. if err != nil {
  239. return err
  240. }
  241. data = []byte(csstring.StrictExpand(string(data), os.LookupEnv))
  242. _, err = pluginClient.Configure(context.Background(), &protobufs.Config{Config: data})
  243. if err != nil {
  244. return fmt.Errorf("while configuring %s: %w", pc.Name, err)
  245. }
  246. log.Infof("registered plugin %s", pc.Name)
  247. pb.notificationPluginByName[pc.Name] = pluginClient
  248. }
  249. }
  250. return pb.verifyPluginBinaryWithProfile()
  251. }
  252. func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) {
  253. handshake, err := getHandshake()
  254. if err != nil {
  255. return nil, err
  256. }
  257. log.Debugf("Executing plugin %s", binaryPath)
  258. cmd, err := pb.CreateCmd(binaryPath)
  259. if err != nil {
  260. return nil, err
  261. }
  262. pb.pluginMap[name] = &NotifierPlugin{}
  263. l := log.New()
  264. err = types.ConfigureLogger(l)
  265. if err != nil {
  266. return nil, err
  267. }
  268. // We set the highest level to permit plugins to set their own log level
  269. // without that, crowdsec log level is controlling plugins level
  270. l.SetLevel(log.TraceLevel)
  271. logger := NewHCLogAdapter(l, "")
  272. c := plugin.NewClient(&plugin.ClientConfig{
  273. HandshakeConfig: handshake,
  274. Plugins: pb.pluginMap,
  275. Cmd: cmd,
  276. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  277. Logger: logger,
  278. })
  279. client, err := c.Client()
  280. if err != nil {
  281. return nil, err
  282. }
  283. raw, err := client.Dispense(name)
  284. if err != nil {
  285. return nil, err
  286. }
  287. pb.pluginKillMethods = append(pb.pluginKillMethods, c.Kill)
  288. return raw.(Notifier), nil
  289. }
  290. func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error {
  291. log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
  292. if len(alerts) == 0 {
  293. return nil
  294. }
  295. message, err := FormatAlerts(pb.pluginConfigByName[pluginName].Format, alerts)
  296. if err != nil {
  297. return err
  298. }
  299. plugin := pb.notificationPluginByName[pluginName]
  300. backoffDuration := time.Second
  301. for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ {
  302. ctx, cancel := context.WithTimeout(context.Background(), pb.pluginConfigByName[pluginName].TimeOut)
  303. defer cancel()
  304. _, err = plugin.Notify(
  305. ctx,
  306. &protobufs.Notification{
  307. Text: message,
  308. Name: pluginName,
  309. },
  310. )
  311. if err == nil {
  312. return nil
  313. }
  314. log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err, i)
  315. time.Sleep(backoffDuration)
  316. backoffDuration *= 2
  317. }
  318. return err
  319. }
  320. func ParsePluginConfigFile(path string) ([]PluginConfig, error) {
  321. parsedConfigs := make([]PluginConfig, 0)
  322. yamlFile, err := os.Open(path)
  323. if err != nil {
  324. return nil, fmt.Errorf("while opening %s: %w", path, err)
  325. }
  326. dec := yaml.NewDecoder(yamlFile)
  327. dec.SetStrict(true)
  328. for {
  329. pc := PluginConfig{}
  330. err = dec.Decode(&pc)
  331. if err != nil {
  332. if errors.Is(err, io.EOF) {
  333. break
  334. }
  335. return nil, fmt.Errorf("while decoding %s got error %s", path, err)
  336. }
  337. // if the yaml document is empty, skip
  338. if reflect.DeepEqual(pc, PluginConfig{}) {
  339. continue
  340. }
  341. parsedConfigs = append(parsedConfigs, pc)
  342. }
  343. return parsedConfigs, nil
  344. }
  345. func SetRequiredFields(pluginCfg *PluginConfig) {
  346. if pluginCfg.MaxRetry == 0 {
  347. pluginCfg.MaxRetry++
  348. }
  349. if pluginCfg.TimeOut == time.Second*0 {
  350. pluginCfg.TimeOut = time.Second * 5
  351. }
  352. }
  353. func getUUID() (string, error) {
  354. uuidv4, err := uuid.NewRandom()
  355. if err != nil {
  356. return "", err
  357. }
  358. return uuidv4.String(), nil
  359. }
  360. func getHandshake() (plugin.HandshakeConfig, error) {
  361. uuid, err := getUUID()
  362. if err != nil {
  363. return plugin.HandshakeConfig{}, err
  364. }
  365. handshake := plugin.HandshakeConfig{
  366. ProtocolVersion: PluginProtocolVersion,
  367. MagicCookieKey: CrowdsecPluginKey,
  368. MagicCookieValue: uuid,
  369. }
  370. return handshake, nil
  371. }
  372. func FormatAlerts(format string, alerts []*models.Alert) (string, error) {
  373. template, err := template.New("").Funcs(sprig.TxtFuncMap()).Funcs(funcMap()).Parse(format)
  374. if err != nil {
  375. return "", err
  376. }
  377. b := new(strings.Builder)
  378. err = template.Execute(b, alerts)
  379. if err != nil {
  380. return "", err
  381. }
  382. return b.String(), nil
  383. }