|
@@ -135,9 +135,8 @@ func FromFactory(g BucketFactory) *Leaky {
|
|
} else {
|
|
} else {
|
|
limiter = rate.NewLimiter(rate.Every(g.leakspeed), g.Capacity)
|
|
limiter = rate.NewLimiter(rate.Every(g.leakspeed), g.Capacity)
|
|
}
|
|
}
|
|
- if g.Profiling {
|
|
|
|
- BucketsInstanciation.With(prometheus.Labels{"name": g.Name}).Inc()
|
|
|
|
- }
|
|
|
|
|
|
+ BucketsInstanciation.With(prometheus.Labels{"name": g.Name}).Inc()
|
|
|
|
+
|
|
//create the leaky bucket per se
|
|
//create the leaky bucket per se
|
|
l := &Leaky{
|
|
l := &Leaky{
|
|
Name: g.Name,
|
|
Name: g.Name,
|
|
@@ -196,7 +195,7 @@ func LeakRoutine(l *Leaky) {
|
|
}
|
|
}
|
|
|
|
|
|
l.logger.Debugf("Leaky routine starting, lifetime : %s", l.Duration)
|
|
l.logger.Debugf("Leaky routine starting, lifetime : %s", l.Duration)
|
|
- defer l.logger.Debugf("Leaky routine exiting")
|
|
|
|
|
|
+ defer l.logger.Infof("Leaky routine exiting") //to remove
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
/*receiving an event*/
|
|
/*receiving an event*/
|
|
@@ -212,9 +211,8 @@ func LeakRoutine(l *Leaky) {
|
|
l.logger.Tracef("Pour event: %s", spew.Sdump(msg))
|
|
l.logger.Tracef("Pour event: %s", spew.Sdump(msg))
|
|
l.logger.Debugf("Pouring event.")
|
|
l.logger.Debugf("Pouring event.")
|
|
|
|
|
|
- if l.Profiling {
|
|
|
|
- BucketsPour.With(prometheus.Labels{"name": l.Name, "source": msg.Line.Src}).Inc()
|
|
|
|
- }
|
|
|
|
|
|
+ BucketsPour.With(prometheus.Labels{"name": l.Name, "source": msg.Line.Src}).Inc()
|
|
|
|
+
|
|
l.Pour(l, msg) // glue for now
|
|
l.Pour(l, msg) // glue for now
|
|
//Clear cache on behalf of pour
|
|
//Clear cache on behalf of pour
|
|
tmp := time.NewTicker(l.Duration)
|
|
tmp := time.NewTicker(l.Duration)
|
|
@@ -240,9 +238,9 @@ func LeakRoutine(l *Leaky) {
|
|
l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig}))
|
|
l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig}))
|
|
mt, _ := l.Ovflw_ts.MarshalText()
|
|
mt, _ := l.Ovflw_ts.MarshalText()
|
|
l.logger.Tracef("overflow time : %s", mt)
|
|
l.logger.Tracef("overflow time : %s", mt)
|
|
- if l.Profiling {
|
|
|
|
- BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
|
|
|
|
+
|
|
l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW, MarshaledTime: string(mt)}
|
|
l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW, MarshaledTime: string(mt)}
|
|
return
|
|
return
|
|
/*we underflow or reach bucket deadline (timers)*/
|
|
/*we underflow or reach bucket deadline (timers)*/
|
|
@@ -253,9 +251,8 @@ func LeakRoutine(l *Leaky) {
|
|
sig := types.SignalOccurence{MapKey: l.Mapkey}
|
|
sig := types.SignalOccurence{MapKey: l.Mapkey}
|
|
|
|
|
|
if l.timedOverflow {
|
|
if l.timedOverflow {
|
|
- if l.Profiling {
|
|
|
|
- BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
|
|
|
|
- }
|
|
|
|
|
|
+ BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
|
|
|
|
+
|
|
sig = FormatOverflow(l, ofw)
|
|
sig = FormatOverflow(l, ofw)
|
|
for _, f := range l.BucketConfig.processors {
|
|
for _, f := range l.BucketConfig.processors {
|
|
sig, ofw = f.OnBucketOverflow(l.BucketConfig)(l, sig, ofw)
|
|
sig, ofw = f.OnBucketOverflow(l.BucketConfig)(l, sig, ofw)
|