From b1c09f75127704fea25dacaad1bccb7cddd008f5 Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Wed, 13 Mar 2024 14:57:19 +0100 Subject: [PATCH] acquisition : take prometheus level into account (#2885) * properly take into account the aggregation level of prometheus metrics in acquisition --- cmd/crowdsec/crowdsec.go | 3 +- cmd/crowdsec/main.go | 2 +- cmd/crowdsec/metrics.go | 3 +- pkg/acquisition/acquisition.go | 30 +++++++++++--- pkg/acquisition/acquisition_test.go | 16 ++++---- .../configuration/configuration.go | 11 ++++++ pkg/acquisition/modules/appsec/appsec.go | 5 ++- .../modules/cloudwatch/cloudwatch.go | 22 ++++++++--- .../modules/cloudwatch/cloudwatch_test.go | 5 ++- pkg/acquisition/modules/docker/docker.go | 9 +++-- pkg/acquisition/modules/docker/docker_test.go | 9 +++-- pkg/acquisition/modules/file/file.go | 15 +++++-- pkg/acquisition/modules/file/file_test.go | 9 +++-- .../modules/journalctl/journalctl.go | 16 +++++--- .../modules/journalctl/journalctl_test.go | 15 +++---- pkg/acquisition/modules/kafka/kafka.go | 14 ++++--- pkg/acquisition/modules/kafka/kafka_test.go | 7 ++-- pkg/acquisition/modules/kinesis/kinesis.go | 16 +++++--- .../modules/kinesis/kinesis_test.go | 9 +++-- .../modules/kubernetesaudit/k8s_audit.go | 25 +++++++----- .../modules/kubernetesaudit/k8s_audit_test.go | 5 ++- pkg/acquisition/modules/loki/loki.go | 10 +++-- pkg/acquisition/modules/loki/loki_test.go | 9 +++-- pkg/acquisition/modules/s3/s3.go | 39 ++++++++++++------- pkg/acquisition/modules/s3/s3_test.go | 9 +++-- pkg/acquisition/modules/syslog/syslog.go | 25 +++++++----- pkg/acquisition/modules/syslog/syslog_test.go | 5 ++- .../modules/wineventlog/wineventlog.go | 2 +- .../modules/wineventlog/wineventlog_test.go | 7 ++-- .../wineventlog/wineventlog_windows.go | 18 +++++---- pkg/csconfig/config.go | 3 +- 31 files changed, 245 insertions(+), 128 deletions(-) diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 37a12b9d3..f604af1de 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -14,6 +14,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/trace" "github.com/crowdsecurity/crowdsec/pkg/acquisition" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/alertcontext" "github.com/crowdsecurity/crowdsec/pkg/appsec" "github.com/crowdsecurity/crowdsec/pkg/csconfig" @@ -147,7 +148,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled { aggregated := false - if cConfig.Prometheus.Level == "aggregated" { + if cConfig.Prometheus.Level == configuration.CFG_METRICS_AGGREGATE { aggregated = true } diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 895079048..5f04e9b99 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -123,7 +123,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err) } } else { - dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec) + dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus) if err != nil { return nil, err } diff --git a/cmd/crowdsec/metrics.go b/cmd/crowdsec/metrics.go index 682968bb7..d670051ce 100644 --- a/cmd/crowdsec/metrics.go +++ b/cmd/crowdsec/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/trace" "github.com/crowdsecurity/go-cs-lib/version" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1" "github.com/crowdsecurity/crowdsec/pkg/cache" "github.com/crowdsecurity/crowdsec/pkg/csconfig" @@ -161,7 +162,7 @@ func registerPrometheus(config *csconfig.PrometheusCfg) { // Registering prometheus // If in aggregated mode, do not register events associated with a source, to keep the cardinality low - if config.Level == "aggregated" { + if config.Level == configuration.CFG_METRICS_AGGREGATE { log.Infof("Loading aggregated prometheus collectors") prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo, globalCsInfo, globalParsingHistogram, globalPourHistogram, diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 336029363..677bf664e 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -54,7 +54,7 @@ type DataSource interface { GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime - Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks. + Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks. ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource GetMode() string // Get the mode (TAIL, CAT or SERVER) GetName() string // Get the name of the module @@ -94,7 +94,7 @@ func GetDataSourceIface(dataSourceType string) DataSource { // if the configuration is not valid it returns an error. // If the datasource can't be run (eg. journalctl not available), it still returns an error which // can be checked for the appropriate action. -func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) { +func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg, metricsLevel int) (*DataSource, error) { // we dump it back to []byte, because we want to decode the yaml blob twice: // once to DataSourceCommonCfg, and then later to the dedicated type of the datasource yamlConfig, err := yaml.Marshal(commonConfig) @@ -122,7 +122,7 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS return nil, &DataSourceUnavailableError{Name: commonConfig.Source, Err: err} } /* configure the actual datasource */ - if err := dataSrc.Configure(yamlConfig, subLogger); err != nil { + if err := dataSrc.Configure(yamlConfig, subLogger, metricsLevel); err != nil { return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err) } @@ -180,10 +180,30 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr return sources, nil } +func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int { + if prom == nil { + return configuration.METRICS_FULL + + } + if !prom.Enabled { + return configuration.METRICS_NONE + } + if prom.Level == configuration.CFG_METRICS_AGGREGATE { + return configuration.METRICS_AGGREGATE + } + + if prom.Level == configuration.CFG_METRICS_FULL { + return configuration.METRICS_FULL + } + return configuration.METRICS_FULL + +} + // LoadAcquisitionFromFile unmarshals the configuration item and checks its availability -func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) { +func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) { var sources []DataSource + metrics_level := GetMetricsLevelFromPromCfg(prom) for _, acquisFile := range config.AcquisitionFiles { log.Infof("loading acquisition file : %s", acquisFile) yamlFile, err := os.Open(acquisFile) @@ -225,7 +245,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, } uniqueId := uuid.NewString() sub.UniqueId = uniqueId - src, err := DataSourceConfigure(sub) + src, err := DataSourceConfigure(sub, metrics_level) if err != nil { var dserr *DataSourceUnavailableError if errors.As(err, &dserr) { diff --git a/pkg/acquisition/acquisition_test.go b/pkg/acquisition/acquisition_test.go index 44b3878e1..33e494855 100644 --- a/pkg/acquisition/acquisition_test.go +++ b/pkg/acquisition/acquisition_test.go @@ -35,7 +35,7 @@ func (f *MockSource) UnmarshalConfig(cfg []byte) error { return nil } -func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error { +func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error { f.logger = logger if err := f.UnmarshalConfig(cfg); err != nil { return err @@ -182,7 +182,7 @@ wowo: ajsajasjas t.Run(tc.TestName, func(t *testing.T) { common := configuration.DataSourceCommonCfg{} yaml.Unmarshal([]byte(tc.String), &common) - ds, err := DataSourceConfigure(common) + ds, err := DataSourceConfigure(common, configuration.METRICS_NONE) cstest.RequireErrorContains(t, err, tc.ExpectedError) if tc.ExpectedError != "" { return @@ -283,7 +283,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.TestName, func(t *testing.T) { - dss, err := LoadAcquisitionFromFile(&tc.Config) + dss, err := LoadAcquisitionFromFile(&tc.Config, nil) cstest.RequireErrorContains(t, err, tc.ExpectedError) if tc.ExpectedError != "" { return @@ -305,7 +305,7 @@ type MockCat struct { logger *log.Entry } -func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error { +func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error { f.logger = logger if f.Mode == "" { f.Mode = configuration.CAT_MODE @@ -349,7 +349,7 @@ type MockTail struct { logger *log.Entry } -func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error { +func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error { f.logger = logger if f.Mode == "" { f.Mode = configuration.TAIL_MODE @@ -497,8 +497,10 @@ type MockSourceByDSN struct { logger *log.Entry //nolint: unused } -func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil } -func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry) error { return nil } +func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil } +func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error { + return nil +} func (f *MockSourceByDSN) GetMode() string { return f.Mode } func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil } func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil } diff --git a/pkg/acquisition/configuration/configuration.go b/pkg/acquisition/configuration/configuration.go index 5ec1a4ac4..3e27da1b9 100644 --- a/pkg/acquisition/configuration/configuration.go +++ b/pkg/acquisition/configuration/configuration.go @@ -19,3 +19,14 @@ type DataSourceCommonCfg struct { var TAIL_MODE = "tail" var CAT_MODE = "cat" var SERVER_MODE = "server" // No difference with tail, just a bit more verbose + +const ( + METRICS_NONE = iota + METRICS_AGGREGATE + METRICS_FULL +) + +const ( + CFG_METRICS_AGGREGATE = "aggregated" + CFG_METRICS_FULL = "full" +) diff --git a/pkg/acquisition/modules/appsec/appsec.go b/pkg/acquisition/modules/appsec/appsec.go index a3c8c7dd8..f97905406 100644 --- a/pkg/acquisition/modules/appsec/appsec.go +++ b/pkg/acquisition/modules/appsec/appsec.go @@ -49,6 +49,7 @@ type AppsecSourceConfig struct { // runtime structure of AppsecSourceConfig type AppsecSource struct { + metricsLevel int config AppsecSourceConfig logger *log.Entry mux *http.ServeMux @@ -149,13 +150,13 @@ func (w *AppsecSource) GetAggregMetrics() []prometheus.Collector { return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram} } -func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { err := w.UnmarshalConfig(yamlConfig) if err != nil { return errors.Wrap(err, "unable to parse appsec configuration") } w.logger = logger - + w.metricsLevel = MetricsLevel w.logger.Tracef("Appsec configuration: %+v", w.config) if w.config.AuthCacheDuration == nil { diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index 89887bef0..1ac1465d3 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -43,7 +43,8 @@ var linesRead = prometheus.NewCounterVec( // CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group type CloudwatchSource struct { - Config CloudwatchSourceConfiguration + metricsLevel int + Config CloudwatchSourceConfiguration /*runtime stuff*/ logger *log.Entry t *tomb.Tomb @@ -152,11 +153,12 @@ func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { err := cw.UnmarshalConfig(yamlConfig) if err != nil { return err } + cw.metricsLevel = MetricsLevel cw.logger = logger.WithField("group", cw.Config.GroupName) @@ -385,7 +387,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha if !stream.t.Alive() { cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName) cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...) - openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec() + if cw.metricsLevel != configuration.METRICS_NONE { + openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec() + } break } shouldCreate = false @@ -395,7 +399,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha //let's start watching this stream if shouldCreate { - openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc() + if cw.metricsLevel != configuration.METRICS_NONE { + openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc() + } newStream.t = tomb.Tomb{} newStream.logger = cw.logger.WithFields(log.Fields{"stream": newStream.StreamName}) cw.logger.Debugf("starting tail of stream %s", newStream.StreamName) @@ -409,7 +415,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha for idx, stream := range cw.monitoredStreams { if !cw.monitoredStreams[idx].t.Alive() { cw.logger.Debugf("remove dead stream %s", stream.StreamName) - openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec() + if cw.metricsLevel != configuration.METRICS_NONE { + openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec() + } } else { newMonitoredStreams = append(newMonitoredStreams, stream) } @@ -485,7 +493,9 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err) } else { cfg.logger.Debugf("pushing message : %s", evt.Line.Raw) - linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc() + if cw.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc() + } outChan <- evt } } diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index 5d64755e2..11842e61f 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -427,7 +428,7 @@ stream_name: test_stream`), dbgLogger.Logger.SetLevel(log.DebugLevel) dbgLogger.Infof("starting test") cw := CloudwatchSource{} - err := cw.Configure(tc.config, dbgLogger) + err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE) cstest.RequireErrorContains(t, err, tc.expectedCfgErr) if tc.expectedCfgErr != "" { @@ -559,7 +560,7 @@ stream_name: test_stream`), dbgLogger := log.New().WithField("test", tc.name) dbgLogger.Logger.SetLevel(log.DebugLevel) cw := CloudwatchSource{} - err := cw.Configure(tc.config, dbgLogger) + err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE) cstest.RequireErrorContains(t, err, tc.expectedCfgErr) if tc.expectedCfgErr != "" { return diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 60f1100b3..9f1febf2c 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -46,6 +46,7 @@ type DockerConfiguration struct { } type DockerSource struct { + metricsLevel int Config DockerConfiguration runningContainerState map[string]*ContainerConfig compiledContainerName []*regexp.Regexp @@ -128,9 +129,9 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { d.logger = logger - + d.metricsLevel = MetricsLevel err := d.UnmarshalConfig(yamlConfig) if err != nil { return err @@ -325,7 +326,9 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er l.Src = containerConfig.Name l.Process = true l.Module = d.GetName() - linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() + if d.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() + } evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} out <- evt d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index c4d23168a..6c010f895 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -13,6 +13,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" dockerTypes "github.com/docker/docker/api/types" dockerContainer "github.com/docker/docker/api/types/container" @@ -60,7 +61,7 @@ container_name: for _, test := range tests { f := DockerSource{} - err := f.Configure([]byte(test.config), subLogger) + err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -162,7 +163,7 @@ container_name_regexp: for _, ts := range tests { var ( - logger *log.Logger + logger *log.Logger subLogger *log.Entry ) @@ -182,7 +183,7 @@ container_name_regexp: out := make(chan types.Event) dockerSource := DockerSource{} - err := dockerSource.Configure([]byte(ts.config), subLogger) + err := dockerSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) } @@ -304,7 +305,7 @@ func TestOneShot(t *testing.T) { for _, ts := range tests { var ( subLogger *log.Entry - logger *log.Logger + logger *log.Logger ) if ts.expectedOutput != "" { diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index a0c226574..efc897152 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -46,6 +46,7 @@ type FileConfiguration struct { } type FileSource struct { + metricsLevel int config FileConfiguration watcher *fsnotify.Watcher watchedDirectories map[string]bool @@ -98,8 +99,9 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { f.logger = logger + f.metricsLevel = MetricsLevel err := f.UnmarshalConfig(yamlConfig) if err != nil { @@ -517,12 +519,19 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai if line.Text == "" { //skip empty lines continue } - linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc() + if f.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc() + } + src := tail.Filename + if f.metricsLevel == configuration.METRICS_AGGREGATE { + src = filepath.Base(tail.Filename) + } + l := types.Line{ Raw: trimLine(line.Text), Labels: f.config.Labels, Time: line.Time, - Src: tail.Filename, + Src: src, Process: true, Module: f.GetName(), } diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index 410beb4bc..ad5fe8bfa 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -15,6 +15,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -56,7 +57,7 @@ exclude_regexps: ["as[a-$d"]`, tc := tc t.Run(tc.name, func(t *testing.T) { f := fileacquisition.FileSource{} - err := f.Configure([]byte(tc.config), subLogger) + err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE) cstest.RequireErrorContains(t, err, tc.expectedErr) }) } @@ -222,7 +223,7 @@ filename: test_files/test_delete.log`, tc.setup() } - err := f.Configure([]byte(tc.config), subLogger) + err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE) cstest.RequireErrorContains(t, err, tc.expectedConfigErr) if tc.expectedConfigErr != "" { return @@ -384,7 +385,7 @@ force_inotify: true`, testPattern), tc.setup() } - err := f.Configure([]byte(tc.config), subLogger) + err := f.Configure([]byte(tc.config), subLogger, configuration.METRICS_NONE) require.NoError(t, err) if tc.afterConfigure != nil { @@ -455,7 +456,7 @@ exclude_regexps: ["\\.gz$"]` }) f := fileacquisition.FileSource{} - if err := f.Configure([]byte(config), subLogger); err != nil { + if err := f.Configure([]byte(config), subLogger, configuration.METRICS_NONE); err != nil { subLogger.Fatalf("unexpected error: %s", err) } diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index 55091a7b5..e8bb5a3ed 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -26,10 +26,11 @@ type JournalCtlConfiguration struct { } type JournalCtlSource struct { - config JournalCtlConfiguration - logger *log.Entry - src string - args []string + metricsLevel int + config JournalCtlConfiguration + logger *log.Entry + src string + args []string } const journalctlCmd string = "journalctl" @@ -131,7 +132,9 @@ func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) err l.Src = j.src l.Process = true l.Module = j.GetName() - linesRead.With(prometheus.Labels{"source": j.src}).Inc() + if j.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": j.src}).Inc() + } var evt types.Event if !j.config.UseTimeMachine { evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} @@ -194,8 +197,9 @@ func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { j.logger = logger + j.metricsLevel = MetricsLevel err := j.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/acquisition/modules/journalctl/journalctl_test.go b/pkg/acquisition/modules/journalctl/journalctl_test.go index a91fba31b..9d1f1bb7e 100644 --- a/pkg/acquisition/modules/journalctl/journalctl_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_test.go @@ -10,6 +10,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" @@ -52,7 +53,7 @@ journalctl_filter: for _, test := range tests { f := JournalCtlSource{} - err := f.Configure([]byte(test.config), subLogger) + err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -144,9 +145,9 @@ journalctl_filter: } for _, ts := range tests { var ( - logger *log.Logger + logger *log.Logger subLogger *log.Entry - hook *test.Hook + hook *test.Hook ) if ts.expectedOutput != "" { @@ -165,7 +166,7 @@ journalctl_filter: out := make(chan types.Event, 100) j := JournalCtlSource{} - err := j.Configure([]byte(ts.config), subLogger) + err := j.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) } @@ -218,9 +219,9 @@ journalctl_filter: } for _, ts := range tests { var ( - logger *log.Logger + logger *log.Logger subLogger *log.Entry - hook *test.Hook + hook *test.Hook ) if ts.expectedOutput != "" { @@ -239,7 +240,7 @@ journalctl_filter: out := make(chan types.Event) j := JournalCtlSource{} - err := j.Configure([]byte(ts.config), subLogger) + err := j.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) } diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index 5b6e8fc0d..f64bb1df3 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -52,9 +52,10 @@ type TLSConfig struct { } type KafkaSource struct { - Config KafkaConfiguration - logger *log.Entry - Reader *kafka.Reader + metricsLevel int + Config KafkaConfiguration + logger *log.Entry + Reader *kafka.Reader } func (k *KafkaSource) GetUuid() string { @@ -86,8 +87,9 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error { return err } -func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { k.logger = logger + k.metricsLevel = MetricsLevel k.logger.Debugf("start configuring %s source", dataSourceName) @@ -170,7 +172,9 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error { Module: k.GetName(), } k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l) - linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() + if k.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() + } var evt types.Event if !k.Config.UseTimeMachine { diff --git a/pkg/acquisition/modules/kafka/kafka_test.go b/pkg/acquisition/modules/kafka/kafka_test.go index 92ccd4c7a..6eda37a37 100644 --- a/pkg/acquisition/modules/kafka/kafka_test.go +++ b/pkg/acquisition/modules/kafka/kafka_test.go @@ -15,6 +15,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -75,7 +76,7 @@ group_id: crowdsec`, }) for _, test := range tests { k := KafkaSource{} - err := k.Configure([]byte(test.config), subLogger) + err := k.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -169,7 +170,7 @@ func TestStreamingAcquisition(t *testing.T) { source: kafka brokers: - localhost:9092 -topic: crowdsecplaintext`), subLogger) +topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("could not configure kafka source : %s", err) } @@ -245,7 +246,7 @@ tls: client_cert: ./testdata/kafkaClient.certificate.pem client_key: ./testdata/kafkaClient.key ca_cert: ./testdata/snakeoil-ca-1.crt - `), subLogger) + `), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("could not configure kafka source : %s", err) } diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index e2cc79963..a86816244 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -38,6 +38,7 @@ type KinesisConfiguration struct { } type KinesisSource struct { + metricsLevel int Config KinesisConfiguration logger *log.Entry kClient *kinesis.Kinesis @@ -149,8 +150,9 @@ func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { k.logger = logger + k.metricsLevel = MetricsLevel err := k.UnmarshalConfig(yamlConfig) if err != nil { @@ -283,11 +285,15 @@ func (k *KinesisSource) RegisterConsumer() (*kinesis.RegisterStreamConsumerOutpu func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan types.Event, logger *log.Entry, shardId string) { for _, record := range records { if k.Config.StreamARN != "" { - linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamARN, "shard": shardId}).Inc() - linesRead.With(prometheus.Labels{"stream": k.Config.StreamARN}).Inc() + if k.metricsLevel != configuration.METRICS_NONE { + linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamARN, "shard": shardId}).Inc() + linesRead.With(prometheus.Labels{"stream": k.Config.StreamARN}).Inc() + } } else { - linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamName, "shard": shardId}).Inc() - linesRead.With(prometheus.Labels{"stream": k.Config.StreamName}).Inc() + if k.metricsLevel != configuration.METRICS_NONE { + linesReadShards.With(prometheus.Labels{"stream": k.Config.StreamName, "shard": shardId}).Inc() + linesRead.With(prometheus.Labels{"stream": k.Config.StreamName}).Inc() + } } var data []CloudwatchSubscriptionLogEvent var err error diff --git a/pkg/acquisition/modules/kinesis/kinesis_test.go b/pkg/acquisition/modules/kinesis/kinesis_test.go index 662d6040e..a4e4f2f73 100644 --- a/pkg/acquisition/modules/kinesis/kinesis_test.go +++ b/pkg/acquisition/modules/kinesis/kinesis_test.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -143,7 +144,7 @@ stream_arn: arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream`, }) for _, test := range tests { f := KinesisSource{} - err := f.Configure([]byte(test.config), subLogger) + err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -172,7 +173,7 @@ stream_name: stream-1-shard`, config := fmt.Sprintf(test.config, endpoint) err := f.Configure([]byte(config), log.WithFields(log.Fields{ "type": "kinesis", - })) + }), configuration.METRICS_NONE) if err != nil { t.Fatalf("Error configuring source: %s", err) } @@ -218,7 +219,7 @@ stream_name: stream-2-shards`, config := fmt.Sprintf(test.config, endpoint) err := f.Configure([]byte(config), log.WithFields(log.Fields{ "type": "kinesis", - })) + }), configuration.METRICS_NONE) if err != nil { t.Fatalf("Error configuring source: %s", err) } @@ -267,7 +268,7 @@ from_subscription: true`, config := fmt.Sprintf(test.config, endpoint) err := f.Configure([]byte(config), log.WithFields(log.Fields{ "type": "kinesis", - })) + }), configuration.METRICS_NONE) if err != nil { t.Fatalf("Error configuring source: %s", err) } diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index ee44bd01a..7d27f9e03 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -28,12 +28,13 @@ type KubernetesAuditConfiguration struct { } type KubernetesAuditSource struct { - config KubernetesAuditConfiguration - logger *log.Entry - mux *http.ServeMux - server *http.Server - outChan chan types.Event - addr string + metricsLevel int + config KubernetesAuditConfiguration + logger *log.Entry + mux *http.ServeMux + server *http.Server + outChan chan types.Event + addr string } var eventCount = prometheus.NewCounterVec( @@ -93,8 +94,9 @@ func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry) error { +func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error { ka.logger = logger + ka.metricsLevel = MetricsLevel err := ka.UnmarshalConfig(config) if err != nil { @@ -161,7 +163,10 @@ func (ka *KubernetesAuditSource) Dump() interface{} { } func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.Request) { - requestCount.WithLabelValues(ka.addr).Inc() + + if ka.metricsLevel != configuration.METRICS_NONE { + requestCount.WithLabelValues(ka.addr).Inc() + } if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return @@ -185,7 +190,9 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R remoteIP := strings.Split(r.RemoteAddr, ":")[0] for _, auditEvent := range auditEvents.Items { - eventCount.WithLabelValues(ka.addr).Inc() + if ka.metricsLevel != configuration.METRICS_NONE { + eventCount.WithLabelValues(ka.addr).Inc() + } bytesEvent, err := json.Marshal(auditEvent) if err != nil { ka.logger.Errorf("Error marshaling audit event: %s", err) diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit_test.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit_test.go index c3502c956..331822ecf 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit_test.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -81,7 +82,7 @@ webhook_path: /k8s-audit`, require.NoError(t, err) - err = f.Configure([]byte(test.config), subLogger) + err = f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) require.NoError(t, err) f.StreamingAcquisition(out, tb) @@ -253,7 +254,7 @@ webhook_path: /k8s-audit`, f := KubernetesAuditSource{} err := f.UnmarshalConfig([]byte(test.config)) require.NoError(t, err) - err = f.Configure([]byte(test.config), subLogger) + err = f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) require.NoError(t, err) diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 555deefe2..3625c6895 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -57,7 +57,8 @@ type LokiConfiguration struct { } type LokiSource struct { - Config LokiConfiguration + metricsLevel int + Config LokiConfiguration Client *lokiclient.LokiClient @@ -118,9 +119,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { +func (l *LokiSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error { l.Config = LokiConfiguration{} l.logger = logger + l.metricsLevel = MetricsLevel err := l.UnmarshalConfig(config) if err != nil { return err @@ -302,7 +304,9 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri ll.Process = true ll.Module = l.GetName() - linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() + if l.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() + } expectMode := types.LIVE if l.Config.UseTimeMachine { expectMode = types.TIMEMACHINE diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 8511d5445..9ac3ccbd3 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -20,6 +20,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -130,7 +131,7 @@ query: > for _, test := range tests { t.Run(test.testName, func(t *testing.T) { lokiSource := loki.LokiSource{} - err := lokiSource.Configure([]byte(test.config), subLogger) + err := lokiSource.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) if test.password != "" { @@ -346,7 +347,7 @@ since: 1h "type": "loki", }) lokiSource := loki.LokiSource{} - err := lokiSource.Configure([]byte(ts.config), subLogger) + err := lokiSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) @@ -436,7 +437,7 @@ query: > lokiTomb := tomb.Tomb{} lokiSource := loki.LokiSource{} - err := lokiSource.Configure([]byte(ts.config), subLogger) + err := lokiSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) } @@ -514,7 +515,7 @@ query: > title := time.Now().String() lokiSource := loki.LokiSource{} - err := lokiSource.Configure([]byte(config), subLogger) + err := lokiSource.Configure([]byte(config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("Unexpected error : %s", err) } diff --git a/pkg/acquisition/modules/s3/s3.go b/pkg/acquisition/modules/s3/s3.go index 651d40d3d..98b2e685c 100644 --- a/pkg/acquisition/modules/s3/s3.go +++ b/pkg/acquisition/modules/s3/s3.go @@ -47,15 +47,16 @@ type S3Configuration struct { } type S3Source struct { - Config S3Configuration - logger *log.Entry - s3Client s3iface.S3API - sqsClient sqsiface.SQSAPI - readerChan chan S3Object - t *tomb.Tomb - out chan types.Event - ctx aws.Context - cancel context.CancelFunc + MetricsLevel int + Config S3Configuration + logger *log.Entry + s3Client s3iface.S3API + sqsClient sqsiface.SQSAPI + readerChan chan S3Object + t *tomb.Tomb + out chan types.Event + ctx aws.Context + cancel context.CancelFunc } type S3Object struct { @@ -345,7 +346,9 @@ func (s *S3Source) sqsPoll() error { logger.Tracef("SQS output: %v", out) logger.Debugf("Received %d messages from SQS", len(out.Messages)) for _, message := range out.Messages { - sqsMessagesReceived.WithLabelValues(s.Config.SQSName).Inc() + if s.MetricsLevel != configuration.METRICS_NONE { + sqsMessagesReceived.WithLabelValues(s.Config.SQSName).Inc() + } bucket, key, err := s.extractBucketAndPrefix(message.Body) if err != nil { logger.Errorf("Error while parsing SQS message: %s", err) @@ -426,14 +429,20 @@ func (s *S3Source) readFile(bucket string, key string) error { default: text := scanner.Text() logger.Tracef("Read line %s", text) - linesRead.WithLabelValues(bucket).Inc() + if s.MetricsLevel != configuration.METRICS_NONE { + linesRead.WithLabelValues(bucket).Inc() + } l := types.Line{} l.Raw = text l.Labels = s.Config.Labels l.Time = time.Now().UTC() l.Process = true l.Module = s.GetName() - l.Src = bucket + "/" + key + if s.MetricsLevel == configuration.METRICS_FULL { + l.Src = bucket + "/" + key + } else if s.MetricsLevel == configuration.METRICS_AGGREGATE { + l.Src = bucket + } var evt types.Event if !s.Config.UseTimeMachine { evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} @@ -446,7 +455,9 @@ func (s *S3Source) readFile(bucket string, key string) error { if err := scanner.Err(); err != nil { return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err) } - objectsRead.WithLabelValues(bucket).Inc() + if s.MetricsLevel != configuration.METRICS_NONE { + objectsRead.WithLabelValues(bucket).Inc() + } return nil } @@ -505,7 +516,7 @@ func (s *S3Source) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry) error { +func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { err := s.UnmarshalConfig(yamlConfig) if err != nil { return err diff --git a/pkg/acquisition/modules/s3/s3_test.go b/pkg/acquisition/modules/s3/s3_test.go index 02423b139..e94521d18 100644 --- a/pkg/acquisition/modules/s3/s3_test.go +++ b/pkg/acquisition/modules/s3/s3_test.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -66,7 +67,7 @@ sqs_name: foobar for _, test := range tests { t.Run(test.name, func(t *testing.T) { f := S3Source{} - err := f.Configure([]byte(test.config), nil) + err := f.Configure([]byte(test.config), nil, configuration.METRICS_NONE) if err == nil { t.Fatalf("expected error, got none") } @@ -111,7 +112,7 @@ polling_method: list t.Run(test.name, func(t *testing.T) { f := S3Source{} logger := log.NewEntry(log.New()) - err := f.Configure([]byte(test.config), logger) + err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE) if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } @@ -306,7 +307,7 @@ prefix: foo/ f := S3Source{} logger := log.NewEntry(log.New()) logger.Logger.SetLevel(log.TraceLevel) - err := f.Configure([]byte(test.config), logger) + err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE) if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } @@ -381,7 +382,7 @@ sqs_name: test linesRead := 0 f := S3Source{} logger := log.NewEntry(log.New()) - err := f.Configure([]byte(test.config), logger) + err := f.Configure([]byte(test.config), logger, configuration.METRICS_NONE) if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index 8aed28368..47940c329 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -29,10 +29,11 @@ type SyslogConfiguration struct { } type SyslogSource struct { - config SyslogConfiguration - logger *log.Entry - server *syslogserver.SyslogServer - serverTomb *tomb.Tomb + metricsLevel int + config SyslogConfiguration + logger *log.Entry + server *syslogserver.SyslogServer + serverTomb *tomb.Tomb } var linesReceived = prometheus.NewCounterVec( @@ -121,10 +122,10 @@ func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { s.logger = logger s.logger.Infof("Starting syslog datasource configuration") - + s.metricsLevel = MetricsLevel err := s.UnmarshalConfig(yamlConfig) if err != nil { return err @@ -198,7 +199,9 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha logger := s.logger.WithField("client", syslogLine.Client) logger.Tracef("raw: %s", syslogLine) - linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc() + if s.metricsLevel != configuration.METRICS_NONE { + linesReceived.With(prometheus.Labels{"source": syslogLine.Client}).Inc() + } p := rfc3164.NewRFC3164Parser(rfc3164.WithCurrentYear()) err := p.Parse(syslogLine.Message) if err != nil { @@ -211,10 +214,14 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha continue } line = s.buildLogFromSyslog(p2.Timestamp, p2.Hostname, p2.Tag, p2.PID, p2.Message) - linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc5424"}).Inc() + if s.metricsLevel != configuration.METRICS_NONE { + linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc5424"}).Inc() + } } else { line = s.buildLogFromSyslog(p.Timestamp, p.Hostname, p.Tag, p.PID, p.Message) - linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc3164"}).Inc() + if s.metricsLevel != configuration.METRICS_NONE { + linesParsed.With(prometheus.Labels{"source": syslogLine.Client, "type": "rfc3164"}).Inc() + } } line = strings.TrimSuffix(line, "\n") diff --git a/pkg/acquisition/modules/syslog/syslog_test.go b/pkg/acquisition/modules/syslog/syslog_test.go index 1d2ba3fb6..ba14c7db0 100644 --- a/pkg/acquisition/modules/syslog/syslog_test.go +++ b/pkg/acquisition/modules/syslog/syslog_test.go @@ -9,6 +9,7 @@ import ( "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" @@ -56,7 +57,7 @@ listen_addr: 10.0.0`, }) for _, test := range tests { s := SyslogSource{} - err := s.Configure([]byte(test.config), subLogger) + err := s.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -137,7 +138,7 @@ listen_addr: 127.0.0.1`, "type": "syslog", }) s := SyslogSource{} - err := s.Configure([]byte(ts.config), subLogger) + err := s.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE) if err != nil { t.Fatalf("could not configure syslog source : %s", err) } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog.go b/pkg/acquisition/modules/wineventlog/wineventlog.go index f0eca5d13..44035d0a7 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog.go @@ -23,7 +23,7 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_test.go b/pkg/acquisition/modules/wineventlog/wineventlog_test.go index 053ba88b5..c937ceba8 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_test.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" @@ -58,7 +59,7 @@ xpath_query: test`, }) for _, test := range tests { f := WinEventLogSource{} - err := f.Configure([]byte(test.config), subLogger) + err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) assert.Contains(t, err.Error(), test.expectedErr) } } @@ -117,7 +118,7 @@ event_level: bla`, }) for _, test := range tests { f := WinEventLogSource{} - f.Configure([]byte(test.config), subLogger) + f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) q, err := f.buildXpathQuery() if test.expectedErr != "" { if err == nil { @@ -194,7 +195,7 @@ event_ids: to := &tomb.Tomb{} c := make(chan types.Event) f := WinEventLogSource{} - f.Configure([]byte(test.config), subLogger) + f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE) f.StreamingAcquisition(c, to) time.Sleep(time.Second) lines := test.expectedLines diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index ee69dc35c..8adbf1e53 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -34,11 +34,12 @@ type WinEventLogConfiguration struct { } type WinEventLogSource struct { - config WinEventLogConfiguration - logger *log.Entry - evtConfig *winlog.SubscribeConfig - query string - name string + metricsLevel int + config WinEventLogConfiguration + logger *log.Entry + evtConfig *winlog.SubscribeConfig + query string + name string } type QueryList struct { @@ -188,7 +189,9 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error continue } for _, event := range renderedEvents { - linesRead.With(prometheus.Labels{"source": w.name}).Inc() + if w.metricsLevel != configuration.METRICS_NONE { + linesRead.With(prometheus.Labels{"source": w.name}).Inc() + } l := types.Line{} l.Raw = event l.Module = w.GetName() @@ -270,8 +273,9 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error { +func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { w.logger = logger + w.metricsLevel = MetricsLevel err := w.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/csconfig/config.go b/pkg/csconfig/config.go index 0c960803e..e007e042b 100644 --- a/pkg/csconfig/config.go +++ b/pkg/csconfig/config.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/go-cs-lib/csstring" "github.com/crowdsecurity/go-cs-lib/ptr" "github.com/crowdsecurity/go-cs-lib/yamlpatch" @@ -118,7 +119,7 @@ func NewDefaultConfig() *Config { } prometheus := PrometheusCfg{ Enabled: true, - Level: "full", + Level: configuration.CFG_METRICS_FULL, } configPaths := ConfigurationPaths{ ConfigDir: DefaultConfigPath("."),