Browse Source

properly handle timemachine buckets

Sebastien Blot 3 years ago
parent
commit
9f676844d9
2 changed files with 37 additions and 20 deletions
  1. 18 16
      pkg/leakybucket/blackhole.go
  2. 19 4
      pkg/leakybucket/manager_run.go

+ 18 - 16
pkg/leakybucket/blackhole.go

@@ -9,17 +9,16 @@ import (
 	"gopkg.in/tomb.v2"
 )
 
-type HiddenKey struct {
-	key        string
-	expiration time.Time
-}
-
 type Blackhole struct {
-	duration   time.Duration
-	hiddenKeys []HiddenKey
+	duration time.Duration
 	DumbProcessor
 }
 
+type BlackholeExpiration struct {
+	blExpiration      time.Time
+	cleanupExpiration time.Time //we need a separate expiration for the cleanup to properly handle timemachine buckets
+}
+
 func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
 	duration, err := time.ParseDuration(bucketFactory.Blackhole)
 	if err != nil {
@@ -28,7 +27,6 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
 	}
 	return &Blackhole{
 		duration:      duration,
-		hiddenKeys:    []HiddenKey{},
 		DumbProcessor: DumbProcessor{},
 	}, nil
 }
@@ -46,8 +44,8 @@ func CleanupBlackhole(bucketsTomb *tomb.Tomb) error {
 			return nil
 		case <-ticker.C:
 			BlackholeTracking.Range(func(key, value interface{}) bool {
-				expirationDate := value.(time.Time)
-				if expirationDate.Before(time.Now().UTC()) {
+				cleanupDate := value.(BlackholeExpiration).cleanupExpiration
+				if cleanupDate.Before(time.Now().UTC()) {
 					log.Debugf("Expiring blackhole for %s", key)
 					BlackholeTracking.Delete(key)
 				}
@@ -59,9 +57,11 @@ func CleanupBlackhole(bucketsTomb *tomb.Tomb) error {
 
 func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) {
 	return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
-		if expirationDate, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
-			if expirationDate.(time.Time).After(time.Now().UTC()) {
-				leaky.logger.Debugf("Blackhole already triggered for %s", leaky.Mapkey)
+
+		if expiration, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
+			x := expiration.(BlackholeExpiration)
+			if x.blExpiration.After(leaky.Ovflw_ts) {
+				leaky.logger.Debugf("Blackhole already triggered for %s (remaining : %s", leaky.Mapkey, x.blExpiration.Sub(time.Now().UTC()))
 				return types.RuntimeAlert{
 					Mapkey: leaky.Mapkey,
 				}, nil
@@ -71,10 +71,12 @@ func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky,
 			}
 		}
 
-		BlackholeTracking.Store(leaky.Mapkey, time.Now().UTC().Add(bl.duration))
-
-		leaky.logger.Debugf("Blackhole triggered for %s", leaky.Mapkey)
+		BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{
+			blExpiration:      leaky.Ovflw_ts.Add(bl.duration),
+			cleanupExpiration: time.Now().UTC().Add(bl.duration),
+		})
 
+		leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration))
 		return alert, queue
 	}
 }

+ 19 - 4
pkg/leakybucket/manager_run.go

@@ -277,7 +277,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
 			//once the created goroutine is ready to process event, we can return it
 			<-fresh_bucket.Signal
 		} else {
-			holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey)
+			holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey)
 			biface = actual
 		}
 		holder.logger.Debugf("Created new bucket %s", partitionKey)
@@ -345,9 +345,24 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
 			}
 		}
 		buckey := GetKey(holders[idx], groupby)
-		if _, ok := BlackholeTracking.Load(buckey); ok {
-			holders[idx].logger.Tracef("Event is blackholed: %s", buckey)
-			continue
+		if x, ok := BlackholeTracking.Load(buckey); ok {
+			holders[idx].logger.Debugf("Checking if blackhole has expired for %s", buckey)
+			blackholeExp := x.(BlackholeExpiration)
+			t := time.Now().UTC()
+			if parsed.ExpectMode == TIMEMACHINE {
+				//This is not optimal at all, date enrichment should also set parsed.Time to avoid parsing the date twice
+				err := t.UnmarshalText([]byte(parsed.MarshaledTime))
+				if err != nil {
+					holders[idx].logger.Errorf("failed parsing time : %v", err)
+				}
+				holders[idx].logger.Debugf("Found TIMEMACHINE bucket, using %s as time, comparing against %s ", t, blackholeExp.blExpiration)
+			}
+			if blackholeExp.blExpiration.After(t) {
+				holders[idx].logger.Debugf("Event is blackholed: %s (remaining: %s)", buckey, blackholeExp.blExpiration.Sub(t))
+				continue
+			}
+			holders[idx].logger.Debugf("Event is no longer blackholed: %s", buckey)
+			BlackholeTracking.Delete(buckey)
 		}
 		//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
 		bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)