manager.go

package leakybucket

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
	"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
	"github.com/crowdsecurity/crowdsec/pkg/types"

	"github.com/antonmedv/expr"
	"github.com/antonmedv/expr/vm"
	"github.com/davecgh/go-spew/spew"
	"github.com/goombaio/namegenerator"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	yaml "gopkg.in/yaml.v2"
)

// BucketFactory struct holds all fields for any bucket configuration. This is to have a
// generic struct for buckets. This can be seen as a bucket factory.
type BucketFactory struct {
	FormatVersion  string            `yaml:"format"`
	Author         string            `yaml:"author"`
	Description    string            `yaml:"description"`
	References     []string          `yaml:"references"`
	Type           string            `yaml:"type"`                //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
	Name           string            `yaml:"name"`                //Name of the bucket, used later in logs and user messages. Should be unique
	Capacity       int               `yaml:"capacity"`            //Capacity is applicable to leaky buckets and determines the "burst" capacity
	LeakSpeed      string            `yaml:"leakspeed"`           //LeakSpeed is a duration (e.g. "10s") : one event leaks out of the bucket every LeakSpeed
	Duration       string            `yaml:"duration"`            //Duration allows 'counter' buckets to have a fixed life-time
	Filter         string            `yaml:"filter"`              //Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
	GroupBy        string            `yaml:"groupby,omitempty"`   //GroupBy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
	Distinct       string            `yaml:"distinct"`            //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
	Debug          bool              `yaml:"debug"`               //Debug, when set to true, will enable debugging for _this_ scenario specifically
	Labels         map[string]string `yaml:"labels"`              //Labels is a K:V list aiming at providing context to the overflow
	Blackhole      string            `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent the same bucket partition from overflowing more often than $duration
	logger         *log.Entry        `yaml:"-"`                   //logger is the bucket-specific logger (used by Debug as well)
	Reprocess      bool              `yaml:"reprocess"`           //Reprocess, if true, will force the bucket's overflow to be re-injected into the processing chain
	CacheSize      int               `yaml:"cache_size"`          //CacheSize, if > 0, limits the size of the in-memory cache of the bucket
	Profiling      bool              `yaml:"profiling"`           //Profiling, if true, will make the bucket record pours/overflows/etc.
	OverflowFilter string            `yaml:"overflow_filter"`     //OverflowFilter, if present, is a filter that must return true for the overflow to go through
	BucketName     string            `yaml:"-"`
	Filename       string            `yaml:"-"`
	RunTimeFilter  *vm.Program       `json:"-"`
	RunTimeGroupBy *vm.Program       `json:"-"`
	Data           []*types.DataSource `yaml:"data,omitempty"`
	leakspeed      time.Duration     //internal representation of `LeakSpeed`
	duration       time.Duration     //internal representation of `Duration`
	ret            chan types.Event  //the bucket-specific output chan for overflows
	processors     []Processor       //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
	output         bool              //??
}

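// For illustration only : a minimal scenario file that this struct would decode
// might look like the following (the key names come from the yaml tags above,
// the values are made up) :
//
//	type: leaky
//	name: example/ssh-bf
//	description: "example brute-force detection"
//	filter: "evt.Meta.log_type == 'ssh_failed-auth'"
//	groupby: evt.Meta.source_ip
//	capacity: 5
//	leakspeed: 10s
//	labels:
//	  service: ssh
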
func ValidateFactory(b *BucketFactory) error {
	if b.Name == "" {
		return fmt.Errorf("bucket must have name")
	}
	if b.Description == "" {
		return fmt.Errorf("description is mandatory")
	}
	if b.Type == "leaky" {
		if b.Capacity <= 0 { //capacity must be a positive int
			return fmt.Errorf("bad capacity for leaky '%d'", b.Capacity)
		}
		if b.LeakSpeed == "" {
			return fmt.Errorf("leakspeed can't be empty for leaky")
		}
		if b.leakspeed == 0 {
			return fmt.Errorf("bad leakspeed for leaky '%s'", b.LeakSpeed)
		}
	} else if b.Type == "counter" {
		if b.Duration == "" {
			return fmt.Errorf("duration can't be empty for counter")
		}
		if b.duration == 0 {
			return fmt.Errorf("bad duration for counter bucket '%d'", b.duration)
		}
		if b.Capacity != -1 {
			return fmt.Errorf("counter bucket must have -1 capacity")
		}
	} else if b.Type == "trigger" {
		if b.Capacity != 0 {
			return fmt.Errorf("trigger bucket must have 0 capacity")
		}
	} else {
		return fmt.Errorf("unknown bucket type '%s'", b.Type)
	}
	return nil
}

/* Init processes the yaml files from a directory and loads them as BucketFactory */
func Init(cfg map[string]string) ([]BucketFactory, chan types.Event, error) {
	return LoadBucketDir(cfg["patterns"], cfg["data"])
}

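// Illustrative usage only (the paths below are hypothetical) : Init returns the loaded
// factories plus the channel on which overflows will be sent back.
//
//	holders, outChan, err := Init(map[string]string{
//		"patterns": "/etc/crowdsec/config/scenarios/", // directory holding the scenario yaml files
//		"data":     "/var/lib/crowdsec/data/",         // folder passed to exprhelpers.FileInit for data files
//	})
//	if err != nil {
//		log.Fatalf("failed to load scenarios : %s", err)
//	}
//	_, _ = holders, outChan
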
func LoadBuckets(files []string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
	var (
		ret      []BucketFactory = []BucketFactory{}
		response chan types.Event
	)

	var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())

	err := exprhelpers.Init()
	if err != nil {
		return nil, nil, err
	}

	response = make(chan types.Event, 1)
	for _, f := range files {
		log.Debugf("Loading %s", f)
		if !strings.HasSuffix(f, ".yaml") {
			log.Debugf("Skipping %s : not a yaml file", f)
			continue
		}
		//process the yaml
		bucketConfigurationFile, err := os.Open(f)
		if err != nil {
			log.Errorf("Can't access leaky configuration file %s", f)
			return nil, nil, err
		}
		dec := yaml.NewDecoder(bucketConfigurationFile)
		dec.SetStrict(true)
		for {
			g := BucketFactory{}
			err = dec.Decode(&g)
			if err != nil {
				if err == io.EOF {
					log.Tracef("End of yaml file")
					break
				} else {
					log.Errorf("Bad yaml in %s : %v", f, err)
					return nil, nil, fmt.Errorf("bad yaml in %s : %v", f, err)
				}
			}
			//check empty
			if g.Name == "" {
				log.Errorf("Won't load nameless bucket")
				return nil, nil, fmt.Errorf("nameless bucket")
			}
			//check compat
			if g.FormatVersion == "" {
				log.Debugf("no version in %s : %s, assuming '1.0'", g.Name, f)
				g.FormatVersion = "1.0"
			}
			ok, err := cwversion.Statisfies(g.FormatVersion, cwversion.Constraint_scenario)
			if err != nil {
				log.Fatalf("Failed to check version : %s", err)
			}
			if !ok {
				log.Errorf("can't load %s : %s doesn't satisfy scenario format %s, skip", g.Name, g.FormatVersion, cwversion.Constraint_scenario)
				continue
			}
			g.Filename = filepath.Clean(f)
			g.BucketName = seed.Generate()
			g.ret = response
			err = LoadBucket(&g, dataFolder)
			if err != nil {
				log.Errorf("Failed to load bucket : %v", err)
				return nil, nil, fmt.Errorf("loadBucket failed : %v", err)
			}
			ret = append(ret, g)
		}
	}
	log.Warningf("Loaded %d scenarios", len(ret))
	return ret, response, nil
}

func LoadBucketDir(dir string, dataFolder string) ([]BucketFactory, chan types.Event, error) {
	var (
		filenames []string
	)
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, nil, err
	}
	for _, f := range files {
		filenames = append(filenames, filepath.Join(dir, f.Name()))
	}
	return LoadBuckets(filenames, dataFolder)
}

/* LoadBucket sets up a single BucketFactory : logger, compiled filter/groupby expressions, processors and data files */
func LoadBucket(g *BucketFactory, dataFolder string) error {
	var err error
	if g.Debug {
		var clog = log.New()
		if err := types.ConfigureLogger(clog); err != nil {
			log.Fatalf("While creating bucket-specific logger : %s", err)
		}
		clog.SetLevel(log.DebugLevel)
		g.logger = clog.WithFields(log.Fields{
			"cfg":  g.BucketName,
			"name": g.Name,
			"file": g.Filename,
		})
	} else {
		/* else bind it to the default one (might find something more elegant here)*/
		g.logger = log.WithFields(log.Fields{
			"cfg":  g.BucketName,
			"name": g.Name,
			"file": g.Filename,
		})
	}

	if g.LeakSpeed != "" {
		if g.leakspeed, err = time.ParseDuration(g.LeakSpeed); err != nil {
			return fmt.Errorf("bad leakspeed '%s' in %s : %v", g.LeakSpeed, g.Filename, err)
		}
	} else {
		g.leakspeed = time.Duration(0)
	}
	if g.Duration != "" {
		if g.duration, err = time.ParseDuration(g.Duration); err != nil {
			return fmt.Errorf("invalid Duration '%s' in %s : %v", g.Duration, g.Filename, err)
		}
	}

	if g.Filter == "" {
		g.logger.Warningf("Bucket without filter, abort.")
		return fmt.Errorf("bucket without filter directive")
	}
	g.RunTimeFilter, err = expr.Compile(g.Filter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
	if err != nil {
		return fmt.Errorf("invalid filter '%s' in %s : %v", g.Filter, g.Filename, err)
	}

	if g.GroupBy != "" {
		g.RunTimeGroupBy, err = expr.Compile(g.GroupBy, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
		if err != nil {
			return fmt.Errorf("invalid groupby '%s' in %s : %v", g.GroupBy, g.Filename, err)
		}
	}

	g.logger.Infof("Adding %s bucket", g.Type)
	//return the Holder corresponding to the type of bucket
	g.processors = []Processor{}
	switch g.Type {
	case "leaky":
		g.processors = append(g.processors, &DumbProcessor{})
	case "trigger":
		g.processors = append(g.processors, &Trigger{})
	case "counter":
		g.processors = append(g.processors, &DumbProcessor{})
	default:
		return fmt.Errorf("invalid type '%s' in %s : %v", g.Type, g.Filename, err)
	}

	if g.Distinct != "" {
		g.logger.Debugf("Adding a non duplicate filter on %s.", g.Name)
		g.processors = append(g.processors, &Uniq{})
	}

	if g.OverflowFilter != "" {
		g.logger.Debugf("Adding an overflow filter")
		filovflw, err := NewOverflowFilter(g)
		if err != nil {
			g.logger.Errorf("Error creating overflow_filter : %s", err)
			return fmt.Errorf("error creating overflow_filter : %s", err)
		}
		g.processors = append(g.processors, filovflw)
	}

	if g.Blackhole != "" {
		g.logger.Debugf("Adding blackhole.")
		blackhole, err := NewBlackhole(g)
		if err != nil {
			g.logger.Errorf("Error creating blackhole : %s", err)
			return fmt.Errorf("error creating blackhole : %s", err)
		}
		g.processors = append(g.processors, blackhole)
	}

	if len(g.Data) > 0 {
		for _, data := range g.Data {
			err = exprhelpers.FileInit(dataFolder, data.DestPath, data.Type)
			if err != nil {
				g.logger.Errorf("unable to init data for file '%s': %s", data.Filename, err.Error())
			}
		}
	}

	g.output = false
	if err := ValidateFactory(g); err != nil {
		return fmt.Errorf("invalid bucket from %s : %v", g.Filename, err)
	}
	return nil
}

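// A minimal sketch (not from the original code) of compiling a factory by hand,
// e.g. in a test ; every literal value below is made up :
//
//	g := BucketFactory{
//		Type:        "trigger",
//		Name:        "example/port-scan",
//		Description: "example trigger scenario",
//		Filter:      "evt.Meta.service == 'tcp'",
//	}
//	if err := LoadBucket(&g, "/tmp/data"); err != nil { // the data folder path is hypothetical
//		log.Fatalf("LoadBucket failed : %s", err)
//	}
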
func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error {
	var state map[string]Leaky
	body, err := ioutil.ReadFile(file)
	if err != nil {
		return fmt.Errorf("can't read state file %s : %s", file, err)
	}
	if err := json.Unmarshal(body, &state); err != nil {
		return fmt.Errorf("can't unmarshal state file %s : %s", file, err)
	}
	for k, v := range state {
		var tbucket *Leaky
		log.Debugf("Reloading bucket %s", k)
		val, ok := buckets.Bucket_map.Load(k)
		if ok {
			log.Fatalf("key %s already exists : %+v", k, val)
		}
		//find back our holder
		found := false
		for _, h := range holders {
			if h.Name == v.Name {
				log.Debugf("found factory %s/%s -> %s", h.Author, h.Name, h.Description)
				//check in which mode the bucket was
				if v.Mode == TIMEMACHINE {
					tbucket = NewTimeMachine(h)
				} else if v.Mode == LIVE {
					tbucket = NewLeaky(h)
				} else {
					log.Errorf("Unknown bucket type : %d", v.Mode)
				}
				/*Trying to restore queue state*/
				tbucket.Queue = v.Queue
				/*Trying to set the limiter to the saved values*/
				tbucket.Limiter.Load(v.SerializedState)
				tbucket.In = make(chan types.Event)
				tbucket.Mapkey = k
				tbucket.Signal = make(chan bool, 1)
				tbucket.KillSwitch = make(chan bool, 1)
				tbucket.First_ts = v.First_ts
				tbucket.Last_ts = v.Last_ts
				tbucket.Ovflw_ts = v.Ovflw_ts
				tbucket.Total_count = v.Total_count
				buckets.Bucket_map.Store(k, tbucket)
				go LeakRoutine(tbucket)
				<-tbucket.Signal
				found = true
				break
			}
		}
		if !found {
			log.Fatalf("Unable to find holder for bucket %s : %s", k, spew.Sdump(v))
		}
	}

	log.Infof("Restored %d buckets from dump", len(state))
	return nil
}

var serialized map[string]Leaky

/*The leaky routines' lifecycle is based on "real" time.
But when we are running in time-machine mode, the reference time is in logs and not "real" time.
Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
	total := 0
	discard := 0
	toflush := []string{}
	buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
		key := rkey.(string)
		val := rvalue.(*Leaky)
		total += 1
		if !val.Ovflw_ts.IsZero() {
			discard += 1
			val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
			toflush = append(toflush, key)
			val.KillSwitch <- true
			return true
		}
		/*FIXME : sometimes the gettokenscountat has some rounding issues when we try to
		match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/
		tokat := val.Limiter.GetTokensCountAt(deadline)
		tokcapa := float64(val.Capacity)
		tokat = math.Round(tokat*100) / 100
		tokcapa = math.Round(tokcapa*100) / 100
		if tokat >= tokcapa {
			BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
			val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
			toflush = append(toflush, key)
			val.KillSwitch <- true
			return true
		} else {
			val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
		}
		if _, ok := serialized[key]; ok {
			log.Errorf("entry %s already exists", key)
			return false
		} else {
			log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
		}
		return true
	})
	log.Infof("Cleaned %d buckets", len(toflush))
	for _, flushkey := range toflush {
		buckets.Bucket_map.Delete(flushkey)
	}
	return nil
}

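// Illustrative only : when replaying logs (time-machine mode), the deadline passed to
// GarbageCollectBuckets would typically be derived from the timestamp of the last
// parsed event rather than from the wall clock, e.g. (lastParsed is a hypothetical
// types.Event with a populated MarshaledTime) :
//
//	var z time.Time
//	if err := z.UnmarshalText([]byte(lastParsed.MarshaledTime)); err == nil {
//		_ = GarbageCollectBuckets(z, buckets)
//	}
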
func DumpBucketsStateAt(file string, deadline time.Time, buckets *Buckets) error {
	serialized = make(map[string]Leaky)
	log.Printf("Dumping buckets state at %s", deadline)
	total := 0
	discard := 0
	buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
		key := rkey.(string)
		val := rvalue.(*Leaky)
		total += 1
		if !val.Ovflw_ts.IsZero() {
			discard += 1
			val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
			return true
		}
		/*FIXME : sometimes the gettokenscountat has some rounding issues when we try to
		match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/
		tokat := val.Limiter.GetTokensCountAt(deadline)
		tokcapa := float64(val.Capacity)
		tokat = math.Round(tokat*100) / 100
		tokcapa = math.Round(tokcapa*100) / 100
		if tokat >= tokcapa {
			BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
			val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capacity:%f", val.First_ts, tokat, tokcapa)
			discard += 1
			return true
		} else {
			val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa)
		}
		if _, ok := serialized[key]; ok {
			log.Errorf("entry %s already exists", key)
			return false
		} else {
			log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey)
		}
		val.SerializedState = val.Limiter.Dump()
		serialized[key] = *val
		return true
	})
	bbuckets, err := json.MarshalIndent(serialized, "", " ")
	if err != nil {
		log.Fatalf("Failed to marshal buckets : %s", err)
	}
	err = ioutil.WriteFile(file, bbuckets, 0644)
	if err != nil {
		log.Fatalf("Failed to write buckets state %s", err)
	}
	log.Warningf("Serialized %d live buckets state, %d total with %d expired to %s", len(serialized), total, discard, file)
	return nil
}

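// Illustrative only : DumpBucketsStateAt and LoadBucketsState are meant to be used as
// a pair to persist live buckets across restarts ; the file path below is hypothetical :
//
//	if err := DumpBucketsStateAt("/tmp/crowdsec-buckets.json", time.Now().UTC(), buckets); err != nil {
//		log.Errorf("dump failed : %s", err)
//	}
//	// ... later, once the scenarios have been reloaded into holders :
//	if err := LoadBucketsState("/tmp/crowdsec-buckets.json", buckets, holders); err != nil {
//		log.Errorf("restore failed : %s", err)
//	}
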
func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
	var (
		ok, condition, sent bool
		err                 error
	)

	for idx, holder := range holders {
		if holder.RunTimeFilter != nil {
			log.Debugf("event against holder %d/%d", idx, len(holders))
			output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
			if err != nil {
				holder.logger.Errorf("failed parsing : %v", err)
				return false, fmt.Errorf("leaky failed : %s", err)
			}
			//the filter is expected to return a bool, type-check it here
			if condition, ok = output.(bool); !ok {
				holder.logger.Errorf("unexpected non-bool return : %T", output)
				log.Fatalf("Filter issue")
			}
			if !condition {
				holder.logger.Debugf("eval(FALSE) %s", holder.Filter)
				//log.Debugf("%s -> FALSE", holder.Filter)
				//holder.logger.Debugf("Filter eval failed")
				continue
			} else {
				holder.logger.Debugf("eval(TRUE) %s", holder.Filter)
			}
		}

		sent = false
		var groupby string
		if holder.RunTimeGroupBy != nil {
			tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
			if err != nil {
				log.Errorf("failed groupby : %v", err)
				return false, errors.New("leaky failed :/")
			}
			if groupby, ok = tmpGroupBy.(string); !ok {
				log.Fatalf("failed groupby type : %v", err)
				return false, errors.New("groupby wrong type")
			}
		}
		buckey := GetKey(holder, groupby)

		sigclosed := 0
		keymiss := 0
		failed_sent := 0
		attempts := 0
		start := time.Now()
		for !sent {
			attempts += 1
			/* Warn the user if we spent more than 100 ms trying to pour an event, it's at least a partial lock */
			if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) {
				log.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start),
					buckey, sigclosed, keymiss, failed_sent, attempts)
			}
			biface, ok := buckets.Bucket_map.Load(buckey)
			/* the bucket doesn't exist, create it !*/
			if !ok {
				/*
					not found in map
				*/
				log.Debugf("Creating bucket %s", buckey)
				keymiss += 1
				var fresh_bucket *Leaky

				switch parsed.ExpectMode {
				case TIMEMACHINE:
					fresh_bucket = NewTimeMachine(holder)
					holder.logger.Debugf("Creating TimeMachine bucket")
				case LIVE:
					fresh_bucket = NewLeaky(holder)
					holder.logger.Debugf("Creating Live bucket")
				default:
					log.Fatalf("input event has no expected mode, malformed : %+v", parsed)
				}
				fresh_bucket.In = make(chan types.Event)
				fresh_bucket.Mapkey = buckey
				fresh_bucket.Signal = make(chan bool, 1)
				fresh_bucket.KillSwitch = make(chan bool, 1)
				buckets.Bucket_map.Store(buckey, fresh_bucket)
				go LeakRoutine(fresh_bucket)
				log.Debugf("Created new bucket %s", buckey)
				//wait for signal to be opened
				<-fresh_bucket.Signal
				continue
			}

			bucket := biface.(*Leaky)
			/* check if leak routine is up */
			select {
			case _, ok := <-bucket.Signal:
				if !ok {
					//it's closed, delete it
					bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey)
					buckets.Bucket_map.Delete(buckey)
					sigclosed += 1
					continue
				}
				log.Debugf("Signal exists, try to pour :)")
			default:
				/*nothing to read, but not closed, try to pour */
				log.Debugf("Signal exists but empty, try to pour :)")
			}
			/*let's see if this time-bucket should have expired */
			if bucket.Mode == TIMEMACHINE && !bucket.First_ts.IsZero() {
				var d time.Time
				err = d.UnmarshalText([]byte(parsed.MarshaledTime))
				if err != nil {
					log.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
				}
				if d.After(bucket.Last_ts.Add(bucket.Duration)) {
					bucket.logger.Debugf("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, bucket.Last_ts.Add(bucket.Duration))
					buckets.Bucket_map.Delete(buckey)
					continue
				}
			}
			/*if we're here, let's try to pour */
			select {
			case bucket.In <- parsed:
				log.Debugf("Successfully sent !")
				//sent was successful !
				sent = true
				continue
			default:
				failed_sent += 1
				log.Debugf("Failed to send, try again")
				continue
			}
		}
		log.Debugf("bucket '%s' is poured", holder.Name)
	}
	return sent, nil
}
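
// Illustrative wiring only (not part of this file) : parsed events are poured into the
// holders returned by Init, and overflows come back on the returned channel. The Buckets
// value is assumed to come from a constructor in this package (NewBuckets is an
// assumption), and parsedEvents is a hypothetical chan types.Event :
//
//	holders, outChan, _ := Init(map[string]string{"patterns": scenarioDir, "data": dataDir})
//	buckets := NewBuckets() // assumed constructor for *Buckets
//	go func() {
//		for evt := range parsedEvents {
//			if _, err := PourItemToHolders(evt, holders, buckets); err != nil {
//				log.Errorf("pour failed : %s", err)
//			}
//		}
//	}()
//	for overflow := range outChan {
//		log.Infof("overflow : %+v", overflow)
//	}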