sabban 2 anni fa
parent
commit
8f1ca97a88
2 ha cambiato i file con 18 aggiunte e 0 eliminazioni
  1. 1 0
      pkg/leakybucket/bucket.go
  2. 17 0
      pkg/leakybucket/manager_run.go

+ 1 - 0
pkg/leakybucket/bucket.go

@@ -70,6 +70,7 @@ type Leaky struct {
 	wgPour              *sync.WaitGroup
 	wgDumpState         *sync.WaitGroup
 	mutex               *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races
+	orderEvent          bool
 }
 
 var BucketsPour = prometheus.NewCounterVec(

+ 17 - 0
pkg/leakybucket/manager_run.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"math"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/antonmedv/expr"
@@ -279,6 +280,8 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
 	return biface.(*Leaky), nil
 }
 
+var orderEvent map[string]*sync.WaitGroup
+
 func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
 	var (
 		ok, condition, poured bool
@@ -344,7 +347,21 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
 			return false, fmt.Errorf("failed to load or store bucket: %w", err)
 		}
 		//finally, pour the even into the bucket
+
+		if bucket.orderEvent {
+			if orderEvent == nil {
+				orderEvent = make(map[string]*sync.WaitGroup)
+			}
+			orderEvent[buckey] = &sync.WaitGroup{}
+			orderEvent[buckey].Add(1)
+		}
+
 		ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed)
+
+		if bucket.orderEvent {
+			orderEvent[buckey].Wait()
+		}
+
 		if err != nil {
 			return false, fmt.Errorf("failed to pour bucket: %w", err)
 		}