diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 286c51f11..500c7bc9b 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -70,6 +70,7 @@ type Leaky struct { wgPour *sync.WaitGroup wgDumpState *sync.WaitGroup mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races + orderEvent bool } var BucketsPour = prometheus.NewCounterVec( diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 679c603df..d4f11266f 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "os" + "sync" "time" "github.com/antonmedv/expr" @@ -279,6 +280,8 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B return biface.(*Leaky), nil } +var orderEvent map[string]*sync.WaitGroup + func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { var ( ok, condition, poured bool @@ -344,7 +347,21 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc return false, fmt.Errorf("failed to load or store bucket: %w", err) } //finally, pour the even into the bucket + + if bucket.orderEvent { + if orderEvent == nil { + orderEvent = make(map[string]*sync.WaitGroup) + } + orderEvent[buckey] = &sync.WaitGroup{} + orderEvent[buckey].Add(1) + } + ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed) + + if bucket.orderEvent { + orderEvent[buckey].Wait() + } + if err != nil { return false, fmt.Errorf("failed to pour bucket: %w", err) }