diff --git a/cmd/crowdsec/acquisition.go b/cmd/crowdsec/acquisition.go deleted file mode 100644 index 246cf3945..000000000 --- a/cmd/crowdsec/acquisition.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/crowdsecurity/crowdsec/pkg/acquisition" -) - -func loadAcquisition() (*acquisition.FileAcquisCtx, error) { - var acquisitionCTX *acquisition.FileAcquisCtx - var err error - /*Init the acqusition : from cli or from acquis.yaml file*/ - if cConfig.SingleFile != "" { - var input acquisition.FileCtx - input.Filename = cConfig.SingleFile - input.Mode = acquisition.CATMODE - input.Labels = make(map[string]string) - input.Labels["type"] = cConfig.SingleFileLabel - acquisitionCTX, err = acquisition.InitReaderFromFileCtx([]acquisition.FileCtx{input}) - } else { /* Init file reader if we tail */ - acquisitionCTX, err = acquisition.InitReader(cConfig.AcquisitionFile) - } - if err != nil { - return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err) - } - if acquisitionCTX == nil { - return nil, fmt.Errorf("no inputs to process") - } - if cConfig.Profiling { - acquisitionCTX.Profiling = true - } - - return acquisitionCTX, nil -} diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index cc9bbf85d..3bf2afd62 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -67,7 +67,7 @@ func main() { log.Fatal(err.Error()) } - log.Infof("Crowdwatch %s", cwversion.VersionStr()) + log.Infof("Crowdsec %s", cwversion.VersionStr()) if cConfig.Prometheus { registerPrometheus() @@ -294,7 +294,7 @@ func main() { log.Warningf("Starting processing data") //Init the acqusition : from cli or from acquis.yaml file - acquisitionCTX, err = loadAcquisition() + acquisitionCTX, err = acquisition.LoadAcquisitionConfig(cConfig) if err != nil { log.Fatalf("Failed to start acquisition : %s", err) } diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index af7c738ea..d5e1bd2e9 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -40,7 +40,7 @@ LOOP: buckets.Bucket_map.Delete(event.Overflow.MapKey) } else { /*let's handle output profiles */ - if err := output.ProcessOutput(event.Overflow, outputProfiles); err != nil { + if err := output.ProcessOutput(*(event.Overflow), outputProfiles); err != nil { log.Warningf("Error while processing overflow/output : %s", err) } } diff --git a/go.mod b/go.mod index 95bf7db39..f625048fc 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.4.0 // indirect github.com/enescakir/emoji v1.0.0 + github.com/google/go-cmp v0.4.0 github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e github.com/hashicorp/go-version v1.2.0 github.com/jamiealquiza/tachymeter v2.0.0+incompatible diff --git a/go.sum b/go.sum index 374bcc68b..843147963 100644 --- a/go.sum +++ b/go.sum @@ -83,7 +83,9 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -168,6 +170,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= +github.com/prometheus/client_golang v1.6.0 h1:YVPodQOcK15POxhgARIvnDRVpLcuK8mglnMrWfyrw6A= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/pkg/acquisition/file_reader.go b/pkg/acquisition/file_reader.go index 116ab2fab..ecb41711f 100644 --- a/pkg/acquisition/file_reader.go +++ b/pkg/acquisition/file_reader.go @@ -10,6 +10,7 @@ import ( "os" "strings" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -66,6 +67,33 @@ var ReaderHits = prometheus.NewCounterVec( []string{"source"}, ) +func LoadAcquisitionConfig(cConfig *csconfig.CrowdSec) (*FileAcquisCtx, error) { + var acquisitionCTX *FileAcquisCtx + var err error + /*Init the acqusition : from cli or from acquis.yaml file*/ + if cConfig.SingleFile != "" { + var input FileCtx + input.Filename = cConfig.SingleFile + input.Mode = CATMODE + input.Labels = make(map[string]string) + input.Labels["type"] = cConfig.SingleFileLabel + acquisitionCTX, err = InitReaderFromFileCtx([]FileCtx{input}) + } else { /* Init file reader if we tail */ + acquisitionCTX, err = InitReader(cConfig.AcquisitionFile) + } + if err != nil { + return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err) + } + if acquisitionCTX == nil { + return nil, fmt.Errorf("no inputs to process") + } + if cConfig.Profiling { + acquisitionCTX.Profiling = true + } + + return acquisitionCTX, nil +} + func InitReader(cfg string) (*FileAcquisCtx, error) { var files []FileCtx diff --git a/pkg/cwapi/signals.go b/pkg/cwapi/signals.go index bdc416735..0610f9d15 100644 --- a/pkg/cwapi/signals.go +++ b/pkg/cwapi/signals.go @@ -13,7 +13,7 @@ import ( ) func (ctx *ApiCtx) AppendSignal(sig types.SignalOccurence) error { - ctx.toPush = append(ctx.toPush, types.Event{Overflow: sig}) + ctx.toPush = append(ctx.toPush, types.Event{Overflow: &sig}) log.Debugf("api append signal: adding new signal (cache size : %d): %+v", len(ctx.toPush), sig) return nil } diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 18913f191..1309c937e 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -233,13 +233,13 @@ func LeakRoutine(l *Leaky) { break } } - l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig})) + l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: &sig})) mt, _ := l.Ovflw_ts.MarshalText() l.logger.Tracef("overflow time : %s", mt) if l.Profiling { BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc() } - l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW, MarshaledTime: string(mt)} + l.AllOut <- types.Event{Overflow: &sig, Type: types.OVFLW, MarshaledTime: string(mt)} return /*we underflow or reach bucket deadline (timers)*/ case <-durationTicker: @@ -266,9 +266,9 @@ func LeakRoutine(l *Leaky) { BucketsUnderflow.With(prometheus.Labels{"name": l.Name}).Inc() } - l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig})) + l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: &sig})) - l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW} + l.AllOut <- types.Event{Overflow: &sig, Type: types.OVFLW} l.logger.Tracef("Returning from leaky routine.") return } diff --git a/pkg/parser/runtime.go b/pkg/parser/runtime.go index b255f21c0..d2eb35f49 100644 --- a/pkg/parser/runtime.go +++ b/pkg/parser/runtime.go @@ -227,6 +227,9 @@ func stageidx(stage string, stages []string) int { return -1 } +var ParseDump bool +var StageParseCache map[string]map[string]types.Event + func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error) { var event types.Event = xp @@ -250,7 +253,13 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N log.Tracef("INPUT '%s'", event.Line.Raw) } + if ParseDump { + StageParseCache = make(map[string]map[string]types.Event) + } for _, stage := range ctx.Stages { + if ParseDump { + StageParseCache[stage] = make(map[string]types.Event) + } /* if the node is forward in stages, seek to its stage */ /* this is for example used by testing system to inject logs in post-syslog-parsing phase*/ if stageidx(event.Stage, ctx.Stages) > stageidx(stage, ctx.Stages) { @@ -267,14 +276,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N isStageOK := false for idx, node := range nodes { - clog := log.WithFields(log.Fields{ - "node-name": node.rn, - "stage": event.Stage, - }) //Only process current stage's nodes if event.Stage != node.Stage { continue } + clog := log.WithFields(log.Fields{ + "node-name": node.rn, + "stage": event.Stage, + }) clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn) if ctx.Profiling { node.Profiling = true @@ -283,6 +292,11 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N if err != nil { clog.Fatalf("Error while processing node : %v", err) } + if ret && ParseDump { + evtcopy := types.Event{} + types.Clone(&event, &evtcopy) + StageParseCache[stage][node.Name] = evtcopy + } clog.Tracef("node (%s) ret : %v", node.rn, ret) if ret { isStageOK = true diff --git a/pkg/types/event.go b/pkg/types/event.go index e1a43d382..cf5e0e363 100644 --- a/pkg/types/event.go +++ b/pkg/types/event.go @@ -1,6 +1,8 @@ package types import ( + "encoding/json" + "fmt" "time" ) @@ -19,17 +21,51 @@ type Event struct { /* the current stage of the line being parsed */ Stage string `yaml:"Stage,omitempty"` /* original line (produced by acquisition) */ - Line Line `json:"-" yaml:"Line,omitempty"` + Line Line `yaml:"Line,omitempty"` /* output of groks */ - Parsed map[string]string `json:"-" yaml:"Parsed,omitempty"` + Parsed map[string]string `yaml:"Parsed,omitempty"` /* output of enrichment */ Enriched map[string]string `json:"Enriched,omitempty" yaml:"Enriched,omitempty"` /* Overflow */ - Overflow SignalOccurence `yaml:"Overflow,omitempty"` - Time time.Time `json:"Time,omitempty"` //parsed time `json:"-"` `` - StrTime string `yaml:"StrTime,omitempty"` - MarshaledTime string `yaml:"MarshaledTime,omitempty"` - Process bool `yaml:"Process,omitempty"` //can be set to false to avoid processing line + Overflow *SignalOccurence `yaml:"Overflow,omitempty"` + Time time.Time `json:"Time,omitempty"` //parsed time + StrTime string `yaml:"StrTime,omitempty"` + MarshaledTime string `yaml:"MarshaledTime,omitempty"` + Process bool `yaml:"Process,omitempty"` //can be set to false to avoid processing line /* Meta is the only part that will make it to the API - it should be normalized */ Meta map[string]string `json:"Meta,omitempty" yaml:"Meta,omitempty"` } + +func MarshalForHumans(evt Event) (string, error) { + repr := make(map[string]interface{}) + + repr["Whitelisted"] = evt.Whitelisted + repr["WhiteListReason"] = evt.WhiteListReason + repr["Stage"] = evt.Stage + if evt.Line.Raw != "" { + repr["Line"] = evt.Line + } + if len(evt.Parsed) > 0 { + repr["Parsed"] = evt.Parsed + } + if len(evt.Enriched) > 0 { + repr["Enriched"] = evt.Enriched + } + if len(evt.Meta) > 0 { + repr["Meta"] = evt.Meta + } + if evt.Overflow.Events_count != 0 { + repr["Overflow"] = evt.Overflow + } + repr["StrTime"] = evt.StrTime + repr["Process"] = evt.Process + output, err := json.MarshalIndent(repr, "", " ") + if err != nil { + return "", fmt.Errorf("failed to marshal : %s", err) + } + return string(output), nil +} + +func MarshalForAPI() ([]byte, error) { + return nil, nil +} diff --git a/pkg/types/utils.go b/pkg/types/utils.go index a5f8fded9..45f01e185 100644 --- a/pkg/types/utils.go +++ b/pkg/types/utils.go @@ -1,7 +1,9 @@ package types import ( + "bytes" "encoding/binary" + "encoding/gob" "fmt" "io" "net" @@ -93,3 +95,13 @@ func ConfigureLogger(clog *log.Logger) error { clog.SetLevel(logLevel) return nil } + +// straight from stackoverflow : Clone deep-copies a to b +func Clone(a, b interface{}) { + + buff := new(bytes.Buffer) + enc := gob.NewEncoder(buff) + dec := gob.NewDecoder(buff) + enc.Encode(a) + dec.Decode(b) +}