Allow parsers to capture data for future enrichment (#1969)
* Allow parsers to capture data in a cache, that can be later accessed via expr helpers (fake multi-line support)
This commit is contained in:
parent
cd4dabde0e
commit
6fb962a941
17 changed files with 434 additions and 16 deletions
|
@ -57,6 +57,10 @@ func FormatPrometheusMetrics(out io.Writer, url string, formatType string) error
|
||||||
lapi_bouncer_stats := map[string]map[string]map[string]int{}
|
lapi_bouncer_stats := map[string]map[string]map[string]int{}
|
||||||
decisions_stats := map[string]map[string]map[string]int{}
|
decisions_stats := map[string]map[string]map[string]int{}
|
||||||
alerts_stats := map[string]int{}
|
alerts_stats := map[string]int{}
|
||||||
|
stash_stats := map[string]struct {
|
||||||
|
Type string
|
||||||
|
Count int
|
||||||
|
}{}
|
||||||
|
|
||||||
for idx, fam := range result {
|
for idx, fam := range result {
|
||||||
if !strings.HasPrefix(fam.Name, "cs_") {
|
if !strings.HasPrefix(fam.Name, "cs_") {
|
||||||
|
@ -93,6 +97,8 @@ func FormatPrometheusMetrics(out io.Writer, url string, formatType string) error
|
||||||
origin := metric.Labels["origin"]
|
origin := metric.Labels["origin"]
|
||||||
action := metric.Labels["action"]
|
action := metric.Labels["action"]
|
||||||
|
|
||||||
|
mtype := metric.Labels["type"]
|
||||||
|
|
||||||
fval, err := strconv.ParseFloat(value, 32)
|
fval, err := strconv.ParseFloat(value, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Unexpected int value %s : %s", value, err)
|
log.Errorf("Unexpected int value %s : %s", value, err)
|
||||||
|
@ -208,6 +214,11 @@ func FormatPrometheusMetrics(out io.Writer, url string, formatType string) error
|
||||||
alerts_stats[scenario] = make(map[string]int)
|
alerts_stats[scenario] = make(map[string]int)
|
||||||
}*/
|
}*/
|
||||||
alerts_stats[reason] += ival
|
alerts_stats[reason] += ival
|
||||||
|
case "cs_cache_size":
|
||||||
|
stash_stats[name] = struct {
|
||||||
|
Type string
|
||||||
|
Count int
|
||||||
|
}{Type: mtype, Count: ival}
|
||||||
default:
|
default:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -225,8 +236,9 @@ func FormatPrometheusMetrics(out io.Writer, url string, formatType string) error
|
||||||
lapiDecisionStatsTable(out, lapi_decisions_stats)
|
lapiDecisionStatsTable(out, lapi_decisions_stats)
|
||||||
decisionStatsTable(out, decisions_stats)
|
decisionStatsTable(out, decisions_stats)
|
||||||
alertStatsTable(out, alerts_stats)
|
alertStatsTable(out, alerts_stats)
|
||||||
|
stashStatsTable(out, stash_stats)
|
||||||
} else if formatType == "json" {
|
} else if formatType == "json" {
|
||||||
for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
|
for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats, stash_stats} {
|
||||||
x, err := json.MarshalIndent(val, "", " ")
|
x, err := json.MarshalIndent(val, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to unmarshal metrics : %v", err)
|
return fmt.Errorf("failed to unmarshal metrics : %v", err)
|
||||||
|
@ -236,7 +248,7 @@ func FormatPrometheusMetrics(out io.Writer, url string, formatType string) error
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
} else if formatType == "raw" {
|
} else if formatType == "raw" {
|
||||||
for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
|
for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats, stash_stats} {
|
||||||
x, err := yaml.Marshal(val)
|
x, err := yaml.Marshal(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to unmarshal metrics : %v", err)
|
return fmt.Errorf("failed to unmarshal metrics : %v", err)
|
||||||
|
|
|
@ -129,6 +129,41 @@ func parserStatsTable(out io.Writer, stats map[string]map[string]int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stashStatsTable(out io.Writer, stats map[string]struct {
|
||||||
|
Type string
|
||||||
|
Count int
|
||||||
|
}) {
|
||||||
|
|
||||||
|
t := newTable(out)
|
||||||
|
t.SetRowLines(false)
|
||||||
|
t.SetHeaders("Name", "Type", "Items")
|
||||||
|
t.SetAlignment(table.AlignLeft, table.AlignLeft, table.AlignLeft)
|
||||||
|
|
||||||
|
// unfortunately, we can't reuse metricsToTable as the structure is too different :/
|
||||||
|
sortedKeys := []string{}
|
||||||
|
for k := range stats {
|
||||||
|
sortedKeys = append(sortedKeys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(sortedKeys)
|
||||||
|
|
||||||
|
numRows := 0
|
||||||
|
for _, alabel := range sortedKeys {
|
||||||
|
astats := stats[alabel]
|
||||||
|
|
||||||
|
row := []string{
|
||||||
|
alabel,
|
||||||
|
astats.Type,
|
||||||
|
fmt.Sprintf("%d", astats.Count),
|
||||||
|
}
|
||||||
|
t.AddRow(row...)
|
||||||
|
numRows++
|
||||||
|
}
|
||||||
|
if numRows > 0 {
|
||||||
|
renderTableTitle(out, "\nParser Stash Metrics:")
|
||||||
|
t.Render()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func lapiStatsTable(out io.Writer, stats map[string]map[string]int) {
|
func lapiStatsTable(out io.Writer, stats map[string]map[string]int) {
|
||||||
t := newTable(out)
|
t := newTable(out)
|
||||||
t.SetRowLines(false)
|
t.SetRowLines(false)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
|
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/cache"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
|
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database"
|
"github.com/crowdsecurity/crowdsec/pkg/database"
|
||||||
|
@ -100,6 +101,10 @@ var globalPourHistogram = prometheus.NewHistogramVec(
|
||||||
|
|
||||||
func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.HandlerFunc {
|
func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.HandlerFunc {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
//update cache metrics (stash)
|
||||||
|
cache.UpdateCacheMetrics()
|
||||||
|
|
||||||
|
//decision metrics are only relevant for LAPI
|
||||||
if dbClient == nil {
|
if dbClient == nil {
|
||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
|
@ -160,7 +165,8 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
|
||||||
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
||||||
leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstantiation, leaky.BucketsOverflow,
|
leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstantiation, leaky.BucketsOverflow,
|
||||||
v1.LapiRouteHits,
|
v1.LapiRouteHits,
|
||||||
leaky.BucketsCurrentCount)
|
leaky.BucketsCurrentCount,
|
||||||
|
cache.CacheMetrics)
|
||||||
} else {
|
} else {
|
||||||
log.Infof("Loading prometheus collectors")
|
log.Infof("Loading prometheus collectors")
|
||||||
prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
|
prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
|
||||||
|
@ -168,7 +174,8 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
|
||||||
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
||||||
v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions, v1.LapiResponseTime,
|
v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions, v1.LapiResponseTime,
|
||||||
leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstantiation, leaky.BucketsOverflow, leaky.BucketsCurrentCount,
|
leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstantiation, leaky.BucketsOverflow, leaky.BucketsCurrentCount,
|
||||||
globalActiveDecisions, globalAlerts)
|
globalActiveDecisions, globalAlerts,
|
||||||
|
cache.CacheMetrics)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -91,6 +91,7 @@ require (
|
||||||
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
|
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
|
||||||
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect
|
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
|
github.com/bluele/gcache v0.0.2 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
github.com/containerd/containerd v1.6.2 // indirect
|
github.com/containerd/containerd v1.6.2 // indirect
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -121,6 +121,8 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
|
||||||
github.com/blackfireio/osinfo v1.0.3 h1:Yk2t2GTPjBcESv6nDSWZKO87bGMQgO+Hi9OoXPpxX8c=
|
github.com/blackfireio/osinfo v1.0.3 h1:Yk2t2GTPjBcESv6nDSWZKO87bGMQgO+Hi9OoXPpxX8c=
|
||||||
github.com/blackfireio/osinfo v1.0.3/go.mod h1:Pd987poVNmd5Wsx6PRPw4+w7kLlf9iJxoRKPtPAjOrA=
|
github.com/blackfireio/osinfo v1.0.3/go.mod h1:Pd987poVNmd5Wsx6PRPw4+w7kLlf9iJxoRKPtPAjOrA=
|
||||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||||
|
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
|
||||||
|
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
|
||||||
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
|
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
|
||||||
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
||||||
github.com/c-robinson/iplib v1.0.3 h1:NG0UF0GoEsrC1/vyfX1Lx2Ss7CySWl3KqqXh3q4DdPU=
|
github.com/c-robinson/iplib v1.0.3 h1:NG0UF0GoEsrC1/vyfX1Lx2Ss7CySWl3KqqXh3q4DdPU=
|
||||||
|
|
119
pkg/cache/cache.go
vendored
Normal file
119
pkg/cache/cache.go
vendored
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bluele/gcache"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var Caches []gcache.Cache
|
||||||
|
var CacheNames []string
|
||||||
|
var CacheConfig []CacheCfg
|
||||||
|
|
||||||
|
/*prometheus*/
|
||||||
|
var CacheMetrics = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "cs_cache_size",
|
||||||
|
Help: "Entries per cache.",
|
||||||
|
},
|
||||||
|
[]string{"name", "type"},
|
||||||
|
)
|
||||||
|
|
||||||
|
// UpdateCacheMetrics is called directly by the prom handler
|
||||||
|
func UpdateCacheMetrics() {
|
||||||
|
CacheMetrics.Reset()
|
||||||
|
for i, name := range CacheNames {
|
||||||
|
CacheMetrics.With(prometheus.Labels{"name": name, "type": CacheConfig[i].Strategy}).Set(float64(Caches[i].Len(false)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type CacheCfg struct {
|
||||||
|
Name string
|
||||||
|
Size int
|
||||||
|
TTL time.Duration
|
||||||
|
Strategy string
|
||||||
|
LogLevel *log.Level
|
||||||
|
Logger *log.Entry
|
||||||
|
}
|
||||||
|
|
||||||
|
func CacheInit(cfg CacheCfg) error {
|
||||||
|
|
||||||
|
for _, name := range CacheNames {
|
||||||
|
if name == cfg.Name {
|
||||||
|
log.Infof("Cache %s already exists", cfg.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//get a default logger
|
||||||
|
if cfg.LogLevel == nil {
|
||||||
|
cfg.LogLevel = new(log.Level)
|
||||||
|
*cfg.LogLevel = log.InfoLevel
|
||||||
|
}
|
||||||
|
var clog = logrus.New()
|
||||||
|
if err := types.ConfigureLogger(clog); err != nil {
|
||||||
|
log.Fatalf("While creating cache logger : %s", err)
|
||||||
|
}
|
||||||
|
clog.SetLevel(*cfg.LogLevel)
|
||||||
|
cfg.Logger = clog.WithFields(log.Fields{
|
||||||
|
"cache": cfg.Name,
|
||||||
|
})
|
||||||
|
|
||||||
|
tmpCache := gcache.New(cfg.Size)
|
||||||
|
switch cfg.Strategy {
|
||||||
|
case "LRU":
|
||||||
|
tmpCache = tmpCache.LRU()
|
||||||
|
case "LFU":
|
||||||
|
tmpCache = tmpCache.LFU()
|
||||||
|
case "ARC":
|
||||||
|
tmpCache = tmpCache.ARC()
|
||||||
|
default:
|
||||||
|
cfg.Strategy = "LRU"
|
||||||
|
tmpCache = tmpCache.LRU()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
CTICache := tmpCache.Build()
|
||||||
|
Caches = append(Caches, CTICache)
|
||||||
|
CacheNames = append(CacheNames, cfg.Name)
|
||||||
|
CacheConfig = append(CacheConfig, cfg)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetKey(cacheName string, key string, value string, expiration *time.Duration) error {
|
||||||
|
|
||||||
|
for i, name := range CacheNames {
|
||||||
|
if name == cacheName {
|
||||||
|
if expiration == nil {
|
||||||
|
expiration = &CacheConfig[i].TTL
|
||||||
|
}
|
||||||
|
CacheConfig[i].Logger.Debugf("Setting key %s to %s with expiration %v", key, value, *expiration)
|
||||||
|
if err := Caches[i].SetWithExpire(key, value, *expiration); err != nil {
|
||||||
|
CacheConfig[i].Logger.Warningf("While setting key %s in cache %s: %s", key, cacheName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetKey(cacheName string, key string) (string, error) {
|
||||||
|
for i, name := range CacheNames {
|
||||||
|
if name == cacheName {
|
||||||
|
if value, err := Caches[i].Get(key); err != nil {
|
||||||
|
//do not warn or log if key not found
|
||||||
|
if err == gcache.KeyNotFoundError {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
CacheConfig[i].Logger.Warningf("While getting key %s in cache %s: %s", key, cacheName, err)
|
||||||
|
return "", err
|
||||||
|
} else {
|
||||||
|
return value.(string), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Warningf("Cache %s not found", cacheName)
|
||||||
|
return "", nil
|
||||||
|
}
|
30
pkg/cache/cache_test.go
vendored
Normal file
30
pkg/cache/cache_test.go
vendored
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCreateSetGet(t *testing.T) {
|
||||||
|
err := CacheInit(CacheCfg{Name: "test", Size: 100, TTL: 1 * time.Second})
|
||||||
|
assert.Empty(t, err)
|
||||||
|
//set & get
|
||||||
|
err = SetKey("test", "testkey0", "testvalue1", nil)
|
||||||
|
assert.Empty(t, err)
|
||||||
|
|
||||||
|
ret, err := GetKey("test", "testkey0")
|
||||||
|
assert.Equal(t, "testvalue1", ret)
|
||||||
|
assert.Empty(t, err)
|
||||||
|
//re-set
|
||||||
|
err = SetKey("test", "testkey0", "testvalue2", nil)
|
||||||
|
assert.Empty(t, err)
|
||||||
|
assert.Equal(t, "testvalue1", ret)
|
||||||
|
assert.Empty(t, err)
|
||||||
|
//expire
|
||||||
|
time.Sleep(1500 * time.Millisecond)
|
||||||
|
ret, err = GetKey("test", "testkey0")
|
||||||
|
assert.Equal(t, "", ret)
|
||||||
|
assert.Empty(t, err)
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/c-robinson/iplib"
|
"github.com/c-robinson/iplib"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/cache"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database"
|
"github.com/crowdsecurity/crowdsec/pkg/database"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -69,6 +70,8 @@ func GetExprEnv(ctx map[string]interface{}) map[string]interface{} {
|
||||||
"GetDecisionsSinceCount": GetDecisionsSinceCount,
|
"GetDecisionsSinceCount": GetDecisionsSinceCount,
|
||||||
"Sprintf": fmt.Sprintf,
|
"Sprintf": fmt.Sprintf,
|
||||||
"ParseUnix": ParseUnix,
|
"ParseUnix": ParseUnix,
|
||||||
|
"GetFromStash": cache.GetKey,
|
||||||
|
"SetInStash": cache.SetKey,
|
||||||
}
|
}
|
||||||
for k, v := range ctx {
|
for k, v := range ctx {
|
||||||
ExprLib[k] = v
|
ExprLib[k] = v
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/antonmedv/expr"
|
"github.com/antonmedv/expr"
|
||||||
"github.com/crowdsecurity/grokky"
|
"github.com/crowdsecurity/grokky"
|
||||||
|
@ -11,6 +12,7 @@ import (
|
||||||
yaml "gopkg.in/yaml.v2"
|
yaml "gopkg.in/yaml.v2"
|
||||||
|
|
||||||
"github.com/antonmedv/expr/vm"
|
"github.com/antonmedv/expr/vm"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/cache"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
|
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
@ -57,6 +59,8 @@ type Node struct {
|
||||||
Grok types.GrokPattern `yaml:"grok,omitempty"`
|
Grok types.GrokPattern `yaml:"grok,omitempty"`
|
||||||
//Statics can be present in any type of node and is executed last
|
//Statics can be present in any type of node and is executed last
|
||||||
Statics []types.ExtraField `yaml:"statics,omitempty"`
|
Statics []types.ExtraField `yaml:"statics,omitempty"`
|
||||||
|
//Stash allows to capture data from the log line and store it in an accessible cache
|
||||||
|
Stash []types.DataCapture `yaml:"stash,omitempty"`
|
||||||
//Whitelists
|
//Whitelists
|
||||||
Whitelist Whitelist `yaml:"whitelist,omitempty"`
|
Whitelist Whitelist `yaml:"whitelist,omitempty"`
|
||||||
Data []*types.DataSource `yaml:"data,omitempty"`
|
Data []*types.DataSource `yaml:"data,omitempty"`
|
||||||
|
@ -103,6 +107,25 @@ func (n *Node) validate(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for idx, stash := range n.Stash {
|
||||||
|
if stash.Name == "" {
|
||||||
|
return fmt.Errorf("stash %d : name must be set", idx)
|
||||||
|
}
|
||||||
|
if stash.Value == "" {
|
||||||
|
return fmt.Errorf("stash %s : value expression must be set", stash.Name)
|
||||||
|
}
|
||||||
|
if stash.Key == "" {
|
||||||
|
return fmt.Errorf("stash %s : key expression must be set", stash.Name)
|
||||||
|
}
|
||||||
|
if stash.TTL == "" {
|
||||||
|
return fmt.Errorf("stash %s : ttl must be set", stash.Name)
|
||||||
|
}
|
||||||
|
//should be configurable
|
||||||
|
if stash.MaxMapSize == 0 {
|
||||||
|
stash.MaxMapSize = 100
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +308,50 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx, expressionEnv map[stri
|
||||||
clog.Tracef("! No grok pattern : %p", n.Grok.RunTimeRegexp)
|
clog.Tracef("! No grok pattern : %p", n.Grok.RunTimeRegexp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Process the stash (data collection) if : a grok was present and succeeded, or if there is no grok
|
||||||
|
if NodeHasOKGrok || n.Grok.RunTimeRegexp == nil {
|
||||||
|
for idx, stash := range n.Stash {
|
||||||
|
var value string
|
||||||
|
var key string
|
||||||
|
if stash.ValueExpression == nil {
|
||||||
|
clog.Warningf("Stash %d has no value expression, skipping", idx)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if stash.KeyExpression == nil {
|
||||||
|
clog.Warningf("Stash %d has no key expression, skipping", idx)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//collect the data
|
||||||
|
output, err := expr.Run(stash.ValueExpression, cachedExprEnv)
|
||||||
|
if err != nil {
|
||||||
|
clog.Warningf("Error while running stash val expression : %v", err)
|
||||||
|
}
|
||||||
|
//can we expect anything else than a string ?
|
||||||
|
switch output := output.(type) {
|
||||||
|
case string:
|
||||||
|
value = output
|
||||||
|
default:
|
||||||
|
clog.Warningf("unexpected type %t (%v) while running '%s'", output, output, stash.Value)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
//collect the key
|
||||||
|
output, err = expr.Run(stash.KeyExpression, cachedExprEnv)
|
||||||
|
if err != nil {
|
||||||
|
clog.Warningf("Error while running stash key expression : %v", err)
|
||||||
|
}
|
||||||
|
//can we expect anything else than a string ?
|
||||||
|
switch output := output.(type) {
|
||||||
|
case string:
|
||||||
|
key = output
|
||||||
|
default:
|
||||||
|
clog.Warningf("unexpected type %t (%v) while running '%s'", output, output, stash.Key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cache.SetKey(stash.Name, key, value, &stash.TTLVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//Iterate on leafs
|
//Iterate on leafs
|
||||||
if len(n.LeavesNodes) > 0 {
|
if len(n.LeavesNodes) > 0 {
|
||||||
for _, leaf := range n.LeavesNodes {
|
for _, leaf := range n.LeavesNodes {
|
||||||
|
@ -434,10 +501,10 @@ func (n *Node) compile(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
||||||
n.Logger.Tracef("+ Regexp Compilation '%s'", n.Grok.RegexpName)
|
n.Logger.Tracef("+ Regexp Compilation '%s'", n.Grok.RegexpName)
|
||||||
n.Grok.RunTimeRegexp, err = pctx.Grok.Get(n.Grok.RegexpName)
|
n.Grok.RunTimeRegexp, err = pctx.Grok.Get(n.Grok.RegexpName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to find grok '%s' : %v", n.Grok.RegexpName, err)
|
return fmt.Errorf("unable to find grok '%s' : %v", n.Grok.RegexpName, err)
|
||||||
}
|
}
|
||||||
if n.Grok.RunTimeRegexp == nil {
|
if n.Grok.RunTimeRegexp == nil {
|
||||||
return fmt.Errorf("Empty grok '%s'", n.Grok.RegexpName)
|
return fmt.Errorf("empty grok '%s'", n.Grok.RegexpName)
|
||||||
}
|
}
|
||||||
n.Logger.Tracef("%s regexp: %s", n.Grok.RegexpName, n.Grok.RunTimeRegexp.Regexp.String())
|
n.Logger.Tracef("%s regexp: %s", n.Grok.RegexpName, n.Grok.RunTimeRegexp.Regexp.String())
|
||||||
valid = true
|
valid = true
|
||||||
|
@ -447,11 +514,11 @@ func (n *Node) compile(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
||||||
}
|
}
|
||||||
n.Grok.RunTimeRegexp, err = pctx.Grok.Compile(n.Grok.RegexpValue)
|
n.Grok.RunTimeRegexp, err = pctx.Grok.Compile(n.Grok.RegexpValue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to compile grok '%s': %v\n", n.Grok.RegexpValue, err)
|
return fmt.Errorf("failed to compile grok '%s': %v", n.Grok.RegexpValue, err)
|
||||||
}
|
}
|
||||||
if n.Grok.RunTimeRegexp == nil {
|
if n.Grok.RunTimeRegexp == nil {
|
||||||
// We shouldn't be here because compilation succeeded, so regexp shouldn't be nil
|
// We shouldn't be here because compilation succeeded, so regexp shouldn't be nil
|
||||||
return fmt.Errorf("Grok compilation failure: %s", n.Grok.RegexpValue)
|
return fmt.Errorf("grok compilation failure: %s", n.Grok.RegexpValue)
|
||||||
}
|
}
|
||||||
n.Logger.Tracef("%s regexp : %s", n.Grok.RegexpValue, n.Grok.RunTimeRegexp.Regexp.String())
|
n.Logger.Tracef("%s regexp : %s", n.Grok.RegexpValue, n.Grok.RunTimeRegexp.Regexp.String())
|
||||||
valid = true
|
valid = true
|
||||||
|
@ -480,6 +547,38 @@ func (n *Node) compile(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
||||||
}
|
}
|
||||||
valid = true
|
valid = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* load data capture (stash) */
|
||||||
|
for i, stash := range n.Stash {
|
||||||
|
n.Stash[i].ValueExpression, err = expr.Compile(stash.Value,
|
||||||
|
expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "while compiling stash value expression")
|
||||||
|
}
|
||||||
|
|
||||||
|
n.Stash[i].KeyExpression, err = expr.Compile(stash.Key,
|
||||||
|
expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}})))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "while compiling stash key expression")
|
||||||
|
}
|
||||||
|
|
||||||
|
n.Stash[i].TTLVal, err = time.ParseDuration(stash.TTL)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "while parsing stash ttl")
|
||||||
|
}
|
||||||
|
|
||||||
|
logLvl := n.Logger.Logger.GetLevel()
|
||||||
|
//init the cache, does it make sense to create it here just to be sure everything is fine ?
|
||||||
|
if err := cache.CacheInit(cache.CacheCfg{
|
||||||
|
Size: n.Stash[i].MaxMapSize,
|
||||||
|
TTL: n.Stash[i].TTLVal,
|
||||||
|
Name: n.Stash[i].Name,
|
||||||
|
LogLevel: &logLvl,
|
||||||
|
}); err != nil {
|
||||||
|
return errors.Wrap(err, "while initializing cache")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* compile leafs if present */
|
/* compile leafs if present */
|
||||||
if len(n.LeavesNodes) > 0 {
|
if len(n.LeavesNodes) > 0 {
|
||||||
for idx := range n.LeavesNodes {
|
for idx := range n.LeavesNodes {
|
||||||
|
|
|
@ -138,7 +138,7 @@ func testOneParser(pctx *UnixParserCtx, ectx EnricherCtx, dir string, b *testing
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//prepTests is going to do the initialisation of parser : it's going to load enrichment plugins and load the patterns. This is done here so that we don't redo it for each test
|
// prepTests is going to do the initialisation of parser : it's going to load enrichment plugins and load the patterns. This is done here so that we don't redo it for each test
|
||||||
func prepTests() (*UnixParserCtx, EnricherCtx, error) {
|
func prepTests() (*UnixParserCtx, EnricherCtx, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
@ -252,6 +252,7 @@ func matchEvent(expected types.Event, out types.Event, debug bool) ([]string, bo
|
||||||
if debug {
|
if debug {
|
||||||
retInfo = append(retInfo, fmt.Sprintf("mismatch %s[%s] %s != %s", outLabels[mapIdx], expKey, expVal, outVal))
|
retInfo = append(retInfo, fmt.Sprintf("mismatch %s[%s] %s != %s", outLabels[mapIdx], expKey, expVal, outVal))
|
||||||
}
|
}
|
||||||
|
valid = false
|
||||||
goto checkFinished
|
goto checkFinished
|
||||||
}
|
}
|
||||||
} else { //missing entry
|
} else { //missing entry
|
||||||
|
@ -266,11 +267,11 @@ func matchEvent(expected types.Event, out types.Event, debug bool) ([]string, bo
|
||||||
checkFinished:
|
checkFinished:
|
||||||
if valid {
|
if valid {
|
||||||
if debug {
|
if debug {
|
||||||
retInfo = append(retInfo, fmt.Sprintf("OK ! %s", strings.Join(retInfo, "/")))
|
retInfo = append(retInfo, fmt.Sprintf("OK ! \n\t%s", strings.Join(retInfo, "\n\t")))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if debug {
|
if debug {
|
||||||
retInfo = append(retInfo, fmt.Sprintf("KO ! %s", strings.Join(retInfo, "/")))
|
retInfo = append(retInfo, fmt.Sprintf("KO ! \n\t%s", strings.Join(retInfo, "\n\t")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return retInfo, valid
|
return retInfo, valid
|
||||||
|
|
Binary file not shown.
Before Width: | Height: | Size: 3.1 KiB After Width: | Height: | Size: 12 KiB |
31
pkg/parser/tests/base-grok-stash/base-grok-stash.yaml
Normal file
31
pkg/parser/tests/base-grok-stash/base-grok-stash.yaml
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
filter: "evt.Line.Labels.type == 'testlog'"
|
||||||
|
debug: true
|
||||||
|
onsuccess: next_stage
|
||||||
|
name: tests/base-grok-stash
|
||||||
|
pattern_syntax:
|
||||||
|
TEST_START: start %{DATA:program} thing with pid %{NUMBER:pid}
|
||||||
|
TEST_CONTINUED: pid %{NUMBER:pid} did a forbidden thing
|
||||||
|
nodes:
|
||||||
|
- #name: tests/base-grok-stash-sub-start
|
||||||
|
grok:
|
||||||
|
name: "TEST_START"
|
||||||
|
apply_on: Line.Raw
|
||||||
|
statics:
|
||||||
|
- meta: log_type
|
||||||
|
value: test_start
|
||||||
|
stash:
|
||||||
|
- name: test_program_pid_assoc
|
||||||
|
key: evt.Parsed.pid
|
||||||
|
value: evt.Parsed.program
|
||||||
|
ttl: 30s
|
||||||
|
size: 10
|
||||||
|
- #name: tests/base-grok-stash-sub-cont
|
||||||
|
grok:
|
||||||
|
name: "TEST_CONTINUED"
|
||||||
|
apply_on: Line.Raw
|
||||||
|
statics:
|
||||||
|
- meta: log_type
|
||||||
|
value: test_continue
|
||||||
|
- meta: associated_prog_name
|
||||||
|
expression: GetFromStash("test_program_pid_assoc", evt.Parsed.pid)
|
||||||
|
|
2
pkg/parser/tests/base-grok-stash/parsers.yaml
Normal file
2
pkg/parser/tests/base-grok-stash/parsers.yaml
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
- filename: {{.TestDirectory}}/base-grok-stash.yaml
|
||||||
|
stage: s00-raw
|
63
pkg/parser/tests/base-grok-stash/test.yaml
Normal file
63
pkg/parser/tests/base-grok-stash/test.yaml
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
#these are the events we input into parser
|
||||||
|
lines:
|
||||||
|
- Line:
|
||||||
|
Labels:
|
||||||
|
type: testlog
|
||||||
|
Raw: start foobar thing with pid 12
|
||||||
|
- Line:
|
||||||
|
Labels:
|
||||||
|
type: testlog
|
||||||
|
Raw: start toto thing with pid 42
|
||||||
|
- Line:
|
||||||
|
Labels:
|
||||||
|
type: testlog
|
||||||
|
Raw: pid 12 did a forbidden thing
|
||||||
|
- Line:
|
||||||
|
Labels:
|
||||||
|
type: testlog
|
||||||
|
Raw: pid 42 did a forbidden thing
|
||||||
|
- Line:
|
||||||
|
Labels:
|
||||||
|
type: testlog
|
||||||
|
Raw: pid 45 did a forbidden thing
|
||||||
|
#these are the results we expect from the parser
|
||||||
|
results:
|
||||||
|
|
||||||
|
- Meta:
|
||||||
|
log_type: test_start
|
||||||
|
Parsed:
|
||||||
|
program: foobar
|
||||||
|
pid: "12"
|
||||||
|
Process: true
|
||||||
|
Stage: s00-raw
|
||||||
|
|
||||||
|
- Meta:
|
||||||
|
log_type: test_start
|
||||||
|
Parsed:
|
||||||
|
program: toto
|
||||||
|
pid: "42"
|
||||||
|
Process: true
|
||||||
|
Stage: s00-raw
|
||||||
|
|
||||||
|
- Meta:
|
||||||
|
log_type: test_continue
|
||||||
|
associated_prog_name: foobar
|
||||||
|
Parsed:
|
||||||
|
pid: "12"
|
||||||
|
Process: true
|
||||||
|
Stage: s00-raw
|
||||||
|
|
||||||
|
- Meta:
|
||||||
|
log_type: test_continue
|
||||||
|
associated_prog_name: toto
|
||||||
|
Parsed:
|
||||||
|
pid: "42"
|
||||||
|
Process: true
|
||||||
|
Stage: s00-raw
|
||||||
|
|
||||||
|
- Meta:
|
||||||
|
log_type: test_continue
|
||||||
|
Parsed:
|
||||||
|
pid: "45"
|
||||||
|
Process: true
|
||||||
|
Stage: s00-raw
|
|
@ -1,5 +1,6 @@
|
||||||
filter: "'source_ip' in evt.Meta"
|
filter: "'source_ip' in evt.Meta"
|
||||||
name: tests/geoip-enrich
|
name: tests/geoip-enrich
|
||||||
|
debug: true
|
||||||
description: "Populate event with geoloc info : as, country, coords, source range."
|
description: "Populate event with geoloc info : as, country, coords, source range."
|
||||||
statics:
|
statics:
|
||||||
- method: GeoIpCity
|
- method: GeoIpCity
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
lines:
|
lines:
|
||||||
- Meta:
|
- Meta:
|
||||||
test: test1
|
test: test1
|
||||||
source_ip: 8.8.8.8
|
source_ip: 1.0.0.1
|
||||||
- Meta:
|
- Meta:
|
||||||
test: test2
|
test: test2
|
||||||
source_ip: 192.168.0.1
|
source_ip: 192.168.0.1
|
||||||
|
@ -10,11 +10,10 @@ lines:
|
||||||
results:
|
results:
|
||||||
- Process: true
|
- Process: true
|
||||||
Enriched:
|
Enriched:
|
||||||
IsoCode: US
|
|
||||||
IsInEU: false
|
IsInEU: false
|
||||||
ASNOrg: Google LLC
|
ASNOrg: "Google Inc."
|
||||||
Meta:
|
Meta:
|
||||||
source_ip: 8.8.8.8
|
source_ip: 1.0.0.1
|
||||||
- Process: true
|
- Process: true
|
||||||
Enriched:
|
Enriched:
|
||||||
IsInEU: false
|
IsInEU: false
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/antonmedv/expr/vm"
|
"github.com/antonmedv/expr/vm"
|
||||||
"github.com/crowdsecurity/grokky"
|
"github.com/crowdsecurity/grokky"
|
||||||
)
|
)
|
||||||
|
|
||||||
//Used mostly for statics
|
// Used mostly for statics
|
||||||
type ExtraField struct {
|
type ExtraField struct {
|
||||||
//if the target is indicated by name Struct.Field etc,
|
//if the target is indicated by name Struct.Field etc,
|
||||||
TargetByName string `yaml:"target,omitempty"`
|
TargetByName string `yaml:"target,omitempty"`
|
||||||
|
@ -39,3 +41,14 @@ type GrokPattern struct {
|
||||||
//a grok can contain statics that apply if pattern is successful
|
//a grok can contain statics that apply if pattern is successful
|
||||||
Statics []ExtraField `yaml:"statics,omitempty"`
|
Statics []ExtraField `yaml:"statics,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type DataCapture struct {
|
||||||
|
Name string `yaml:"name,omitempty"`
|
||||||
|
Key string `yaml:"key,omitempty"`
|
||||||
|
KeyExpression *vm.Program `yaml:"-"`
|
||||||
|
Value string `yaml:"value,omitempty"`
|
||||||
|
ValueExpression *vm.Program `yaml:"-"`
|
||||||
|
TTL string `yaml:"ttl,omitempty"`
|
||||||
|
TTLVal time.Duration `yaml:"-"`
|
||||||
|
MaxMapSize int `yaml:"size,omitempty"`
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue