|
@@ -375,6 +375,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
|
|
key := rkey.(string)
|
|
key := rkey.(string)
|
|
val := rvalue.(*Leaky)
|
|
val := rvalue.(*Leaky)
|
|
total += 1
|
|
total += 1
|
|
|
|
+ //bucket already overflowed, we can kill it
|
|
if !val.Ovflw_ts.IsZero() {
|
|
if !val.Ovflw_ts.IsZero() {
|
|
discard += 1
|
|
discard += 1
|
|
val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
|
|
val.logger.Debugf("overflowed at %s.", val.Ovflw_ts)
|
|
@@ -388,6 +389,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
|
|
tokcapa := float64(val.Capacity)
|
|
tokcapa := float64(val.Capacity)
|
|
tokat = math.Round(tokat*100) / 100
|
|
tokat = math.Round(tokat*100) / 100
|
|
tokcapa = math.Round(tokcapa*100) / 100
|
|
tokcapa = math.Round(tokcapa*100) / 100
|
|
|
|
+ //bucket actually underflowed based on log time, but no in real time
|
|
if tokat >= tokcapa {
|
|
if tokat >= tokcapa {
|
|
BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
|
|
BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc()
|
|
val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa)
|
|
val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa)
|
|
@@ -412,7 +414,14 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-func DumpBucketsStateAt(file string, deadline time.Time, buckets *Buckets) error {
|
|
|
|
|
|
+func DumpBucketsStateAt(deadline time.Time, buckets *Buckets) (string, error) {
|
|
|
|
+ //var file string
|
|
|
|
+ tmpFd, err := ioutil.TempFile(os.TempDir(), "crowdsec-buckets-dump-")
|
|
|
|
+ if err != nil {
|
|
|
|
+ return "", fmt.Errorf("failed to create temp file : %s", err)
|
|
|
|
+ }
|
|
|
|
+ defer tmpFd.Close()
|
|
|
|
+ tmpFileName := tmpFd.Name()
|
|
serialized = make(map[string]Leaky)
|
|
serialized = make(map[string]Leaky)
|
|
log.Printf("Dumping buckets state at %s", deadline)
|
|
log.Printf("Dumping buckets state at %s", deadline)
|
|
total := 0
|
|
total := 0
|
|
@@ -455,11 +464,23 @@ func DumpBucketsStateAt(file string, deadline time.Time, buckets *Buckets) error
|
|
if err != nil {
|
|
if err != nil {
|
|
log.Fatalf("Failed to unmarshal buckets : %s", err)
|
|
log.Fatalf("Failed to unmarshal buckets : %s", err)
|
|
}
|
|
}
|
|
- err = ioutil.WriteFile(file, bbuckets, 0644)
|
|
|
|
|
|
+ size, err := tmpFd.Write(bbuckets)
|
|
if err != nil {
|
|
if err != nil {
|
|
- log.Fatalf("Failed to write buckets state %s", err)
|
|
|
|
|
|
+ return "", fmt.Errorf("failed to write temp file : %s", err)
|
|
}
|
|
}
|
|
- log.Warningf("Serialized %d live buckets state, %d total with %d expired to %s", len(serialized), total, discard, file)
|
|
|
|
|
|
+ log.Infof("Serialized %d live buckets (+%d expired) in %d bytes to %s", len(serialized), discard, size, tmpFd.Name())
|
|
|
|
+ serialized = nil
|
|
|
|
+ return tmpFileName, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func ShutdownAllBuckets(buckets *Buckets) error {
|
|
|
|
+ buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool {
|
|
|
|
+ key := rkey.(string)
|
|
|
|
+ val := rvalue.(*Leaky)
|
|
|
|
+ val.KillSwitch <- true
|
|
|
|
+ log.Infof("killed %s", key)
|
|
|
|
+ return true
|
|
|
|
+ })
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|