Thibault bui Koechlin 2020-06-05 18:51:12 +02:00
parent c37f020da3
commit 0b5cd18533
11 changed files with 113 additions and 53 deletions

@@ -1,34 +0,0 @@
-package main
-
-import (
-	"fmt"
-
-	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
-)
-
-func loadAcquisition() (*acquisition.FileAcquisCtx, error) {
-	var acquisitionCTX *acquisition.FileAcquisCtx
-	var err error
-	/*Init the acqusition : from cli or from acquis.yaml file*/
-	if cConfig.SingleFile != "" {
-		var input acquisition.FileCtx
-		input.Filename = cConfig.SingleFile
-		input.Mode = acquisition.CATMODE
-		input.Labels = make(map[string]string)
-		input.Labels["type"] = cConfig.SingleFileLabel
-		acquisitionCTX, err = acquisition.InitReaderFromFileCtx([]acquisition.FileCtx{input})
-	} else { /* Init file reader if we tail */
-		acquisitionCTX, err = acquisition.InitReader(cConfig.AcquisitionFile)
-	}
-	if err != nil {
-		return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err)
-	}
-	if acquisitionCTX == nil {
-		return nil, fmt.Errorf("no inputs to process")
-	}
-	if cConfig.Profiling {
-		acquisitionCTX.Profiling = true
-	}
-	return acquisitionCTX, nil
-}
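
Note: this helper is not dropped outright; the same logic reappears below as LoadAcquisitionConfig in the acquisition package, so the acquisition bootstrap now lives next to the readers it configures instead of in the daemon's main package.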

@@ -67,7 +67,7 @@ func main() {
 		log.Fatal(err.Error())
 	}
-	log.Infof("Crowdwatch %s", cwversion.VersionStr())
+	log.Infof("Crowdsec %s", cwversion.VersionStr())

 	if cConfig.Prometheus {
 		registerPrometheus()
@@ -294,7 +294,7 @@ func main() {
 	log.Warningf("Starting processing data")

 	//Init the acqusition : from cli or from acquis.yaml file
-	acquisitionCTX, err = loadAcquisition()
+	acquisitionCTX, err = acquisition.LoadAcquisitionConfig(cConfig)
 	if err != nil {
 		log.Fatalf("Failed to start acquisition : %s", err)
 	}

@@ -40,7 +40,7 @@ LOOP:
 				buckets.Bucket_map.Delete(event.Overflow.MapKey)
 			} else {
 				/*let's handle output profiles */
-				if err := output.ProcessOutput(event.Overflow, outputProfiles); err != nil {
+				if err := output.ProcessOutput(*(event.Overflow), outputProfiles); err != nil {
 					log.Warningf("Error while processing overflow/output : %s", err)
 				}
 			}
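
Event.Overflow becomes a pointer in this commit (see the pkg/types hunk below), so the output path now dereferences it before handing the SignalOccurence to ProcessOutput. A purely illustrative, defensive variant (not part of this commit) would guard the dereference, since a nil Overflow would panic here:

// Hypothetical nil-guard, not in this commit; ProcessOutput still takes a value.
if event.Overflow != nil {
	if err := output.ProcessOutput(*event.Overflow, outputProfiles); err != nil {
		log.Warningf("Error while processing overflow/output : %s", err)
	}
}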

go.mod

@@ -15,6 +15,7 @@ require (
 	github.com/docker/go-connections v0.4.0
 	github.com/docker/go-units v0.4.0 // indirect
 	github.com/enescakir/emoji v1.0.0
+	github.com/google/go-cmp v0.4.0
 	github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e
 	github.com/hashicorp/go-version v1.2.0
 	github.com/jamiealquiza/tachymeter v2.0.0+incompatible
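
go-cmp is promoted to a direct dependency here; the diff does not show its call sites, so the snippet below is only a sketch of the usual way it is consumed in tests (the test name and values are placeholders, not taken from this commit):

package parser_test

import (
	"testing"

	"github.com/google/go-cmp/cmp"
)

func TestParsedFields(t *testing.T) {
	want := map[string]string{"program": "sshd"}
	got := map[string]string{"program": "sshd"}
	if diff := cmp.Diff(want, got); diff != "" {
		t.Errorf("unexpected result (-want +got):\n%s", diff)
	}
}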

go.sum

@@ -83,7 +83,9 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0=
 github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -168,6 +170,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA=
 github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
+github.com/prometheus/client_golang v1.6.0 h1:YVPodQOcK15POxhgARIvnDRVpLcuK8mglnMrWfyrw6A=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=

@@ -10,6 +10,7 @@ import (
 	"os"
 	"strings"

+	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -66,6 +67,33 @@
 	[]string{"source"},
 )

+func LoadAcquisitionConfig(cConfig *csconfig.CrowdSec) (*FileAcquisCtx, error) {
+	var acquisitionCTX *FileAcquisCtx
+	var err error
+	/*Init the acqusition : from cli or from acquis.yaml file*/
+	if cConfig.SingleFile != "" {
+		var input FileCtx
+		input.Filename = cConfig.SingleFile
+		input.Mode = CATMODE
+		input.Labels = make(map[string]string)
+		input.Labels["type"] = cConfig.SingleFileLabel
+		acquisitionCTX, err = InitReaderFromFileCtx([]FileCtx{input})
+	} else { /* Init file reader if we tail */
+		acquisitionCTX, err = InitReader(cConfig.AcquisitionFile)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err)
+	}
+	if acquisitionCTX == nil {
+		return nil, fmt.Errorf("no inputs to process")
+	}
+	if cConfig.Profiling {
+		acquisitionCTX.Profiling = true
+	}
+	return acquisitionCTX, nil
+}
+
 func InitReader(cfg string) (*FileAcquisCtx, error) {
 	var files []FileCtx
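
With the move, acquisition setup is driven entirely by the csconfig.CrowdSec struct rather than by state in the daemon's main package. A minimal usage sketch, assuming the import paths shown in this diff (paths and labels below are placeholders):

package main

import (
	log "github.com/sirupsen/logrus"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
)

func main() {
	// "cat" mode: SingleFile set, the file is read once from start to finish.
	cfg := &csconfig.CrowdSec{
		SingleFile:      "/var/log/nginx/access.log",
		SingleFileLabel: "nginx",
	}
	ctx, err := acquisition.LoadAcquisitionConfig(cfg)
	if err != nil {
		log.Fatalf("acquisition init failed : %s", err)
	}
	log.Infof("acquisition ready (profiling=%v)", ctx.Profiling)

	// tail mode: SingleFile empty, sources are read from an acquis.yaml file.
	cfg = &csconfig.CrowdSec{AcquisitionFile: "/etc/crowdsec/config/acquis.yaml"}
	if _, err := acquisition.LoadAcquisitionConfig(cfg); err != nil {
		log.Fatalf("acquisition init failed : %s", err)
	}
}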

@@ -13,7 +13,7 @@
 )

 func (ctx *ApiCtx) AppendSignal(sig types.SignalOccurence) error {
-	ctx.toPush = append(ctx.toPush, types.Event{Overflow: sig})
+	ctx.toPush = append(ctx.toPush, types.Event{Overflow: &sig})
 	log.Debugf("api append signal: adding new signal (cache size : %d): %+v", len(ctx.toPush), sig)
 	return nil
 }
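
Since sig is passed to AppendSignal by value, &sig points at this call's own copy, so each queued Event carries an independent SignalOccurence rather than aliasing the caller's variable; the same pattern is used for the overflow events emitted by the leaky bucket below.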

@@ -233,13 +233,13 @@ func LeakRoutine(l *Leaky) {
 					break
 				}
 			}
-			l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig}))
+			l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: &sig}))
 			mt, _ := l.Ovflw_ts.MarshalText()
 			l.logger.Tracef("overflow time : %s", mt)
 			if l.Profiling {
 				BucketsOverflow.With(prometheus.Labels{"name": l.Name}).Inc()
 			}
-			l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW, MarshaledTime: string(mt)}
+			l.AllOut <- types.Event{Overflow: &sig, Type: types.OVFLW, MarshaledTime: string(mt)}
 			return
 		/*we underflow or reach bucket deadline (timers)*/
 		case <-durationTicker:
@@ -266,9 +266,9 @@
 				BucketsUnderflow.With(prometheus.Labels{"name": l.Name}).Inc()
 			}
-			l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: sig}))
-			l.AllOut <- types.Event{Overflow: sig, Type: types.OVFLW}
+			l.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: &sig}))
+			l.AllOut <- types.Event{Overflow: &sig, Type: types.OVFLW}
 			l.logger.Tracef("Returning from leaky routine.")
 			return
 		}

@@ -227,6 +227,9 @@ func stageidx(stage string, stages []string) int {
 	return -1
 }

+var ParseDump bool
+var StageParseCache map[string]map[string]types.Event
+
 func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error) {
 	var event types.Event = xp
@@ -250,7 +253,13 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
 		log.Tracef("INPUT '%s'", event.Line.Raw)
 	}

+	if ParseDump {
+		StageParseCache = make(map[string]map[string]types.Event)
+	}
 	for _, stage := range ctx.Stages {
+		if ParseDump {
+			StageParseCache[stage] = make(map[string]types.Event)
+		}
 		/* if the node is forward in stages, seek to its stage */
 		/* this is for example used by testing system to inject logs in post-syslog-parsing phase*/
 		if stageidx(event.Stage, ctx.Stages) > stageidx(stage, ctx.Stages) {
@@ -267,14 +276,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
 		isStageOK := false
 		for idx, node := range nodes {
-			clog := log.WithFields(log.Fields{
-				"node-name": node.rn,
-				"stage": event.Stage,
-			})
 			//Only process current stage's nodes
 			if event.Stage != node.Stage {
 				continue
 			}
+			clog := log.WithFields(log.Fields{
+				"node-name": node.rn,
+				"stage": event.Stage,
+			})
 			clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn)
 			if ctx.Profiling {
 				node.Profiling = true
@@ -283,6 +292,11 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N
 			if err != nil {
 				clog.Fatalf("Error while processing node : %v", err)
 			}
+			if ret && ParseDump {
+				evtcopy := types.Event{}
+				types.Clone(&event, &evtcopy)
+				StageParseCache[stage][node.Name] = evtcopy
+			}
 			clog.Tracef("node (%s) ret : %v", node.rn, ret)
 			if ret {
 				isStageOK = true
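
ParseDump and StageParseCache give callers (typically a test or debugging harness) a per-stage, per-node snapshot of the event as it moves through Parse. Two properties worth noting from the code above: the cache is re-created at the top of every Parse call when ParseDump is set, so it only ever holds snapshots for the most recently parsed event, and it is a plain package-level map with no locking, so it is only suited to single-threaded use. A hedged consumer sketch, assuming this package is imported as parser and that ctx, evt and nodes were loaded elsewhere:

parser.ParseDump = true
parsed, err := parser.Parse(ctx, evt, nodes)
if err != nil {
	log.Fatalf("parse failed : %s", err)
}
for stage, byNode := range parser.StageParseCache {
	for nodeName, snapshot := range byNode {
		fmt.Printf("stage=%s node=%s parsed=%+v\n", stage, nodeName, snapshot.Parsed)
	}
}
_ = parsed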

@@ -1,6 +1,8 @@
 package types

 import (
+	"encoding/json"
+	"fmt"
 	"time"
 )
@@ -19,17 +21,51 @@ type Event struct {
 	/* the current stage of the line being parsed */
 	Stage string `yaml:"Stage,omitempty"`
 	/* original line (produced by acquisition) */
-	Line Line `json:"-" yaml:"Line,omitempty"`
+	Line Line `yaml:"Line,omitempty"`
 	/* output of groks */
-	Parsed map[string]string `json:"-" yaml:"Parsed,omitempty"`
+	Parsed map[string]string `yaml:"Parsed,omitempty"`
 	/* output of enrichment */
 	Enriched map[string]string `json:"Enriched,omitempty" yaml:"Enriched,omitempty"`
 	/* Overflow */
-	Overflow SignalOccurence `yaml:"Overflow,omitempty"`
-	Time time.Time `json:"Time,omitempty"` //parsed time `json:"-"` ``
+	Overflow *SignalOccurence `yaml:"Overflow,omitempty"`
+	Time time.Time `json:"Time,omitempty"` //parsed time
 	StrTime string `yaml:"StrTime,omitempty"`
 	MarshaledTime string `yaml:"MarshaledTime,omitempty"`
 	Process bool `yaml:"Process,omitempty"` //can be set to false to avoid processing line
 	/* Meta is the only part that will make it to the API - it should be normalized */
 	Meta map[string]string `json:"Meta,omitempty" yaml:"Meta,omitempty"`
 }

+func MarshalForHumans(evt Event) (string, error) {
+	repr := make(map[string]interface{})
+	repr["Whitelisted"] = evt.Whitelisted
+	repr["WhiteListReason"] = evt.WhiteListReason
+	repr["Stage"] = evt.Stage
+	if evt.Line.Raw != "" {
+		repr["Line"] = evt.Line
+	}
+	if len(evt.Parsed) > 0 {
+		repr["Parsed"] = evt.Parsed
+	}
+	if len(evt.Enriched) > 0 {
+		repr["Enriched"] = evt.Enriched
+	}
+	if len(evt.Meta) > 0 {
+		repr["Meta"] = evt.Meta
+	}
+	if evt.Overflow.Events_count != 0 {
+		repr["Overflow"] = evt.Overflow
+	}
+	repr["StrTime"] = evt.StrTime
+	repr["Process"] = evt.Process
+	output, err := json.MarshalIndent(repr, "", " ")
+	if err != nil {
+		return "", fmt.Errorf("failed to marshal : %s", err)
+	}
+	return string(output), nil
+}
+
+func MarshalForAPI() ([]byte, error) {
+	return nil, nil
+}
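
MarshalForHumans builds a filtered map and renders it as indented JSON for debugging output; separately, dropping the json:"-" tags above means Line and Parsed are no longer hidden when an Event is marshaled with encoding/json directly. One caveat: the function reads evt.Overflow.Events_count unconditionally, and with Overflow now a *SignalOccurence that requires a non-nil pointer, which is why the hedged sketch below passes an empty one (field values are placeholders). MarshalForAPI is only a stub at this point.

package main

import (
	"fmt"

	"github.com/crowdsecurity/crowdsec/pkg/types"
)

func main() {
	evt := types.Event{
		Stage:  "s01-parse",
		Parsed: map[string]string{"program": "sshd"},
		Meta:   map[string]string{"source_ip": "192.168.1.1"},
		// MarshalForHumans dereferences Overflow, so hand it an empty, non-nil value.
		Overflow: &types.SignalOccurence{},
	}
	out, err := types.MarshalForHumans(evt)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(out)
}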

@@ -1,7 +1,9 @@
 package types

 import (
+	"bytes"
 	"encoding/binary"
+	"encoding/gob"
 	"fmt"
 	"io"
 	"net"
@@ -93,3 +95,13 @@ func ConfigureLogger(clog *log.Logger) error {
 	clog.SetLevel(logLevel)
 	return nil
 }
+
+// straight from stackoverflow : Clone deep-copies a to b
+func Clone(a, b interface{}) {
+	buff := new(bytes.Buffer)
+	enc := gob.NewEncoder(buff)
+	dec := gob.NewDecoder(buff)
+	enc.Encode(a)
+	dec.Decode(b)
+}
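
Clone deep-copies by round-tripping through encoding/gob: whatever survives gob encoding (exported fields of gob-encodable types) is copied, unexported fields are silently skipped, and the destination must be passed as a pointer (Parse above passes pointers for both arguments, as in types.Clone(&event, &evtcopy)). The encode/decode errors are discarded here; a hypothetical error-checking variant of the same idea (not in this commit), reusing the bytes, encoding/gob and fmt imports added above:

// cloneChecked is a hypothetical variant of Clone that surfaces gob errors
// instead of discarding them; unexported fields are still not copied.
func cloneChecked(src, dst interface{}) error {
	var buff bytes.Buffer
	if err := gob.NewEncoder(&buff).Encode(src); err != nil {
		return fmt.Errorf("clone encode: %w", err)
	}
	if err := gob.NewDecoder(&buff).Decode(dst); err != nil {
		return fmt.Errorf("clone decode: %w", err)
	}
	return nil
}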