|
@@ -297,7 +297,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|
|
evt := deepcopy.Copy(parsed)
|
|
|
BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
|
|
|
}
|
|
|
-
|
|
|
+ parserEnv := map[string]interface{}{"evt": &parsed}
|
|
|
//find the relevant holders (scenarios)
|
|
|
for idx := 0; idx < len(holders); idx++ {
|
|
|
//for idx, holder := range holders {
|
|
@@ -305,7 +305,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|
|
//evaluate bucket's condition
|
|
|
if holders[idx].RunTimeFilter != nil {
|
|
|
holders[idx].logger.Tracef("event against holder %d/%d", idx, len(holders))
|
|
|
- output, err := expr.Run(holders[idx].RunTimeFilter, map[string]interface{}{"evt": &parsed})
|
|
|
+ output, err := expr.Run(holders[idx].RunTimeFilter, parserEnv)
|
|
|
if err != nil {
|
|
|
holders[idx].logger.Errorf("failed parsing : %v", err)
|
|
|
return false, fmt.Errorf("leaky failed : %s", err)
|
|
@@ -317,7 +317,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|
|
}
|
|
|
|
|
|
if holders[idx].Debug {
|
|
|
- holders[idx].ExprDebugger.Run(holders[idx].logger, condition, map[string]interface{}{"evt": &parsed})
|
|
|
+ holders[idx].ExprDebugger.Run(holders[idx].logger, condition, parserEnv)
|
|
|
}
|
|
|
if !condition {
|
|
|
holders[idx].logger.Debugf("Event leaving node : ko (filter mismatch)")
|
|
@@ -328,7 +328,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
|
|
|
//groupby determines the partition key for the specific bucket
|
|
|
var groupby string
|
|
|
if holders[idx].RunTimeGroupBy != nil {
|
|
|
- tmpGroupBy, err := expr.Run(holders[idx].RunTimeGroupBy, map[string]interface{}{"evt": &parsed})
|
|
|
+ tmpGroupBy, err := expr.Run(holders[idx].RunTimeGroupBy, parserEnv)
|
|
|
if err != nil {
|
|
|
holders[idx].logger.Errorf("failed groupby : %v", err)
|
|
|
return false, errors.New("leaky failed :/")
|