manager_run.go

package leakybucket

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "math"
    "os"
    "time"

    "github.com/antonmedv/expr"
    "github.com/mohae/deepcopy"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    log "github.com/sirupsen/logrus"

    "github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
    "github.com/crowdsecurity/crowdsec/pkg/types"
)
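// Package-level state used by the helpers below : serialized collects bucket
// snapshots while DumpBucketsStateAt runs, and BucketPourCache/BucketPourTrack
// optionally keep a copy of every poured event for debugging purposes.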
var serialized map[string]Leaky
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
/*GarbageCollectBuckets cleans up buckets that are effectively dead.
The leaky routines' lifecycle is based on "real" time, but when we run in
time-machine mode the reference time comes from the logs, not "real" time.
Thus we need to garbage collect them to avoid skyrocketing memory usage.*/
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
    buckets.wgPour.Wait()
    buckets.wgDumpState.Add(1)
    defer buckets.wgDumpState.Done()

    total := 0
    discard := 0
    toflush := []string{}
    buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
        key := rkey.(string)
        val := rvalue.(*Leaky)
        total += 1
        //bucket already overflowed, we can kill it
        if !val.Ovflw_ts.IsZero() {
            discard += 1
            val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
            toflush = append(toflush, key)
            val.tomb.Kill(nil)
            return true
        }
        /*FIXME : sometimes GetTokensCountAt has rounding issues when we try to
        match it with the bucket capacity, even if the bucket has long been due to
        underflow. Round to 2 decimals.*/
        tokat := val.Limiter.GetTokensCountAt(deadline)
        tokcapa := float64(val.Capacity)
        tokat = math.Round(tokat*100) / 100
        tokcapa = math.Round(tokcapa*100) / 100
        //bucket actually underflowed based on log time, but not in real time
        if tokat >= tokcapa {
            BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
            val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
            toflush = append(toflush, key)
            val.tomb.Kill(nil)
            return true
        } else {
            val.logger.Tracef("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
        }
        if _, ok := serialized[key]; ok {
            log.Errorf("entry %s already exists", key)
            return false
        } else {
            log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
        }
        return true
    })
    log.Infof("Cleaned %d buckets", len(toflush))
    for _, flushkey := range toflush {
        buckets.Bucket_map.Delete(flushkey)
    }
    return nil
}
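// DumpBucketsStateAt serializes every live bucket (skipping those that already
// overflowed or would have underflowed at the given deadline) to a temporary
// JSON file and returns the file's path.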
func DumpBucketsStateAt(deadline time.Time, outputdir string, buckets *Buckets) (string, error) {
    //synchronize with PourItemToHolders
    buckets.wgPour.Wait()
    buckets.wgDumpState.Add(1)
    defer buckets.wgDumpState.Done()

    if outputdir == "" {
        return "", fmt.Errorf("empty output dir for dump bucket state")
    }
    tmpFd, err := ioutil.TempFile(os.TempDir(), "crowdsec-buckets-dump-")
    if err != nil {
        return "", fmt.Errorf("failed to create temp file : %s", err)
    }
    defer tmpFd.Close()
    tmpFileName := tmpFd.Name()
    serialized = make(map[string]Leaky)
    log.Printf("Dumping buckets state at %s", deadline)
    total := 0
    discard := 0
    buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
        key := rkey.(string)
        val := rvalue.(*Leaky)
        total += 1
        if !val.Ovflw_ts.IsZero() {
            discard += 1
            val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
            return true
        }
        /*FIXME : sometimes GetTokensCountAt has rounding issues when we try to
        match it with the bucket capacity, even if the bucket has long been due to
        underflow. Round to 2 decimals.*/
        tokat := val.Limiter.GetTokensCountAt(deadline)
        tokcapa := float64(val.Capacity)
        tokat = math.Round(tokat*100) / 100
        tokcapa = math.Round(tokcapa*100) / 100
        if tokat >= tokcapa {
            BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
            val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
            discard += 1
            return true
        } else {
            val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
        }
        if _, ok := serialized[key]; ok {
            log.Errorf("entry %s already exists", key)
            return false
        } else {
            log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
        }
        val.SerializedState = val.Limiter.Dump()
        serialized[key] = *val
        return true
    })
    bbuckets, err := json.MarshalIndent(serialized, "", " ")
    if err != nil {
        log.Fatalf("Failed to marshal buckets : %s", err)
    }
    size, err := tmpFd.Write(bbuckets)
    if err != nil {
        return "", fmt.Errorf("failed to write temp file : %s", err)
    }
    log.Infof("Serialized %d live buckets (+%d expired) in %d bytes to %s", len(serialized), discard, size, tmpFd.Name())
    serialized = nil
    return tmpFileName, nil
}
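// ShutdownAllBuckets kills the leak routine of every bucket in the map.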
func ShutdownAllBuckets(buckets *Buckets) error {
    buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
        key := rkey.(string)
        val := rvalue.(*Leaky)
        val.tomb.Kill(nil)
        log.Infof("killed %s", key)
        return true
    })
    return nil
}
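// PourItemToBucket sends the parsed event to the given bucket, recreating the
// bucket via LoadOrStoreBucketFromHolder if its leak routine has died or, in
// time-machine mode, if the bucket has expired relative to the event's timestamp.
// It returns true once the event has been poured.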
func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) {
    var sent bool
    var buckey = bucket.Mapkey
    var err error

    sigclosed := 0
    failed_sent := 0
    attempts := 0
    start := time.Now().UTC()
    for !sent {
        attempts += 1
        /* Warn the user if pouring an event took more than 100ms : it means we are at least partially locked */
        if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now().UTC()) {
            holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)", time.Since(start),
                buckey, sigclosed, failed_sent, attempts)
        }
        /* check if the leak routine is up */
        select {
        case _, ok := <-bucket.Signal:
            if !ok {
                //the bucket was found but its routine is dead : clean it up and get a new one
                bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey)
                buckets.Bucket_map.Delete(buckey)
                sigclosed += 1
                bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
                if err != nil {
                    return false, err
                }
                continue
            }
            holder.logger.Tracef("Signal exists, try to pour :)")
        default:
            /*nothing to read, but not closed, try to pour */
            holder.logger.Tracef("Signal exists but empty, try to pour :)")
        }
        /*let's see if this time-machine bucket should have expired */
        if bucket.Mode == TIMEMACHINE {
            bucket.mutex.Lock()
            firstTs := bucket.First_ts
            lastTs := bucket.Last_ts
            bucket.mutex.Unlock()
            if !firstTs.IsZero() {
                var d time.Time
                err = d.UnmarshalText([]byte(parsed.MarshaledTime))
                if err != nil {
                    holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
                }
                if d.After(lastTs.Add(bucket.Duration)) {
                    bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration))
                    buckets.Bucket_map.Delete(buckey)
                    //not sure about this, should we create a new one ?
                    sigclosed += 1
                    bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
                    if err != nil {
                        return false, err
                    }
                    continue
                }
            }
        }
        /*the bucket seems to be up & running, pour the event*/
        select {
        case bucket.In <- parsed:
            holder.logger.Tracef("Successfully sent !")
            if BucketPourTrack {
                if _, ok := BucketPourCache[bucket.Name]; !ok {
                    BucketPourCache[bucket.Name] = make([]types.Event, 0)
                }
                evt := deepcopy.Copy(parsed)
                BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event))
            }
            sent = true
            continue
        default:
            failed_sent += 1
            holder.logger.Tracef("Failed to send, try again")
            continue
        }
    }
    holder.logger.Debugf("bucket '%s' is poured", holder.Name)
    return sent, nil
}
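// LoadOrStoreBucketFromHolder returns the bucket for the given partition key,
// creating it from the holder (live or time-machine flavor) and starting its
// leak routine if it does not exist yet.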
func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error) {
    biface, ok := buckets.Bucket_map.Load(partitionKey)
    /* the bucket doesn't exist, create it !*/
    if !ok {
        var fresh_bucket *Leaky

        switch expectMode {
        case TIMEMACHINE:
            fresh_bucket = NewTimeMachine(holder)
            holder.logger.Debugf("Creating TimeMachine bucket")
        case LIVE:
            fresh_bucket = NewLeaky(holder)
            holder.logger.Debugf("Creating Live bucket")
        default:
            return nil, fmt.Errorf("input event has no expected mode : %+v", expectMode)
        }
        fresh_bucket.In = make(chan types.Event)
        fresh_bucket.Mapkey = partitionKey
        fresh_bucket.Signal = make(chan bool, 1)
        actual, loaded := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket)
        if !loaded {
            //our fresh bucket was stored : start its leak routine
            holder.tomb.Go(func() error {
                return LeakRoutine(fresh_bucket)
            })
            biface = fresh_bucket
            //once the created goroutine is ready to process events, we can return it
            <-fresh_bucket.Signal
        } else {
            holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey)
            biface = actual
        }
        holder.logger.Debugf("Created new bucket %s", partitionKey)
    }
    return biface.(*Leaky), nil
}
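// PourItemToHolders matches the parsed event against every holder (scenario) :
// it evaluates the holder's filter, computes the groupby partition key, loads
// or creates the corresponding bucket and pours the event into it. It returns
// true if the event was poured into at least one bucket.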
func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
    var (
        ok, condition, poured bool
    )

    if BucketPourTrack {
        if BucketPourCache == nil {
            BucketPourCache = make(map[string][]types.Event)
        }
        if _, ok := BucketPourCache["OK"]; !ok {
            BucketPourCache["OK"] = make([]types.Event, 0)
        }
        evt := deepcopy.Copy(parsed)
        BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
    }
    //find the relevant holders (scenarios)
    for idx, holder := range holders {
        //evaluate the bucket's filter condition
        if holder.RunTimeFilter != nil {
            holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
            output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
            if err != nil {
                holder.logger.Errorf("failed parsing : %v", err)
                return false, fmt.Errorf("leaky failed : %s", err)
            }
            //the filter is expected to return a bool : type-check the result
            if condition, ok = output.(bool); !ok {
                holder.logger.Errorf("unexpected non-bool return : %T", output)
                holder.logger.Fatalf("Filter issue")
            }
            if holder.Debug {
                holder.ExprDebugger.Run(holder.logger, condition, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
            }
            if !condition {
                holder.logger.Debugf("Event leaving node : ko (filter mismatch)")
                continue
            }
        }
        //groupby determines the partition key for the specific bucket
        var groupby string
        if holder.RunTimeGroupBy != nil {
            tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
            if err != nil {
                holder.logger.Errorf("failed groupby : %v", err)
                return false, errors.New("leaky failed :/")
            }
            if groupby, ok = tmpGroupBy.(string); !ok {
                holder.logger.Fatalf("failed groupby type : %T", tmpGroupBy)
                return false, errors.New("groupby wrong type")
            }
        }
        buckey := GetKey(holder, groupby)
        //we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
        bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
        if err != nil {
            return false, errors.Wrap(err, "failed to load or store bucket")
        }
        //finally, pour the event into the bucket
        ok, err := PourItemToBucket(bucket, holder, buckets, parsed)
        if err != nil {
            return false, errors.Wrap(err, "failed to pour bucket")
        }
        if ok {
            poured = true
        }
    }
    return poured, nil
}