|
@@ -154,7 +154,7 @@ func ShutdownAllBuckets(buckets *Buckets) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) {
|
|
|
+func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *types.Event) (bool, error) {
|
|
|
var sent bool
|
|
|
var buckey = bucket.Mapkey
|
|
|
var err error
|
|
@@ -186,10 +186,10 @@ func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, par
|
|
|
}
|
|
|
continue
|
|
|
}
|
|
|
- holder.logger.Tracef("Signal exists, try to pour :)")
|
|
|
+ //holder.logger.Tracef("Signal exists, try to pour :)")
|
|
|
default:
|
|
|
/*nothing to read, but not closed, try to pour */
|
|
|
- holder.logger.Tracef("Signal exists but empty, try to pour :)")
|
|
|
+ //holder.logger.Tracef("Signal exists but empty, try to pour :)")
|
|
|
}
|
|
|
|
|
|
/*let's see if this time-bucket should have expired */
|
|
@@ -221,19 +221,19 @@ func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, par
|
|
|
/*the bucket seems to be up & running*/
|
|
|
select {
|
|
|
case bucket.In <- parsed:
|
|
|
- holder.logger.Tracef("Successfully sent !")
|
|
|
+ //holder.logger.Tracef("Successfully sent !")
|
|
|
if BucketPourTrack {
|
|
|
if _, ok := BucketPourCache[bucket.Name]; !ok {
|
|
|
BucketPourCache[bucket.Name] = make([]types.Event, 0)
|
|
|
}
|
|
|
- evt := deepcopy.Copy(parsed)
|
|
|
+ evt := deepcopy.Copy(*parsed)
|
|
|
BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event))
|
|
|
}
|
|
|
sent = true
|
|
|
continue
|
|
|
default:
|
|
|
failed_sent += 1
|
|
|
- holder.logger.Tracef("Failed to send, try again")
|
|
|
+ //holder.logger.Tracef("Failed to send, try again")
|
|
|
continue
|
|
|
|
|
|
}
|
|
@@ -260,7 +260,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
|
|
|
default:
|
|
|
return nil, fmt.Errorf("input event has no expected mode : %+v", expectMode)
|
|
|
}
|
|
|
- fresh_bucket.In = make(chan types.Event)
|
|
|
+ fresh_bucket.In = make(chan *types.Event)
|
|
|
fresh_bucket.Mapkey = partitionKey
|
|
|
fresh_bucket.Signal = make(chan bool, 1)
|
|
|
actual, stored := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket)
|
|
@@ -299,54 +299,55 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|
|
cachedExprEnv := exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})
|
|
|
|
|
|
//find the relevant holders (scenarios)
|
|
|
- for idx, holder := range holders {
|
|
|
+ for idx := 0; idx < len(holders); idx++ {
|
|
|
+ //for idx, holder := range holders {
|
|
|
|
|
|
//evaluate bucket's condition
|
|
|
- if holder.RunTimeFilter != nil {
|
|
|
- holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
|
|
|
- output, err := expr.Run(holder.RunTimeFilter, cachedExprEnv)
|
|
|
+ if holders[idx].RunTimeFilter != nil {
|
|
|
+ holders[idx].logger.Tracef("event against holder %d/%d", idx, len(holders))
|
|
|
+ output, err := expr.Run(holders[idx].RunTimeFilter, cachedExprEnv)
|
|
|
if err != nil {
|
|
|
- holder.logger.Errorf("failed parsing : %v", err)
|
|
|
+ holders[idx].logger.Errorf("failed parsing : %v", err)
|
|
|
return false, fmt.Errorf("leaky failed : %s", err)
|
|
|
}
|
|
|
// we assume we a bool should add type check here
|
|
|
if condition, ok = output.(bool); !ok {
|
|
|
- holder.logger.Errorf("unexpected non-bool return : %T", output)
|
|
|
- holder.logger.Fatalf("Filter issue")
|
|
|
+ holders[idx].logger.Errorf("unexpected non-bool return : %T", output)
|
|
|
+ holders[idx].logger.Fatalf("Filter issue")
|
|
|
}
|
|
|
|
|
|
- if holder.Debug {
|
|
|
- holder.ExprDebugger.Run(holder.logger, condition, cachedExprEnv)
|
|
|
+ if holders[idx].Debug {
|
|
|
+ holders[idx].ExprDebugger.Run(holders[idx].logger, condition, cachedExprEnv)
|
|
|
}
|
|
|
if !condition {
|
|
|
- holder.logger.Debugf("Event leaving node : ko (filter mismatch)")
|
|
|
+ holders[idx].logger.Debugf("Event leaving node : ko (filter mismatch)")
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//groupby determines the partition key for the specific bucket
|
|
|
var groupby string
|
|
|
- if holder.RunTimeGroupBy != nil {
|
|
|
- tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, cachedExprEnv)
|
|
|
+ if holders[idx].RunTimeGroupBy != nil {
|
|
|
+ tmpGroupBy, err := expr.Run(holders[idx].RunTimeGroupBy, cachedExprEnv)
|
|
|
if err != nil {
|
|
|
- holder.logger.Errorf("failed groupby : %v", err)
|
|
|
+ holders[idx].logger.Errorf("failed groupby : %v", err)
|
|
|
return false, errors.New("leaky failed :/")
|
|
|
}
|
|
|
|
|
|
if groupby, ok = tmpGroupBy.(string); !ok {
|
|
|
- holder.logger.Fatalf("failed groupby type : %v", err)
|
|
|
+ holders[idx].logger.Fatalf("failed groupby type : %v", err)
|
|
|
return false, errors.New("groupby wrong type")
|
|
|
}
|
|
|
}
|
|
|
- buckey := GetKey(holder, groupby)
|
|
|
+ buckey := GetKey(holders[idx], groupby)
|
|
|
|
|
|
//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
|
|
|
- bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
|
|
|
+ bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)
|
|
|
if err != nil {
|
|
|
return false, errors.Wrap(err, "failed to load or store bucket")
|
|
|
}
|
|
|
//finally, pour the even into the bucket
|
|
|
- ok, err := PourItemToBucket(bucket, holder, buckets, parsed)
|
|
|
+ ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed)
|
|
|
if err != nil {
|
|
|
return false, errors.Wrap(err, "failed to pour bucket")
|
|
|
}
|