acquisition : take prometheus level into account (#2885)
* properly take into account the aggregation level of prometheus metrics in acquisition
This commit is contained in:
parent
bd785ede15
commit
b1c09f7512
31 changed files with 245 additions and 128 deletions
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/crowdsecurity/go-cs-lib/trace"
|
"github.com/crowdsecurity/go-cs-lib/trace"
|
||||||
|
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
|
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/appsec"
|
"github.com/crowdsecurity/crowdsec/pkg/appsec"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||||
|
@ -147,7 +148,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
|
||||||
|
|
||||||
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
|
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
|
||||||
aggregated := false
|
aggregated := false
|
||||||
if cConfig.Prometheus.Level == "aggregated" {
|
if cConfig.Prometheus.Level == configuration.CFG_METRICS_AGGREGATE {
|
||||||
aggregated = true
|
aggregated = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error)
|
||||||
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
|
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
|
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/crowdsecurity/go-cs-lib/trace"
|
"github.com/crowdsecurity/go-cs-lib/trace"
|
||||||
"github.com/crowdsecurity/go-cs-lib/version"
|
"github.com/crowdsecurity/go-cs-lib/version"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
|
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/cache"
|
"github.com/crowdsecurity/crowdsec/pkg/cache"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||||
|
@ -161,7 +162,7 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
|
||||||
|
|
||||||
// Registering prometheus
|
// Registering prometheus
|
||||||
// If in aggregated mode, do not register events associated with a source, to keep the cardinality low
|
// If in aggregated mode, do not register events associated with a source, to keep the cardinality low
|
||||||
if config.Level == "aggregated" {
|
if config.Level == configuration.CFG_METRICS_AGGREGATE {
|
||||||
log.Infof("Loading aggregated prometheus collectors")
|
log.Infof("Loading aggregated prometheus collectors")
|
||||||
prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
|
prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
|
||||||
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
globalCsInfo, globalParsingHistogram, globalPourHistogram,
|
||||||
|
|
|
@ -54,7 +54,7 @@ type DataSource interface {
|
||||||
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
|
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
|
||||||
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
|
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
|
||||||
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
|
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
|
||||||
Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks.
|
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
|
||||||
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
|
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
|
||||||
GetMode() string // Get the mode (TAIL, CAT or SERVER)
|
GetMode() string // Get the mode (TAIL, CAT or SERVER)
|
||||||
GetName() string // Get the name of the module
|
GetName() string // Get the name of the module
|
||||||
|
@ -94,7 +94,7 @@ func GetDataSourceIface(dataSourceType string) DataSource {
|
||||||
// if the configuration is not valid it returns an error.
|
// if the configuration is not valid it returns an error.
|
||||||
// If the datasource can't be run (eg. journalctl not available), it still returns an error which
|
// If the datasource can't be run (eg. journalctl not available), it still returns an error which
|
||||||
// can be checked for the appropriate action.
|
// can be checked for the appropriate action.
|
||||||
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
|
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg, metricsLevel int) (*DataSource, error) {
|
||||||
// we dump it back to []byte, because we want to decode the yaml blob twice:
|
// we dump it back to []byte, because we want to decode the yaml blob twice:
|
||||||
// once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
|
// once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
|
||||||
yamlConfig, err := yaml.Marshal(commonConfig)
|
yamlConfig, err := yaml.Marshal(commonConfig)
|
||||||
|
@ -122,7 +122,7 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS
|
||||||
return nil, &DataSourceUnavailableError{Name: commonConfig.Source, Err: err}
|
return nil, &DataSourceUnavailableError{Name: commonConfig.Source, Err: err}
|
||||||
}
|
}
|
||||||
/* configure the actual datasource */
|
/* configure the actual datasource */
|
||||||
if err := dataSrc.Configure(yamlConfig, subLogger); err != nil {
|
if err := dataSrc.Configure(yamlConfig, subLogger, metricsLevel); err != nil {
|
||||||
return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err)
|
return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -180,10 +180,30 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr
|
||||||
return sources, nil
|
return sources, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int {
|
||||||
|
if prom == nil {
|
||||||
|
return configuration.METRICS_FULL
|
||||||
|
|
||||||
|
}
|
||||||
|
if !prom.Enabled {
|
||||||
|
return configuration.METRICS_NONE
|
||||||
|
}
|
||||||
|
if prom.Level == configuration.CFG_METRICS_AGGREGATE {
|
||||||
|
return configuration.METRICS_AGGREGATE
|
||||||
|
}
|
||||||
|
|
||||||
|
if prom.Level == configuration.CFG_METRICS_FULL {
|
||||||
|
return configuration.METRICS_FULL
|
||||||
|
}
|
||||||
|
return configuration.METRICS_FULL
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
|
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
|
||||||
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
|
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
|
||||||
var sources []DataSource
|
var sources []DataSource
|
||||||
|
|
||||||
|
metrics_level := GetMetricsLevelFromPromCfg(prom)
|
||||||
for _, acquisFile := range config.AcquisitionFiles {
|
for _, acquisFile := range config.AcquisitionFiles {
|
||||||
log.Infof("loading acquisition file : %s", acquisFile)
|
log.Infof("loading acquisition file : %s", acquisFile)
|
||||||
yamlFile, err := os.Open(acquisFile)
|
yamlFile, err := os.Open(acquisFile)
|
||||||
|
@ -225,7 +245,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
||||||
}
|
}
|
||||||
uniqueId := uuid.NewString()
|
uniqueId := uuid.NewString()
|
||||||
sub.UniqueId = uniqueId
|
sub.UniqueId = uniqueId
|
||||||
src, err := DataSourceConfigure(sub)
|
src, err := DataSourceConfigure(sub, metrics_level)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var dserr *DataSourceUnavailableError
|
var dserr *DataSourceUnavailableError
|
||||||
if errors.As(err, &dserr) {
|
if errors.As(err, &dserr) {
|
||||||
|
|
|
@ -35,7 +35,7 @@ func (f *MockSource) UnmarshalConfig(cfg []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
|
func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
f.logger = logger
|
f.logger = logger
|
||||||
if err := f.UnmarshalConfig(cfg); err != nil {
|
if err := f.UnmarshalConfig(cfg); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -182,7 +182,7 @@ wowo: ajsajasjas
|
||||||
t.Run(tc.TestName, func(t *testing.T) {
|
t.Run(tc.TestName, func(t *testing.T) {
|
||||||
common := configuration.DataSourceCommonCfg{}
|
common := configuration.DataSourceCommonCfg{}
|
||||||
yaml.Unmarshal([]byte(tc.String), &common)
|
yaml.Unmarshal([]byte(tc.String), &common)
|
||||||
ds, err := DataSourceConfigure(common)
|
ds, err := DataSourceConfigure(common, configuration.METRICS_NONE)
|
||||||
cstest.RequireErrorContains(t, err, tc.ExpectedError)
|
cstest.RequireErrorContains(t, err, tc.ExpectedError)
|
||||||
if tc.ExpectedError != "" {
|
if tc.ExpectedError != "" {
|
||||||
return
|
return
|
||||||
|
@ -283,7 +283,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
tc := tc
|
tc := tc
|
||||||
t.Run(tc.TestName, func(t *testing.T) {
|
t.Run(tc.TestName, func(t *testing.T) {
|
||||||
dss, err := LoadAcquisitionFromFile(&tc.Config)
|
dss, err := LoadAcquisitionFromFile(&tc.Config, nil)
|
||||||
cstest.RequireErrorContains(t, err, tc.ExpectedError)
|
cstest.RequireErrorContains(t, err, tc.ExpectedError)
|
||||||
if tc.ExpectedError != "" {
|
if tc.ExpectedError != "" {
|
||||||
return
|
return
|
||||||
|
@ -305,7 +305,7 @@ type MockCat struct {
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error {
|
func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
f.logger = logger
|
f.logger = logger
|
||||||
if f.Mode == "" {
|
if f.Mode == "" {
|
||||||
f.Mode = configuration.CAT_MODE
|
f.Mode = configuration.CAT_MODE
|
||||||
|
@ -349,7 +349,7 @@ type MockTail struct {
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error {
|
func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
f.logger = logger
|
f.logger = logger
|
||||||
if f.Mode == "" {
|
if f.Mode == "" {
|
||||||
f.Mode = configuration.TAIL_MODE
|
f.Mode = configuration.TAIL_MODE
|
||||||
|
@ -497,8 +497,10 @@ type MockSourceByDSN struct {
|
||||||
logger *log.Entry //nolint: unused
|
logger *log.Entry //nolint: unused
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
|
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
|
||||||
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry) error { return nil }
|
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
|
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
|
||||||
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
||||||
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
||||||
|
|
|
@ -19,3 +19,14 @@ type DataSourceCommonCfg struct {
|
||||||
var TAIL_MODE = "tail"
|
var TAIL_MODE = "tail"
|
||||||
var CAT_MODE = "cat"
|
var CAT_MODE = "cat"
|
||||||
var SERVER_MODE = "server" // No difference with tail, just a bit more verbose
|
var SERVER_MODE = "server" // No difference with tail, just a bit more verbose
|
||||||
|
|
||||||
|
const (
|
||||||
|
METRICS_NONE = iota
|
||||||
|
METRICS_AGGREGATE
|
||||||
|
METRICS_FULL
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
CFG_METRICS_AGGREGATE = "aggregated"
|
||||||
|
CFG_METRICS_FULL = "full"
|
||||||
|
)
|
||||||
|
|
|
@ -49,6 +49,7 @@ type AppsecSourceConfig struct {
|
||||||
|
|
||||||
// runtime structure of AppsecSourceConfig
|
// runtime structure of AppsecSourceConfig
|
||||||
type AppsecSource struct {
|
type AppsecSource struct {
|
||||||
|
metricsLevel int
|
||||||
config AppsecSourceConfig
|
config AppsecSourceConfig
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
|
@ -149,13 +150,13 @@ func (w *AppsecSource) GetAggregMetrics() []prometheus.Collector {
|
||||||
return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram}
|
return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
err := w.UnmarshalConfig(yamlConfig)
|
err := w.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "unable to parse appsec configuration")
|
return errors.Wrap(err, "unable to parse appsec configuration")
|
||||||
}
|
}
|
||||||
w.logger = logger
|
w.logger = logger
|
||||||
|
w.metricsLevel = MetricsLevel
|
||||||
w.logger.Tracef("Appsec configuration: %+v", w.config)
|
w.logger.Tracef("Appsec configuration: %+v", w.config)
|
||||||
|
|
||||||
if w.config.AuthCacheDuration == nil {
|
if w.config.AuthCacheDuration == nil {
|
||||||
|
|
|
@ -43,7 +43,8 @@ var linesRead = prometheus.NewCounterVec(
|
||||||
|
|
||||||
// CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
|
// CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
|
||||||
type CloudwatchSource struct {
|
type CloudwatchSource struct {
|
||||||
Config CloudwatchSourceConfiguration
|
metricsLevel int
|
||||||
|
Config CloudwatchSourceConfiguration
|
||||||
/*runtime stuff*/
|
/*runtime stuff*/
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
t *tomb.Tomb
|
t *tomb.Tomb
|
||||||
|
@ -152,11 +153,12 @@ func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
err := cw.UnmarshalConfig(yamlConfig)
|
err := cw.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
cw.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
cw.logger = logger.WithField("group", cw.Config.GroupName)
|
cw.logger = logger.WithField("group", cw.Config.GroupName)
|
||||||
|
|
||||||
|
@ -385,7 +387,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
|
||||||
if !stream.t.Alive() {
|
if !stream.t.Alive() {
|
||||||
cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName)
|
cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName)
|
||||||
cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
|
cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
|
||||||
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec()
|
if cw.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec()
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
shouldCreate = false
|
shouldCreate = false
|
||||||
|
@ -395,7 +399,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
|
||||||
|
|
||||||
//let's start watching this stream
|
//let's start watching this stream
|
||||||
if shouldCreate {
|
if shouldCreate {
|
||||||
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc()
|
if cw.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc()
|
||||||
|
}
|
||||||
newStream.t = tomb.Tomb{}
|
newStream.t = tomb.Tomb{}
|
||||||
newStream.logger = cw.logger.WithFields(log.Fields{"stream": newStream.StreamName})
|
newStream.logger = cw.logger.WithFields(log.Fields{"stream": newStream.StreamName})
|
||||||
cw.logger.Debugf("starting tail of stream %s", newStream.StreamName)
|
cw.logger.Debugf("starting tail of stream %s", newStream.StreamName)
|
||||||
|
@ -409,7 +415,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
|
||||||
for idx, stream := range cw.monitoredStreams {
|
for idx, stream := range cw.monitoredStreams {
|
||||||
if !cw.monitoredStreams[idx].t.Alive() {
|
if !cw.monitoredStreams[idx].t.Alive() {
|
||||||
cw.logger.Debugf("remove dead stream %s", stream.StreamName)
|
cw.logger.Debugf("remove dead stream %s", stream.StreamName)
|
||||||
openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
|
if cw.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
newMonitoredStreams = append(newMonitoredStreams, stream)
|
newMonitoredStreams = append(newMonitoredStreams, stream)
|
||||||
}
|
}
|
||||||
|
@ -485,7 +493,9 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
|
||||||
cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err)
|
cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err)
|
||||||
} else {
|
} else {
|
||||||
cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
|
cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
|
||||||
linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
|
if cw.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
|
||||||
|
}
|
||||||
outChan <- evt
|
outChan <- evt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -427,7 +428,7 @@ stream_name: test_stream`),
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
dbgLogger.Infof("starting test")
|
dbgLogger.Infof("starting test")
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err := cw.Configure(tc.config, dbgLogger)
|
err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE)
|
||||||
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
|
|
||||||
if tc.expectedCfgErr != "" {
|
if tc.expectedCfgErr != "" {
|
||||||
|
@ -559,7 +560,7 @@ stream_name: test_stream`),
|
||||||
dbgLogger := log.New().WithField("test", tc.name)
|
dbgLogger := log.New().WithField("test", tc.name)
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err := cw.Configure(tc.config, dbgLogger)
|
err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE)
|
||||||
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
if tc.expectedCfgErr != "" {
|
if tc.expectedCfgErr != "" {
|
||||||
return
|
return
|
||||||
|
|
|
@ -46,6 +46,7 @@ type DockerConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type DockerSource struct {
|
type DockerSource struct {
|
||||||
|
metricsLevel int
|
||||||
Config DockerConfiguration
|
Config DockerConfiguration
|
||||||
runningContainerState map[string]*ContainerConfig
|
runningContainerState map[string]*ContainerConfig
|
||||||
compiledContainerName []*regexp.Regexp
|
compiledContainerName []*regexp.Regexp
|
||||||
|
@ -128,9 +129,9 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
d.logger = logger
|
d.logger = logger
|
||||||
|
d.metricsLevel = MetricsLevel
|
||||||
err := d.UnmarshalConfig(yamlConfig)
|
err := d.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -325,7 +326,9 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
|
||||||
l.Src = containerConfig.Name
|
l.Src = containerConfig.Name
|
||||||
l.Process = true
|
l.Process = true
|
||||||
l.Module = d.GetName()
|
l.Module = d.GetName()
|
||||||
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
|
if d.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
|
||||||
|
}
|
||||||
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
|
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
|
||||||
out <- evt
|
out <- evt
|
||||||
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
|
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
dockerTypes "github.com/docker/docker/api/types"
|
dockerTypes "github.com/docker/docker/api/types"
|
||||||
dockerContainer "github.com/docker/docker/api/types/container"
|
dockerContainer "github.com/docker/docker/api/types/container"
|
||||||
|
@ -60,7 +61,7 @@ container_name:
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
f := DockerSource{}
|
f := DockerSource{}
|
||||||
err := f.Configure([]byte(test.config), subLogger)
|
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,7 +163,7 @@ container_name_regexp:
|
||||||
|
|
||||||
for _, ts := range tests {
|
for _, ts := range tests {
|
||||||
var (
|
var (
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
subLogger *log.Entry
|
subLogger *log.Entry
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -182,7 +183,7 @@ container_name_regexp:
|
||||||
out := make(chan types.Event)
|
out := make(chan types.Event)
|
||||||
dockerSource := DockerSource{}
|
dockerSource := DockerSource{}
|
||||||
|
|
||||||
err := dockerSource.Configure([]byte(ts.config), subLogger)
|
err := dockerSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
}
|
}
|
||||||
|
@ -304,7 +305,7 @@ func TestOneShot(t *testing.T) {
|
||||||
for _, ts := range tests {
|
for _, ts := range tests {
|
||||||
var (
|
var (
|
||||||
subLogger *log.Entry
|
subLogger *log.Entry
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
)
|
)
|
||||||
|
|
||||||
if ts.expectedOutput != "" {
|
if ts.expectedOutput != "" {
|
||||||
|
|
|
@ -46,6 +46,7 @@ type FileConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type FileSource struct {
|
type FileSource struct {
|
||||||
|
metricsLevel int
|
||||||
config FileConfiguration
|
config FileConfiguration
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
watchedDirectories map[string]bool
|
watchedDirectories map[string]bool
|
||||||
|
@ -98,8 +99,9 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
f.logger = logger
|
f.logger = logger
|
||||||
|
f.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
err := f.UnmarshalConfig(yamlConfig)
|
err := f.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -517,12 +519,19 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
|
||||||
if line.Text == "" { //skip empty lines
|
if line.Text == "" { //skip empty lines
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
|
if f.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
|
||||||
|
}
|
||||||
|
src := tail.Filename
|
||||||
|
if f.metricsLevel == configuration.METRICS_AGGREGATE {
|
||||||
|
src = filepath.Base(tail.Filename)
|
||||||
|
}
|
||||||
|
|
||||||
l := types.Line{
|
l := types.Line{
|
||||||
Raw: trimLine(line.Text),
|
Raw: trimLine(line.Text),
|
||||||
Labels: f.config.Labels,
|
Labels: f.config.Labels,
|
||||||
Time: line.Time,
|
Time: line.Time,
|
||||||
Src: tail.Filename,
|
Src: src,
|
||||||
Process: true,
|
Process: true,
|
||||||
Module: f.GetName(),
|
Module: f.GetName(),
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
|
fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
)
|
)
|
||||||
|
@ -56,7 +57,7 @@ exclude_regexps: ["as[a-$d"]`,
|
||||||
tc := tc
|
tc := tc
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
f := fileacquisition.FileSource{}
|
f := fileacquisition.FileSource{}
|
||||||
err := f.Configure([]byte(tc.config), subLogger)
|
err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.RequireErrorContains(t, err, tc.expectedErr)
|
cstest.RequireErrorContains(t, err, tc.expectedErr)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -222,7 +223,7 @@ filename: test_files/test_delete.log`,
|
||||||
tc.setup()
|
tc.setup()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Configure([]byte(tc.config), subLogger)
|
err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.RequireErrorContains(t, err, tc.expectedConfigErr)
|
cstest.RequireErrorContains(t, err, tc.expectedConfigErr)
|
||||||
if tc.expectedConfigErr != "" {
|
if tc.expectedConfigErr != "" {
|
||||||
return
|
return
|
||||||
|
@ -384,7 +385,7 @@ force_inotify: true`, testPattern),
|
||||||
tc.setup()
|
tc.setup()
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Configure([]byte(tc.config), subLogger)
|
err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
if tc.afterConfigure != nil {
|
if tc.afterConfigure != nil {
|
||||||
|
@ -455,7 +456,7 @@ exclude_regexps: ["\\.gz$"]`
|
||||||
})
|
})
|
||||||
|
|
||||||
f := fileacquisition.FileSource{}
|
f := fileacquisition.FileSource{}
|
||||||
if err := f.Configure([]byte(config), subLogger); err != nil {
|
if err := f.Configure([]byte(config), subLogger, configuration.METRICS_NONE); err != nil {
|
||||||
subLogger.Fatalf("unexpected error: %s", err)
|
subLogger.Fatalf("unexpected error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,11 @@ type JournalCtlConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type JournalCtlSource struct {
|
type JournalCtlSource struct {
|
||||||
config JournalCtlConfiguration
|
metricsLevel int
|
||||||
logger *log.Entry
|
config JournalCtlConfiguration
|
||||||
src string
|
logger *log.Entry
|
||||||
args []string
|
src string
|
||||||
|
args []string
|
||||||
}
|
}
|
||||||
|
|
||||||
const journalctlCmd string = "journalctl"
|
const journalctlCmd string = "journalctl"
|
||||||
|
@ -131,7 +132,9 @@ func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) err
|
||||||
l.Src = j.src
|
l.Src = j.src
|
||||||
l.Process = true
|
l.Process = true
|
||||||
l.Module = j.GetName()
|
l.Module = j.GetName()
|
||||||
linesRead.With(prometheus.Labels{"source": j.src}).Inc()
|
if j.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"source": j.src}).Inc()
|
||||||
|
}
|
||||||
var evt types.Event
|
var evt types.Event
|
||||||
if !j.config.UseTimeMachine {
|
if !j.config.UseTimeMachine {
|
||||||
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
|
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
|
||||||
|
@ -194,8 +197,9 @@ func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
j.logger = logger
|
j.logger = logger
|
||||||
|
j.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
err := j.UnmarshalConfig(yamlConfig)
|
err := j.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/sirupsen/logrus/hooks/test"
|
"github.com/sirupsen/logrus/hooks/test"
|
||||||
|
@ -52,7 +53,7 @@ journalctl_filter:
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
f := JournalCtlSource{}
|
f := JournalCtlSource{}
|
||||||
err := f.Configure([]byte(test.config), subLogger)
|
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,9 +145,9 @@ journalctl_filter:
|
||||||
}
|
}
|
||||||
for _, ts := range tests {
|
for _, ts := range tests {
|
||||||
var (
|
var (
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
subLogger *log.Entry
|
subLogger *log.Entry
|
||||||
hook *test.Hook
|
hook *test.Hook
|
||||||
)
|
)
|
||||||
|
|
||||||
if ts.expectedOutput != "" {
|
if ts.expectedOutput != "" {
|
||||||
|
@ -165,7 +166,7 @@ journalctl_filter:
|
||||||
out := make(chan types.Event, 100)
|
out := make(chan types.Event, 100)
|
||||||
j := JournalCtlSource{}
|
j := JournalCtlSource{}
|
||||||
|
|
||||||
err := j.Configure([]byte(ts.config), subLogger)
|
err := j.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
}
|
}
|
||||||
|
@ -218,9 +219,9 @@ journalctl_filter:
|
||||||
}
|
}
|
||||||
for _, ts := range tests {
|
for _, ts := range tests {
|
||||||
var (
|
var (
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
subLogger *log.Entry
|
subLogger *log.Entry
|
||||||
hook *test.Hook
|
hook *test.Hook
|
||||||
)
|
)
|
||||||
|
|
||||||
if ts.expectedOutput != "" {
|
if ts.expectedOutput != "" {
|
||||||
|
@ -239,7 +240,7 @@ journalctl_filter:
|
||||||
out := make(chan types.Event)
|
out := make(chan types.Event)
|
||||||
j := JournalCtlSource{}
|
j := JournalCtlSource{}
|
||||||
|
|
||||||
err := j.Configure([]byte(ts.config), subLogger)
|
err := j.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,9 +52,10 @@ type TLSConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type KafkaSource struct {
|
type KafkaSource struct {
|
||||||
Config KafkaConfiguration
|
metricsLevel int
|
||||||
logger *log.Entry
|
Config KafkaConfiguration
|
||||||
Reader *kafka.Reader
|
logger *log.Entry
|
||||||
|
Reader *kafka.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaSource) GetUuid() string {
|
func (k *KafkaSource) GetUuid() string {
|
||||||
|
@ -86,8 +87,9 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
k.logger = logger
|
k.logger = logger
|
||||||
|
k.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
k.logger.Debugf("start configuring %s source", dataSourceName)
|
k.logger.Debugf("start configuring %s source", dataSourceName)
|
||||||
|
|
||||||
|
@ -170,7 +172,9 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
|
||||||
Module: k.GetName(),
|
Module: k.GetName(),
|
||||||
}
|
}
|
||||||
k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
|
k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
|
||||||
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
|
if k.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
|
||||||
|
}
|
||||||
var evt types.Event
|
var evt types.Event
|
||||||
|
|
||||||
if !k.Config.UseTimeMachine {
|
if !k.Config.UseTimeMachine {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -75,7 +76,7 @@ group_id: crowdsec`,
|
||||||
})
|
})
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
k := KafkaSource{}
|
k := KafkaSource{}
|
||||||
err := k.Configure([]byte(test.config), subLogger)
|
err := k.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -169,7 +170,7 @@ func TestStreamingAcquisition(t *testing.T) {
|
||||||
source: kafka
|
source: kafka
|
||||||
brokers:
|
brokers:
|
||||||
- localhost:9092
|
- localhost:9092
|
||||||
topic: crowdsecplaintext`), subLogger)
|
topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not configure kafka source : %s", err)
|
t.Fatalf("could not configure kafka source : %s", err)
|
||||||
}
|
}
|
||||||
|
@ -245,7 +246,7 @@ tls:
|
||||||
client_cert: ./testdata/kafkaClient.certificate.pem
|
client_cert: ./testdata/kafkaClient.certificate.pem
|
||||||
client_key: ./testdata/kafkaClient.key
|
client_key: ./testdata/kafkaClient.key
|
||||||
ca_cert: ./testdata/snakeoil-ca-1.crt
|
ca_cert: ./testdata/snakeoil-ca-1.crt
|
||||||
`), subLogger)
|
`), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not configure kafka source : %s", err)
|
t.Fatalf("could not configure kafka source : %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ type KinesisConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type KinesisSource struct {
|
type KinesisSource struct {
|
||||||
|
metricsLevel int
|
||||||
Config KinesisConfiguration
|
Config KinesisConfiguration
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
kClient *kinesis.Kinesis
|
kClient *kinesis.Kinesis
|
||||||
|
@ -149,8 +150,9 @@ func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
k.logger = logger
|
k.logger = logger
|
||||||
|
k.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
err := k.UnmarshalConfig(yamlConfig)
|
err := k.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -283,11 +285,15 @@ func (k *KinesisSource) RegisterConsumer() (*kinesis.RegisterStreamConsumerOutpu
|
||||||
func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan types.Event, logger *log.Entry, shardId string) {
|
func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan types.Event, logger *log.Entry, shardId string) {
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
if k.Config.StreamARN != "" {
|
if k.Config.StreamARN != "" {
|
||||||
linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamARN, "shard": shardId}).Inc()
|
if k.metricsLevel != configuration.METRICS_NONE {
|
||||||
linesRead.With(prometheus.Labels{"stream": k.Config.StreamARN}).Inc()
|
linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamARN, "shard": shardId}).Inc()
|
||||||
|
linesRead.With(prometheus.Labels{"stream": k.Config.StreamARN}).Inc()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamName, "shard": shardId}).Inc()
|
if k.metricsLevel != configuration.METRICS_NONE {
|
||||||
linesRead.With(prometheus.Labels{"stream": k.Config.StreamName}).Inc()
|
linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamName, "shard": shardId}).Inc()
|
||||||
|
linesRead.With(prometheus.Labels{"stream": k.Config.StreamName}).Inc()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
var data []CloudwatchSubscriptionLogEvent
|
var data []CloudwatchSubscriptionLogEvent
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -143,7 +144,7 @@ stream_arn: arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream`,
|
||||||
})
|
})
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
f := KinesisSource{}
|
f := KinesisSource{}
|
||||||
err := f.Configure([]byte(test.config), subLogger)
|
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,7 +173,7 @@ stream_name: stream-1-shard`,
|
||||||
config := fmt.Sprintf(test.config, endpoint)
|
config := fmt.Sprintf(test.config, endpoint)
|
||||||
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
||||||
"type": "kinesis",
|
"type": "kinesis",
|
||||||
}))
|
}), configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error configuring source: %s", err)
|
t.Fatalf("Error configuring source: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -218,7 +219,7 @@ stream_name: stream-2-shards`,
|
||||||
config := fmt.Sprintf(test.config, endpoint)
|
config := fmt.Sprintf(test.config, endpoint)
|
||||||
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
||||||
"type": "kinesis",
|
"type": "kinesis",
|
||||||
}))
|
}), configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error configuring source: %s", err)
|
t.Fatalf("Error configuring source: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -267,7 +268,7 @@ from_subscription: true`,
|
||||||
config := fmt.Sprintf(test.config, endpoint)
|
config := fmt.Sprintf(test.config, endpoint)
|
||||||
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
err := f.Configure([]byte(config), log.WithFields(log.Fields{
|
||||||
"type": "kinesis",
|
"type": "kinesis",
|
||||||
}))
|
}), configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error configuring source: %s", err)
|
t.Fatalf("Error configuring source: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,12 +28,13 @@ type KubernetesAuditConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type KubernetesAuditSource struct {
|
type KubernetesAuditSource struct {
|
||||||
config KubernetesAuditConfiguration
|
metricsLevel int
|
||||||
logger *log.Entry
|
config KubernetesAuditConfiguration
|
||||||
mux *http.ServeMux
|
logger *log.Entry
|
||||||
server *http.Server
|
mux *http.ServeMux
|
||||||
outChan chan types.Event
|
server *http.Server
|
||||||
addr string
|
outChan chan types.Event
|
||||||
|
addr string
|
||||||
}
|
}
|
||||||
|
|
||||||
var eventCount = prometheus.NewCounterVec(
|
var eventCount = prometheus.NewCounterVec(
|
||||||
|
@ -93,8 +94,9 @@ func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry) error {
|
func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
ka.logger = logger
|
ka.logger = logger
|
||||||
|
ka.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
err := ka.UnmarshalConfig(config)
|
err := ka.UnmarshalConfig(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -161,7 +163,10 @@ func (ka *KubernetesAuditSource) Dump() interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.Request) {
|
func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
requestCount.WithLabelValues(ka.addr).Inc()
|
|
||||||
|
if ka.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
requestCount.WithLabelValues(ka.addr).Inc()
|
||||||
|
}
|
||||||
if r.Method != http.MethodPost {
|
if r.Method != http.MethodPost {
|
||||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
return
|
return
|
||||||
|
@ -185,7 +190,9 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R
|
||||||
|
|
||||||
remoteIP := strings.Split(r.RemoteAddr, ":")[0]
|
remoteIP := strings.Split(r.RemoteAddr, ":")[0]
|
||||||
for _, auditEvent := range auditEvents.Items {
|
for _, auditEvent := range auditEvents.Items {
|
||||||
eventCount.WithLabelValues(ka.addr).Inc()
|
if ka.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
eventCount.WithLabelValues(ka.addr).Inc()
|
||||||
|
}
|
||||||
bytesEvent, err := json.Marshal(auditEvent)
|
bytesEvent, err := json.Marshal(auditEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ka.logger.Errorf("Error marshaling audit event: %s", err)
|
ka.logger.Errorf("Error marshaling audit event: %s", err)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -81,7 +82,7 @@ webhook_path: /k8s-audit`,
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Configure([]byte(test.config), subLogger)
|
err = f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
f.StreamingAcquisition(out, tb)
|
f.StreamingAcquisition(out, tb)
|
||||||
|
@ -253,7 +254,7 @@ webhook_path: /k8s-audit`,
|
||||||
f := KubernetesAuditSource{}
|
f := KubernetesAuditSource{}
|
||||||
err := f.UnmarshalConfig([]byte(test.config))
|
err := f.UnmarshalConfig([]byte(test.config))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = f.Configure([]byte(test.config), subLogger)
|
err = f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,8 @@ type LokiConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type LokiSource struct {
|
type LokiSource struct {
|
||||||
Config LokiConfiguration
|
metricsLevel int
|
||||||
|
Config LokiConfiguration
|
||||||
|
|
||||||
Client *lokiclient.LokiClient
|
Client *lokiclient.LokiClient
|
||||||
|
|
||||||
|
@ -118,9 +119,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
|
func (l *LokiSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
l.Config = LokiConfiguration{}
|
l.Config = LokiConfiguration{}
|
||||||
l.logger = logger
|
l.logger = logger
|
||||||
|
l.metricsLevel = MetricsLevel
|
||||||
err := l.UnmarshalConfig(config)
|
err := l.UnmarshalConfig(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -302,7 +304,9 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri
|
||||||
ll.Process = true
|
ll.Process = true
|
||||||
ll.Module = l.GetName()
|
ll.Module = l.GetName()
|
||||||
|
|
||||||
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
|
if l.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
|
||||||
|
}
|
||||||
expectMode := types.LIVE
|
expectMode := types.LIVE
|
||||||
if l.Config.UseTimeMachine {
|
if l.Config.UseTimeMachine {
|
||||||
expectMode = types.TIMEMACHINE
|
expectMode = types.TIMEMACHINE
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
)
|
)
|
||||||
|
@ -130,7 +131,7 @@ query: >
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.testName, func(t *testing.T) {
|
t.Run(test.testName, func(t *testing.T) {
|
||||||
lokiSource := loki.LokiSource{}
|
lokiSource := loki.LokiSource{}
|
||||||
err := lokiSource.Configure([]byte(test.config), subLogger)
|
err := lokiSource.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
|
|
||||||
if test.password != "" {
|
if test.password != "" {
|
||||||
|
@ -346,7 +347,7 @@ since: 1h
|
||||||
"type": "loki",
|
"type": "loki",
|
||||||
})
|
})
|
||||||
lokiSource := loki.LokiSource{}
|
lokiSource := loki.LokiSource{}
|
||||||
err := lokiSource.Configure([]byte(ts.config), subLogger)
|
err := lokiSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
|
@ -436,7 +437,7 @@ query: >
|
||||||
lokiTomb := tomb.Tomb{}
|
lokiTomb := tomb.Tomb{}
|
||||||
lokiSource := loki.LokiSource{}
|
lokiSource := loki.LokiSource{}
|
||||||
|
|
||||||
err := lokiSource.Configure([]byte(ts.config), subLogger)
|
err := lokiSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
}
|
}
|
||||||
|
@ -514,7 +515,7 @@ query: >
|
||||||
title := time.Now().String()
|
title := time.Now().String()
|
||||||
lokiSource := loki.LokiSource{}
|
lokiSource := loki.LokiSource{}
|
||||||
|
|
||||||
err := lokiSource.Configure([]byte(config), subLogger)
|
err := lokiSource.Configure([]byte(config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error : %s", err)
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,15 +47,16 @@ type S3Configuration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type S3Source struct {
|
type S3Source struct {
|
||||||
Config S3Configuration
|
MetricsLevel int
|
||||||
logger *log.Entry
|
Config S3Configuration
|
||||||
s3Client s3iface.S3API
|
logger *log.Entry
|
||||||
sqsClient sqsiface.SQSAPI
|
s3Client s3iface.S3API
|
||||||
readerChan chan S3Object
|
sqsClient sqsiface.SQSAPI
|
||||||
t *tomb.Tomb
|
readerChan chan S3Object
|
||||||
out chan types.Event
|
t *tomb.Tomb
|
||||||
ctx aws.Context
|
out chan types.Event
|
||||||
cancel context.CancelFunc
|
ctx aws.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
type S3Object struct {
|
type S3Object struct {
|
||||||
|
@ -345,7 +346,9 @@ func (s *S3Source) sqsPoll() error {
|
||||||
logger.Tracef("SQS output: %v", out)
|
logger.Tracef("SQS output: %v", out)
|
||||||
logger.Debugf("Received %d messages from SQS", len(out.Messages))
|
logger.Debugf("Received %d messages from SQS", len(out.Messages))
|
||||||
for _, message := range out.Messages {
|
for _, message := range out.Messages {
|
||||||
sqsMessagesReceived.WithLabelValues(s.Config.SQSName).Inc()
|
if s.MetricsLevel != configuration.METRICS_NONE {
|
||||||
|
sqsMessagesReceived.WithLabelValues(s.Config.SQSName).Inc()
|
||||||
|
}
|
||||||
bucket, key, err := s.extractBucketAndPrefix(message.Body)
|
bucket, key, err := s.extractBucketAndPrefix(message.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("Error while parsing SQS message: %s", err)
|
logger.Errorf("Error while parsing SQS message: %s", err)
|
||||||
|
@ -426,14 +429,20 @@ func (s *S3Source) readFile(bucket string, key string) error {
|
||||||
default:
|
default:
|
||||||
text := scanner.Text()
|
text := scanner.Text()
|
||||||
logger.Tracef("Read line %s", text)
|
logger.Tracef("Read line %s", text)
|
||||||
linesRead.WithLabelValues(bucket).Inc()
|
if s.MetricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.WithLabelValues(bucket).Inc()
|
||||||
|
}
|
||||||
l := types.Line{}
|
l := types.Line{}
|
||||||
l.Raw = text
|
l.Raw = text
|
||||||
l.Labels = s.Config.Labels
|
l.Labels = s.Config.Labels
|
||||||
l.Time = time.Now().UTC()
|
l.Time = time.Now().UTC()
|
||||||
l.Process = true
|
l.Process = true
|
||||||
l.Module = s.GetName()
|
l.Module = s.GetName()
|
||||||
l.Src = bucket + "/" + key
|
if s.MetricsLevel == configuration.METRICS_FULL {
|
||||||
|
l.Src = bucket + "/" + key
|
||||||
|
} else if s.MetricsLevel == configuration.METRICS_AGGREGATE {
|
||||||
|
l.Src = bucket
|
||||||
|
}
|
||||||
var evt types.Event
|
var evt types.Event
|
||||||
if !s.Config.UseTimeMachine {
|
if !s.Config.UseTimeMachine {
|
||||||
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
|
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
|
||||||
|
@ -446,7 +455,9 @@ func (s *S3Source) readFile(bucket string, key string) error {
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err)
|
return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err)
|
||||||
}
|
}
|
||||||
objectsRead.WithLabelValues(bucket).Inc()
|
if s.MetricsLevel != configuration.METRICS_NONE {
|
||||||
|
objectsRead.WithLabelValues(bucket).Inc()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -505,7 +516,7 @@ func (s *S3Source) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
err := s.UnmarshalConfig(yamlConfig)
|
err := s.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||||
"github.com/aws/aws-sdk-go/service/sqs"
|
"github.com/aws/aws-sdk-go/service/sqs"
|
||||||
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
|
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -66,7 +67,7 @@ sqs_name: foobar
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
f := S3Source{}
|
f := S3Source{}
|
||||||
err := f.Configure([]byte(test.config), nil)
|
err := f.Configure([]byte(test.config), nil, configuration.METRICS_NONE)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected error, got none")
|
t.Fatalf("expected error, got none")
|
||||||
}
|
}
|
||||||
|
@ -111,7 +112,7 @@ polling_method: list
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
f := S3Source{}
|
f := S3Source{}
|
||||||
logger := log.NewEntry(log.New())
|
logger := log.NewEntry(log.New())
|
||||||
err := f.Configure([]byte(test.config), logger)
|
err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %s", err.Error())
|
t.Fatalf("unexpected error: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -306,7 +307,7 @@ prefix: foo/
|
||||||
f := S3Source{}
|
f := S3Source{}
|
||||||
logger := log.NewEntry(log.New())
|
logger := log.NewEntry(log.New())
|
||||||
logger.Logger.SetLevel(log.TraceLevel)
|
logger.Logger.SetLevel(log.TraceLevel)
|
||||||
err := f.Configure([]byte(test.config), logger)
|
err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %s", err.Error())
|
t.Fatalf("unexpected error: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -381,7 +382,7 @@ sqs_name: test
|
||||||
linesRead := 0
|
linesRead := 0
|
||||||
f := S3Source{}
|
f := S3Source{}
|
||||||
logger := log.NewEntry(log.New())
|
logger := log.NewEntry(log.New())
|
||||||
err := f.Configure([]byte(test.config), logger)
|
err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %s", err.Error())
|
t.Fatalf("unexpected error: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,11 @@ type SyslogConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyslogSource struct {
|
type SyslogSource struct {
|
||||||
config SyslogConfiguration
|
metricsLevel int
|
||||||
logger *log.Entry
|
config SyslogConfiguration
|
||||||
server *syslogserver.SyslogServer
|
logger *log.Entry
|
||||||
serverTomb *tomb.Tomb
|
server *syslogserver.SyslogServer
|
||||||
|
serverTomb *tomb.Tomb
|
||||||
}
|
}
|
||||||
|
|
||||||
var linesReceived = prometheus.NewCounterVec(
|
var linesReceived = prometheus.NewCounterVec(
|
||||||
|
@ -121,10 +122,10 @@ func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
s.logger = logger
|
s.logger = logger
|
||||||
s.logger.Infof("Starting syslog datasource configuration")
|
s.logger.Infof("Starting syslog datasource configuration")
|
||||||
|
s.metricsLevel = MetricsLevel
|
||||||
err := s.UnmarshalConfig(yamlConfig)
|
err := s.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -198,7 +199,9 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha
|
||||||
|
|
||||||
logger := s.logger.WithField("client", syslogLine.Client)
|
logger := s.logger.WithField("client", syslogLine.Client)
|
||||||
logger.Tracef("raw: %s", syslogLine)
|
logger.Tracef("raw: %s", syslogLine)
|
||||||
linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc()
|
if s.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc()
|
||||||
|
}
|
||||||
p := rfc3164.NewRFC3164Parser(rfc3164.WithCurrentYear())
|
p := rfc3164.NewRFC3164Parser(rfc3164.WithCurrentYear())
|
||||||
err := p.Parse(syslogLine.Message)
|
err := p.Parse(syslogLine.Message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -211,10 +214,14 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
line = s.buildLogFromSyslog(p2.Timestamp, p2.Hostname, p2.Tag, p2.PID, p2.Message)
|
line = s.buildLogFromSyslog(p2.Timestamp, p2.Hostname, p2.Tag, p2.PID, p2.Message)
|
||||||
linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc5424"}).Inc()
|
if s.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc5424"}).Inc()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
line = s.buildLogFromSyslog(p.Timestamp, p.Hostname, p.Tag, p.PID, p.Message)
|
line = s.buildLogFromSyslog(p.Timestamp, p.Hostname, p.Tag, p.PID, p.Message)
|
||||||
linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc3164"}).Inc()
|
if s.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc3164"}).Inc()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
line = strings.TrimSuffix(line, "\n")
|
line = strings.TrimSuffix(line, "\n")
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/crowdsecurity/go-cs-lib/cstest"
|
"github.com/crowdsecurity/go-cs-lib/cstest"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"gopkg.in/tomb.v2"
|
"gopkg.in/tomb.v2"
|
||||||
|
@ -56,7 +57,7 @@ listen_addr: 10.0.0`,
|
||||||
})
|
})
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
s := SyslogSource{}
|
s := SyslogSource{}
|
||||||
err := s.Configure([]byte(test.config), subLogger)
|
err := s.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
cstest.AssertErrorContains(t, err, test.expectedErr)
|
cstest.AssertErrorContains(t, err, test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,7 +138,7 @@ listen_addr: 127.0.0.1`,
|
||||||
"type": "syslog",
|
"type": "syslog",
|
||||||
})
|
})
|
||||||
s := SyslogSource{}
|
s := SyslogSource{}
|
||||||
err := s.Configure([]byte(ts.config), subLogger)
|
err := s.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not configure syslog source : %s", err)
|
t.Fatalf("could not configure syslog source : %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
|
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -58,7 +59,7 @@ xpath_query: test`,
|
||||||
})
|
})
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
f := WinEventLogSource{}
|
f := WinEventLogSource{}
|
||||||
err := f.Configure([]byte(test.config), subLogger)
|
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
assert.Contains(t, err.Error(), test.expectedErr)
|
assert.Contains(t, err.Error(), test.expectedErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,7 +118,7 @@ event_level: bla`,
|
||||||
})
|
})
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
f := WinEventLogSource{}
|
f := WinEventLogSource{}
|
||||||
f.Configure([]byte(test.config), subLogger)
|
f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
q, err := f.buildXpathQuery()
|
q, err := f.buildXpathQuery()
|
||||||
if test.expectedErr != "" {
|
if test.expectedErr != "" {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -194,7 +195,7 @@ event_ids:
|
||||||
to := &tomb.Tomb{}
|
to := &tomb.Tomb{}
|
||||||
c := make(chan types.Event)
|
c := make(chan types.Event)
|
||||||
f := WinEventLogSource{}
|
f := WinEventLogSource{}
|
||||||
f.Configure([]byte(test.config), subLogger)
|
f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
|
||||||
f.StreamingAcquisition(c, to)
|
f.StreamingAcquisition(c, to)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
lines := test.expectedLines
|
lines := test.expectedLines
|
||||||
|
|
|
@ -34,11 +34,12 @@ type WinEventLogConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type WinEventLogSource struct {
|
type WinEventLogSource struct {
|
||||||
config WinEventLogConfiguration
|
metricsLevel int
|
||||||
logger *log.Entry
|
config WinEventLogConfiguration
|
||||||
evtConfig *winlog.SubscribeConfig
|
logger *log.Entry
|
||||||
query string
|
evtConfig *winlog.SubscribeConfig
|
||||||
name string
|
query string
|
||||||
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryList struct {
|
type QueryList struct {
|
||||||
|
@ -188,7 +189,9 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, event := range renderedEvents {
|
for _, event := range renderedEvents {
|
||||||
linesRead.With(prometheus.Labels{"source": w.name}).Inc()
|
if w.metricsLevel != configuration.METRICS_NONE {
|
||||||
|
linesRead.With(prometheus.Labels{"source": w.name}).Inc()
|
||||||
|
}
|
||||||
l := types.Line{}
|
l := types.Line{}
|
||||||
l.Raw = event
|
l.Raw = event
|
||||||
l.Module = w.GetName()
|
l.Module = w.GetName()
|
||||||
|
@ -270,8 +273,9 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
|
||||||
w.logger = logger
|
w.logger = logger
|
||||||
|
w.metricsLevel = MetricsLevel
|
||||||
|
|
||||||
err := w.UnmarshalConfig(yamlConfig)
|
err := w.UnmarshalConfig(yamlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||||
"github.com/crowdsecurity/go-cs-lib/csstring"
|
"github.com/crowdsecurity/go-cs-lib/csstring"
|
||||||
"github.com/crowdsecurity/go-cs-lib/ptr"
|
"github.com/crowdsecurity/go-cs-lib/ptr"
|
||||||
"github.com/crowdsecurity/go-cs-lib/yamlpatch"
|
"github.com/crowdsecurity/go-cs-lib/yamlpatch"
|
||||||
|
@ -118,7 +119,7 @@ func NewDefaultConfig() *Config {
|
||||||
}
|
}
|
||||||
prometheus := PrometheusCfg{
|
prometheus := PrometheusCfg{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Level: "full",
|
Level: configuration.CFG_METRICS_FULL,
|
||||||
}
|
}
|
||||||
configPaths := ConfigurationPaths{
|
configPaths := ConfigurationPaths{
|
||||||
ConfigDir: DefaultConfigPath("."),
|
ConfigDir: DefaultConfigPath("."),
|
||||||
|
|
Loading…
Reference in a new issue