broker_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. //go:build linux || freebsd || netbsd || openbsd || solaris || !windows
  2. package csplugin
  3. import (
  4. "bytes"
  5. "encoding/json"
  6. "io"
  7. "os"
  8. "testing"
  9. "time"
  10. log "github.com/sirupsen/logrus"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "gopkg.in/tomb.v2"
  14. "gopkg.in/yaml.v2"
  15. "github.com/crowdsecurity/go-cs-lib/pkg/cstest"
  16. "github.com/crowdsecurity/crowdsec/pkg/csconfig"
  17. "github.com/crowdsecurity/crowdsec/pkg/models"
  18. )
  19. func (s *PluginSuite) permissionSetter(perm os.FileMode) func(*testing.T) {
  20. return func(t *testing.T) {
  21. err := os.Chmod(s.pluginBinary, perm)
  22. require.NoError(t, err, "chmod %s %s", perm, s.pluginBinary)
  23. }
  24. }
  25. func (s *PluginSuite) readconfig() (PluginConfig) {
  26. var config PluginConfig
  27. t := s.T()
  28. orig, err := os.ReadFile(s.pluginConfig)
  29. require.NoError(t, err,"unable to read config file %s", s.pluginConfig)
  30. err = yaml.Unmarshal(orig, &config)
  31. require.NoError(t, err,"unable to unmarshal config file")
  32. return config
  33. }
  34. func (s *PluginSuite) writeconfig(config PluginConfig) {
  35. t := s.T()
  36. data, err := yaml.Marshal(&config)
  37. require.NoError(t, err,"unable to marshal config file")
  38. err = os.WriteFile(s.pluginConfig, data, 0644)
  39. require.NoError(t, err,"unable to write config file %s", s.pluginConfig)
  40. }
  41. func (s *PluginSuite) TestBrokerInit() {
  42. tests := []struct {
  43. name string
  44. action func(*testing.T)
  45. procCfg csconfig.PluginCfg
  46. expectedErr string
  47. }{
  48. {
  49. name: "valid config",
  50. },
  51. {
  52. name: "group writable binary",
  53. expectedErr: "notification-dummy is world writable",
  54. action: s.permissionSetter(0o722),
  55. },
  56. {
  57. name: "group writable binary",
  58. expectedErr: "notification-dummy is group writable",
  59. action: s.permissionSetter(0o724),
  60. },
  61. {
  62. name: "no plugin dir",
  63. expectedErr: cstest.FileNotFoundMessage,
  64. action: func(t *testing.T) {
  65. err := os.RemoveAll(s.runDir)
  66. require.NoError(t, err)
  67. },
  68. },
  69. {
  70. name: "no plugin binary",
  71. expectedErr: "binary for plugin dummy_default not found",
  72. action: func(t *testing.T) {
  73. err := os.Remove(s.pluginBinary)
  74. require.NoError(t, err)
  75. },
  76. },
  77. {
  78. name: "only specify user",
  79. expectedErr: "both plugin user and group must be set",
  80. procCfg: csconfig.PluginCfg{
  81. User: "123445555551122toto",
  82. },
  83. },
  84. {
  85. name: "only specify group",
  86. expectedErr: "both plugin user and group must be set",
  87. procCfg: csconfig.PluginCfg{
  88. Group: "123445555551122toto",
  89. },
  90. },
  91. {
  92. name: "Fails to run as root",
  93. expectedErr: "operation not permitted",
  94. procCfg: csconfig.PluginCfg{
  95. User: "root",
  96. Group: "root",
  97. },
  98. },
  99. {
  100. name: "Invalid user and group",
  101. expectedErr: "unknown user toto1234",
  102. procCfg: csconfig.PluginCfg{
  103. User: "toto1234",
  104. Group: "toto1234",
  105. },
  106. },
  107. {
  108. name: "Valid user and invalid group",
  109. expectedErr: "unknown group toto1234",
  110. procCfg: csconfig.PluginCfg{
  111. User: "nobody",
  112. Group: "toto1234",
  113. },
  114. },
  115. }
  116. for _, tc := range tests {
  117. tc := tc
  118. s.Run(tc.name, func() {
  119. t := s.T()
  120. if tc.action != nil {
  121. tc.action(t)
  122. }
  123. _, err := s.InitBroker(&tc.procCfg)
  124. cstest.RequireErrorContains(t, err, tc.expectedErr)
  125. })
  126. }
  127. }
  128. func (s *PluginSuite) TestBrokerNoThreshold() {
  129. var alerts []models.Alert
  130. DefaultEmptyTicker = 50 * time.Millisecond
  131. t := s.T()
  132. pb, err := s.InitBroker(nil)
  133. assert.NoError(t, err)
  134. tomb := tomb.Tomb{}
  135. go pb.Run(&tomb)
  136. // send one item, it should be processed right now
  137. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  138. time.Sleep(200 * time.Millisecond)
  139. // we expect one now
  140. content, err := os.ReadFile("./out")
  141. require.NoError(t, err, "Error reading file")
  142. err = json.Unmarshal(content, &alerts)
  143. require.NoError(t, err)
  144. assert.Len(t, alerts, 1)
  145. // remove it
  146. os.Remove("./out")
  147. // and another one
  148. log.Printf("second send")
  149. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  150. time.Sleep(200 * time.Millisecond)
  151. // we expect one again, as we cleaned the file
  152. content, err = os.ReadFile("./out")
  153. require.NoError(t, err, "Error reading file")
  154. err = json.Unmarshal(content, &alerts)
  155. log.Printf("content-> %s", content)
  156. assert.NoError(t, err)
  157. assert.Len(t, alerts, 1)
  158. }
  159. func (s *PluginSuite) TestBrokerRunGroupAndTimeThreshold_TimeFirst() {
  160. // test grouping by "time"
  161. DefaultEmptyTicker = 50 * time.Millisecond
  162. t := s.T()
  163. // set groupwait and groupthreshold, should honor whichever comes first
  164. cfg := s.readconfig()
  165. cfg.GroupThreshold = 4
  166. cfg.GroupWait = 1 * time.Second
  167. s.writeconfig(cfg)
  168. pb, err := s.InitBroker(nil)
  169. assert.NoError(t, err)
  170. tomb := tomb.Tomb{}
  171. go pb.Run(&tomb)
  172. // send data
  173. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  174. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  175. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  176. time.Sleep(500 * time.Millisecond)
  177. // because of group threshold, we shouldn't have data yet
  178. assert.NoFileExists(t, "./out")
  179. time.Sleep(1 * time.Second)
  180. // after 1 seconds, we should have data
  181. content, err := os.ReadFile("./out")
  182. assert.NoError(t, err)
  183. var alerts []models.Alert
  184. err = json.Unmarshal(content, &alerts)
  185. assert.NoError(t, err)
  186. assert.Len(t, alerts, 3)
  187. }
  188. func (s *PluginSuite) TestBrokerRunGroupAndTimeThreshold_CountFirst() {
  189. DefaultEmptyTicker = 50 * time.Millisecond
  190. t := s.T()
  191. // set groupwait and groupthreshold, should honor whichever comes first
  192. cfg := s.readconfig()
  193. cfg.GroupThreshold = 4
  194. cfg.GroupWait = 4 * time.Second
  195. s.writeconfig(cfg)
  196. pb, err := s.InitBroker(nil)
  197. assert.NoError(t, err)
  198. tomb := tomb.Tomb{}
  199. go pb.Run(&tomb)
  200. // send data
  201. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  202. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  203. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  204. time.Sleep(100 * time.Millisecond)
  205. // because of group threshold, we shouldn't have data yet
  206. assert.NoFileExists(t, "./out")
  207. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  208. time.Sleep(100 * time.Millisecond)
  209. // and now we should
  210. content, err := os.ReadFile("./out")
  211. require.NoError(t, err, "Error reading file")
  212. var alerts []models.Alert
  213. err = json.Unmarshal(content, &alerts)
  214. assert.NoError(t, err)
  215. assert.Len(t, alerts, 4)
  216. }
  217. func (s *PluginSuite) TestBrokerRunGroupThreshold() {
  218. // test grouping by "size"
  219. DefaultEmptyTicker = 50 * time.Millisecond
  220. t := s.T()
  221. // set groupwait
  222. cfg := s.readconfig()
  223. cfg.GroupThreshold = 4
  224. s.writeconfig(cfg)
  225. pb, err := s.InitBroker(nil)
  226. assert.NoError(t, err)
  227. tomb := tomb.Tomb{}
  228. go pb.Run(&tomb)
  229. // send data
  230. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  231. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  232. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  233. time.Sleep(time.Second)
  234. // because of group threshold, we shouldn't have data yet
  235. assert.NoFileExists(t, "./out")
  236. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  237. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  238. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  239. time.Sleep(time.Second)
  240. // and now we should
  241. content, err := os.ReadFile("./out")
  242. require.NoError(t, err, "Error reading file")
  243. decoder := json.NewDecoder(bytes.NewReader(content))
  244. var alerts []models.Alert
  245. // two notifications, one with 4 alerts, one with 2 alerts
  246. err = decoder.Decode(&alerts)
  247. assert.NoError(t, err)
  248. assert.Len(t, alerts, 4)
  249. err = decoder.Decode(&alerts)
  250. assert.NoError(t, err)
  251. assert.Len(t, alerts, 2)
  252. err = decoder.Decode(&alerts)
  253. assert.Equal(t, err, io.EOF)
  254. }
  255. func (s *PluginSuite) TestBrokerRunTimeThreshold() {
  256. DefaultEmptyTicker = 50 * time.Millisecond
  257. t := s.T()
  258. // set groupwait
  259. cfg := s.readconfig()
  260. cfg.GroupWait = 1 * time.Second
  261. s.writeconfig(cfg)
  262. pb, err := s.InitBroker(nil)
  263. assert.NoError(t, err)
  264. tomb := tomb.Tomb{}
  265. go pb.Run(&tomb)
  266. // send data
  267. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  268. time.Sleep(200 * time.Millisecond)
  269. // we shouldn't have data yet
  270. assert.NoFileExists(t, "./out")
  271. time.Sleep(1 * time.Second)
  272. // and now we should
  273. content, err := os.ReadFile("./out")
  274. require.NoError(t, err, "Error reading file")
  275. var alerts []models.Alert
  276. err = json.Unmarshal(content, &alerts)
  277. assert.NoError(t, err)
  278. assert.Len(t, alerts, 1)
  279. }
  280. func (s *PluginSuite) TestBrokerRunSimple() {
  281. DefaultEmptyTicker = 50 * time.Millisecond
  282. t := s.T()
  283. pb, err := s.InitBroker(nil)
  284. assert.NoError(t, err)
  285. tomb := tomb.Tomb{}
  286. go pb.Run(&tomb)
  287. assert.NoFileExists(t, "./out")
  288. defer os.Remove("./out")
  289. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  290. pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
  291. // make it wait a bit, CI can be slow
  292. time.Sleep(time.Second)
  293. content, err := os.ReadFile("./out")
  294. require.NoError(t, err, "Error reading file")
  295. decoder := json.NewDecoder(bytes.NewReader(content))
  296. var alerts []models.Alert
  297. // two notifications, one alert each
  298. err = decoder.Decode(&alerts)
  299. assert.NoError(t, err)
  300. assert.Len(t, alerts, 1)
  301. err = decoder.Decode(&alerts)
  302. assert.NoError(t, err)
  303. assert.Len(t, alerts, 1)
  304. err = decoder.Decode(&alerts)
  305. assert.Equal(t, err, io.EOF)
  306. }