add Clone, add ParseDump feature
This commit is contained in:
parent
1a499a2b80
commit
44ef58b847
2 changed files with 34 additions and 4 deletions
|
@ -227,6 +227,9 @@ func stageidx(stage string, stages []string) int {
|
|||
return -1
|
||||
}
|
||||
|
||||
var ParseDump bool
|
||||
var StageParseCache map[string]map[string]types.Event
|
||||
|
||||
func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error) {
|
||||
var event types.Event = xp
|
||||
|
||||
|
@ -250,7 +253,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
|
|||
log.Tracef("INPUT '%s'", event.Line.Raw)
|
||||
}
|
||||
|
||||
if ParseDump {
|
||||
StageParseCache = make(map[string]map[string]types.Event)
|
||||
}
|
||||
|
||||
for _, stage := range ctx.Stages {
|
||||
if ParseDump {
|
||||
StageParseCache[stage] = make(map[string]types.Event)
|
||||
}
|
||||
/* if the node is forward in stages, seek to its stage */
|
||||
/* this is for example used by testing system to inject logs in post-syslog-parsing phase*/
|
||||
if stageidx(event.Stage, ctx.Stages) > stageidx(stage, ctx.Stages) {
|
||||
|
@ -267,14 +277,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
|
|||
|
||||
isStageOK := false
|
||||
for idx, node := range nodes {
|
||||
clog := log.WithFields(log.Fields{
|
||||
"node-name": node.rn,
|
||||
"stage": event.Stage,
|
||||
})
|
||||
//Only process current stage's nodes
|
||||
if event.Stage != node.Stage {
|
||||
continue
|
||||
}
|
||||
clog := log.WithFields(log.Fields{
|
||||
"node-name": node.rn,
|
||||
"stage": event.Stage,
|
||||
})
|
||||
clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn)
|
||||
if ctx.Profiling {
|
||||
node.Profiling = true
|
||||
|
@ -286,6 +296,11 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
|
|||
clog.Tracef("node (%s) ret : %v", node.rn, ret)
|
||||
if ret {
|
||||
isStageOK = true
|
||||
if ParseDump {
|
||||
evtcopy := types.Event{}
|
||||
types.Clone(&event, &evtcopy)
|
||||
StageParseCache[stage][node.Name] = evtcopy
|
||||
}
|
||||
}
|
||||
if ret && node.OnSuccess == "next_stage" {
|
||||
clog.Debugf("node successful, stop end stage %s", stage)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
@ -93,3 +95,16 @@ func ConfigureLogger(clog *log.Logger) error {
|
|||
clog.SetLevel(logLevel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func Clone(a, b interface{}) {
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
enc := gob.NewEncoder(buff)
|
||||
dec := gob.NewDecoder(buff)
|
||||
if err := enc.Encode(a); err != nil {
|
||||
log.Fatalf("failed cloning %T", a)
|
||||
}
|
||||
if err := dec.Decode(b); err != nil {
|
||||
log.Fatalf("failed cloning %T", b)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue