فهرست منبع

Add alerts and decisions metrics, LAPI and agent timing prom metrics (#1546)

Thibault "bui" Koechlin 3 سال پیش
والد
کامیت
a6ed08b239

+ 62 - 3
cmd/crowdsec-cli/metrics.go

@@ -124,6 +124,8 @@ func ShowPrometheus(url string) {
 	lapi_stats := map[string]map[string]int{}
 	lapi_machine_stats := map[string]map[string]map[string]int{}
 	lapi_bouncer_stats := map[string]map[string]map[string]int{}
+	decisions_stats := map[string]map[string]map[string]int{}
+	alerts_stats := map[string]int{}
 
 	for idx, fam := range result {
 		if !strings.HasPrefix(fam.Name, "cs_") {
@@ -131,7 +133,11 @@ func ShowPrometheus(url string) {
 		}
 		log.Tracef("round %d", idx)
 		for _, m := range fam.Metrics {
-			metric := m.(prom2json.Metric)
+			metric, ok := m.(prom2json.Metric)
+			if !ok {
+				log.Debugf("failed to convert metric to prom2json.Metric")
+				continue
+			}
 			name, ok := metric.Labels["name"]
 			if !ok {
 				log.Debugf("no name in Metric %v", metric.Labels)
@@ -152,6 +158,10 @@ func ShowPrometheus(url string) {
 			route := metric.Labels["route"]
 			method := metric.Labels["method"]
 
+			reason := metric.Labels["reason"]
+			origin := metric.Labels["origin"]
+			action := metric.Labels["action"]
+
 			fval, err := strconv.ParseFloat(value, 32)
 			if err != nil {
 				log.Errorf("Unexpected int value %s : %s", value, err)
@@ -254,6 +264,19 @@ func ShowPrometheus(url string) {
 					x.NonEmpty += ival
 				}
 				lapi_decisions_stats[bouncer] = x
+			case "cs_active_decisions":
+				if _, ok := decisions_stats[reason]; !ok {
+					decisions_stats[reason] = make(map[string]map[string]int)
+				}
+				if _, ok := decisions_stats[reason][origin]; !ok {
+					decisions_stats[reason][origin] = make(map[string]int)
+				}
+				decisions_stats[reason][origin][action] += ival
+			case "cs_alerts":
+				/*if _, ok := alerts_stats[scenario]; !ok {
+					alerts_stats[scenario] = make(map[string]int)
+				}*/
+				alerts_stats[reason] += ival
 			default:
 				continue
 			}
@@ -329,6 +352,30 @@ func ShowPrometheus(url string) {
 			}
 		}
 
+		decisionsTable := tablewriter.NewWriter(os.Stdout)
+		decisionsTable.SetHeader([]string{"Reason", "Origin", "Action", "Count"})
+		for reason, origins := range decisions_stats {
+			for origin, actions := range origins {
+				for action, hits := range actions {
+					row := []string{}
+					row = append(row, reason)
+					row = append(row, origin)
+					row = append(row, action)
+					row = append(row, fmt.Sprintf("%d", hits))
+					decisionsTable.Append(row)
+				}
+			}
+		}
+
+		alertsTable := tablewriter.NewWriter(os.Stdout)
+		alertsTable.SetHeader([]string{"Reason", "Count"})
+		for scenario, hits := range alerts_stats {
+			row := []string{}
+			row = append(row, scenario)
+			row = append(row, fmt.Sprintf("%d", hits))
+			alertsTable.Append(row)
+		}
+
 		if bucketsTable.NumLines() > 0 {
 			log.Printf("Buckets Metrics:")
 			bucketsTable.SetAlignment(tablewriter.ALIGN_LEFT)
@@ -366,8 +413,20 @@ func ShowPrometheus(url string) {
 			lapiDecisionsTable.Render()
 		}
 
+		if decisionsTable.NumLines() > 0 {
+			log.Printf("Local Api Decisions:")
+			decisionsTable.SetAlignment(tablewriter.ALIGN_LEFT)
+			decisionsTable.Render()
+		}
+
+		if alertsTable.NumLines() > 0 {
+			log.Printf("Local Api Alerts:")
+			alertsTable.SetAlignment(tablewriter.ALIGN_LEFT)
+			alertsTable.Render()
+		}
+
 	} else if csConfig.Cscli.Output == "json" {
-		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
+		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
 			x, err := json.MarshalIndent(val, "", " ")
 			if err != nil {
 				log.Fatalf("failed to unmarshal metrics : %v", err)
@@ -375,7 +434,7 @@ func ShowPrometheus(url string) {
 			fmt.Printf("%s\n", string(x))
 		}
 	} else if csConfig.Cscli.Output == "raw" {
-		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
+		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
 			x, err := yaml.Marshal(val)
 			if err != nil {
 				log.Fatalf("failed to unmarshal metrics : %v", err)

+ 91 - 6
cmd/crowdsec/metrics.go

@@ -6,6 +6,7 @@ import (
 	v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/parser"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -62,6 +63,81 @@ var globalCsInfo = prometheus.NewGauge(
 	},
 )
 
+var globalActiveDecisions = prometheus.NewGaugeVec( // Gauge vector exposed as cs_active_decisions.
+	prometheus.GaugeOpts{
+		Name: "cs_active_decisions",
+		Help: "Number of active decisions.",
+	},
+	[]string{"reason", "origin", "action"}, // label names; computeDynamicMetrics fills them from a decision group's Scenario, Origin and Type
+)
+
+var globalAlerts = prometheus.NewGaugeVec( // Gauge vector exposed as cs_alerts.
+	prometheus.GaugeOpts{
+		Name: "cs_alerts",
+		Help: "Number of alerts (excluding CAPI).",
+	},
+	[]string{"reason"}, // single label; computeDynamicMetrics fills it with the keys of the map returned by AlertsCountPerScenario
+)
+
+var globalParsingHistogram = prometheus.NewHistogramVec( // Histogram vector exposed as cs_parsing_time_seconds.
+	prometheus.HistogramOpts{
+		Help:    "Time spent parsing a line",
+		Name:    "cs_parsing_time_seconds",
+		Buckets: []float64{0.0005, 0.001, 0.0015, 0.002, 0.0025, 0.003, 0.004, 0.005, 0.0075, 0.01}, // explicit bucket bounds; observations are recorded in seconds, so these span 0.5ms to 10ms
+	},
+	[]string{"type", "source"}, // label names; the parsing loop sets them from event.Line.Module and event.Line.Src
+)
+
+var globalPourHistogram = prometheus.NewHistogramVec( // Histogram vector exposed as cs_bucket_pour_seconds.
+	prometheus.HistogramOpts{
+		Name:    "cs_bucket_pour_seconds",
+		Help:    "Time spent pouring an event to buckets.",
+		Buckets: []float64{0.001, 0.002, 0.005, 0.01, 0.015, 0.02, 0.03, 0.04, 0.05}, // explicit bucket bounds; observations are recorded in seconds, so these span 1ms to 50ms
+	},
+	[]string{"type", "source"}, // label names; runPour sets them from parsed.Line.Module and parsed.Line.Src
+)
+
+func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.HandlerFunc { // Wraps next so the cs_active_decisions and cs_alerts gauges are recomputed from the database before each request is handed to next.
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if dbClient == nil { // no database client was supplied: skip the refresh and serve as-is
+			next.ServeHTTP(w, r)
+			return
+		}
+
+		decisionsFilters := make(map[string][]string, 0) // empty filter set
+		decisions, err := dbClient.QueryDecisionCountByScenario(decisionsFilters)
+		if err != nil {
+			log.Errorf("Error querying decisions for metrics: %v", err)
+			next.ServeHTTP(w, r) // still answer the request; the decisions gauge has not been reset at this point
+			return
+		}
+		globalActiveDecisions.Reset() // clear the vector so only the freshly counted groups are set below
+		for _, d := range decisions {
+			globalActiveDecisions.With(prometheus.Labels{"reason": d.Scenario, "origin": d.Origin, "action": d.Type}).Set(float64(d.Count))
+		}
+
+		globalAlerts.Reset() // NOTE(review): unlike the decisions gauge, this reset happens before the query, so a failed alerts query leaves cs_alerts cleared for this request
+
+		alertsFilter := map[string][]string{
+			"include_capi": {"false"}, // matches the gauge help text, which says CAPI alerts are excluded
+		}
+
+		alerts, err := dbClient.AlertsCountPerScenario(alertsFilter)
+
+		if err != nil {
+			log.Errorf("Error querying alerts for metrics: %v", err)
+			next.ServeHTTP(w, r) // the error is only logged; the request is still served
+			return
+		}
+
+		for k, v := range alerts { // k is the scenario name, v the alert count for it
+			globalAlerts.With(prometheus.Labels{"reason": k}).Set(float64(v))
+		}
+
+		next.ServeHTTP(w, r) // both gauges refreshed: let the wrapped handler produce the response
+	})
+}
+
 func registerPrometheus(config *csconfig.PrometheusCfg) {
 	if !config.Enabled {
 		return
@@ -75,13 +151,12 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
 		config.ListenPort = 6060
 	}
 
-	defer types.CatchPanic("crowdsec/registerPrometheus")
 	/*Registering prometheus*/
 	/*If in aggregated mode, do not register events associated to a source, keeps cardinality low*/
 	if config.Level == "aggregated" {
 		log.Infof("Loading aggregated prometheus collectors")
 		prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
-			globalCsInfo,
+			globalCsInfo, globalParsingHistogram, globalPourHistogram,
 			leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow,
 			v1.LapiRouteHits,
 			leaky.BucketsCurrentCount)
@@ -89,12 +164,22 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
 		log.Infof("Loading prometheus collectors")
 		prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
 			parser.NodesHits, parser.NodesHitsOk, parser.NodesHitsKo,
-			globalCsInfo,
-			v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions,
-			leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow, leaky.BucketsCurrentCount)
+			globalCsInfo, globalParsingHistogram, globalPourHistogram,
+			v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions, v1.LapiResponseTime,
+			leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow, leaky.BucketsCurrentCount,
+			globalActiveDecisions, globalAlerts)
 
 	}
-	http.Handle("/metrics", promhttp.Handler())
+}
+
+func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client) {
+	if !config.Enabled {
+		return
+	}
+
+	defer types.CatchPanic("crowdsec/servePrometheus")
+
+	http.Handle("/metrics", computeDynamicMetrics(promhttp.Handler(), dbClient))
 	if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil {
 		log.Warningf("prometheus: %s", err)
 	}

+ 5 - 0
cmd/crowdsec/parse.go

@@ -1,6 +1,8 @@
 package main
 
 import (
+	"time"
+
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 
@@ -26,11 +28,14 @@ LOOP:
 			}
 			globalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
 
+			startParsing := time.Now()
 			/* parse the log using magic */
 			parsed, err := parser.Parse(parserCTX, event, nodes)
 			if err != nil {
 				log.Errorf("failed parsing : %v\n", err)
 			}
+			elapsed := time.Since(startParsing)
+			globalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds())
 			if !parsed.Process {
 				globalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
 				log.Debugf("Discarding line %+v", parsed)

+ 4 - 0
cmd/crowdsec/pour.go

@@ -7,6 +7,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
+	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -21,6 +22,7 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea
 			log.Infof("Bucket routine exiting")
 			return nil
 		case parsed := <-input:
+			startTime := time.Now()
 			count++
 			if count%5000 == 0 {
 				log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
@@ -45,6 +47,8 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea
 				log.Errorf("bucketify failed for: %v", parsed)
 				return fmt.Errorf("process of event failed : %v", err)
 			}
+			elapsed := time.Since(startTime)
+			globalPourHistogram.With(prometheus.Labels{"type": parsed.Line.Module, "source": parsed.Line.Src}).Observe(elapsed.Seconds())
 			if poured {
 				globalBucketPourOk.Inc()
 			} else {

+ 13 - 1
cmd/crowdsec/run_in_svc.go

@@ -8,6 +8,7 @@ import (
 
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	log "github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus/hooks/writer"
@@ -48,7 +49,18 @@ func StartRunSvc() error {
 
 	// Enable profiling early
 	if cConfig.Prometheus != nil {
-		go registerPrometheus(cConfig.Prometheus)
+		var dbClient *database.Client
+		var err error
+
+		if cConfig.DbConfig != nil {
+			dbClient, err = database.NewClient(cConfig.DbConfig)
+
+			if err != nil {
+				log.Fatalf("unable to create database client: %s", err)
+			}
+		}
+		registerPrometheus(cConfig.Prometheus)
+		go servePrometheus(cConfig.Prometheus, dbClient)
 	}
 	return Serve(cConfig)
 }

+ 13 - 1
cmd/crowdsec/run_in_svc_windows.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
@@ -88,7 +89,18 @@ func WindowsRun() error {
 
 	// Enable profiling early
 	if cConfig.Prometheus != nil {
-		go registerPrometheus(cConfig.Prometheus)
+		var dbClient *database.Client
+		var err error
+
+		if cConfig.DbConfig != nil {
+			dbClient, err = database.NewClient(cConfig.DbConfig)
+
+			if err != nil {
+				log.Fatalf("unable to create database client: %s", err)
+			}
+		}
+		registerPrometheus(cConfig.Prometheus)
+		go servePrometheus(cConfig.Prometheus, dbClient)
 	}
 	return Serve(cConfig)
 }

+ 13 - 0
pkg/apiserver/controllers/v1/metrics.go

@@ -1,6 +1,8 @@
 package v1
 
 import (
+	"time"
+
 	jwt "github.com/appleboy/gin-jwt/v2"
 	"github.com/gin-gonic/gin"
 	"github.com/prometheus/client_golang/prometheus"
@@ -52,6 +54,14 @@ var LapiNonNilDecisions = prometheus.NewCounterVec(
 	[]string{"bouncer"},
 )
 
+var LapiResponseTime = prometheus.NewHistogramVec( // Histogram vector exposed as cs_lapi_request_duration_seconds.
+	prometheus.HistogramOpts{
+		Name:    "cs_lapi_request_duration_seconds",
+		Help:    "Response time of LAPI",
+		Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1}, // explicit bucket bounds; observations are recorded in seconds, so these span 5ms to 1s
+	},
+	[]string{"endpoint", "method"}) // label names; PrometheusMiddleware sets them from the request URL path and HTTP method
+
 func PrometheusBouncersHasEmptyDecision(c *gin.Context) {
 	name, ok := c.Get("BOUNCER_NAME")
 	if ok {
@@ -99,9 +109,12 @@ func PrometheusBouncersMiddleware() gin.HandlerFunc {
 
 func PrometheusMiddleware() gin.HandlerFunc { // Returns a gin handler that increments LapiRouteHits for each request and records its duration in LapiResponseTime.
 	return func(c *gin.Context) {
+		startTime := time.Now() // taken first, so the measured span includes the counter update and everything c.Next() runs
 		LapiRouteHits.With(prometheus.Labels{
 			"route":  c.Request.URL.Path,
 			"method": c.Request.Method}).Inc()
 		c.Next() // hand over to the rest of the handler chain before measuring
+		elapsed := time.Since(startTime)
+		LapiResponseTime.With(prometheus.Labels{"method": c.Request.Method, "endpoint": c.Request.URL.Path}).Observe(elapsed.Seconds()) // the raw request path is used as the endpoint label value
 	}
 }

+ 33 - 0
pkg/database/alerts.go

@@ -1,6 +1,7 @@
 package database
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"strconv"
@@ -722,6 +723,38 @@ func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]str
 	return alerts, nil
 }
 
+func (c *Client) AlertsCountPerScenario(filters map[string][]string) (map[string]int, error) { // Counts the alerts selected by filters, grouped by scenario, and returns a map from scenario to count.
+
+	var res []struct { // scan target for the grouped query: one element per scenario
+		Scenario string
+		Count    int
+	}
+
+	ctx := context.Background() // NOTE(review): a fresh background context is used here, whereas TotalAlerts passes c.CTX
+
+	query := c.Ent.Alert.Query()
+
+	query, err := BuildAlertRequestFromFilter(query, filters) // apply the caller's filters to the base query
+
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to build alert request")
+	}
+
+	err = query.GroupBy(alert.FieldScenario).Aggregate(ent.Count()).Scan(ctx, &res) // group by the scenario field and count per group
+
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to count alerts per scenario")
+	}
+
+	counts := make(map[string]int)
+
+	for _, r := range res { // flatten the scanned rows into the returned map
+		counts[r.Scenario] = r.Count
+	}
+
+	return counts, nil
+}
+
 func (c *Client) TotalAlerts() (int, error) { // Returns the count of an alert query that has no filter applied.
 	return c.Ent.Alert.Query().Count(c.CTX)
 }

+ 30 - 0
pkg/database/decisions.go

@@ -15,6 +15,13 @@ import (
 	"github.com/pkg/errors"
 )
 
+type DecisionsByScenario struct { // Element type of the slice that QueryDecisionCountByScenario scans its grouped count into.
+	Scenario string // presumably the scenario the group was built on; the query groups by the scenario field
+	Count    int // presumably the aggregated count for the group; the query aggregates with ent.Count()
+	Origin   string // presumably the group's origin; the query also groups by the origin field
+	Type     string // presumably the group's decision type; the query also groups by the type field
+}
+
 func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string][]string) (*ent.DecisionQuery, []*sql.Predicate, error) {
 
 	var err error
@@ -231,6 +238,29 @@ func (c *Client) QueryAllDecisionsWithFilters(filters map[string][]string) ([]*e
 	return data, nil
 }
 
+func (c *Client) QueryDecisionCountByScenario(filters map[string][]string) ([]*DecisionsByScenario, error) { // Counts decisions whose "until" time is still in the future, restricted by filters and grouped by scenario, origin and type.
+	query := c.Ent.Decision.Query().Where(
+		decision.UntilGT(time.Now().UTC()), // keep only decisions whose until time is after the current UTC time
+	)
+	query, _, err := BuildDecisionRequestWithFilter(query, filters) // the second return value (predicates) is not needed here
+
+	if err != nil {
+		c.Log.Warningf("QueryDecisionCountByScenario : %s", err) // the underlying error is only logged; the caller receives QueryFail
+		return nil, errors.Wrap(QueryFail, "count all decisions with filters")
+	}
+
+	var r []*DecisionsByScenario
+
+	err = query.GroupBy(decision.FieldScenario, decision.FieldOrigin, decision.FieldType).Aggregate(ent.Count()).Scan(c.CTX, &r) // one result element per (scenario, origin, type) group
+
+	if err != nil {
+		c.Log.Warningf("QueryDecisionCountByScenario : %s", err) // same handling as above: log the cause, return QueryFail
+		return nil, errors.Wrap(QueryFail, "count all decisions with filters")
+	}
+
+	return r, nil
+}
+
 func (c *Client) QueryExpiredDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
 	now := time.Now().UTC()
 	query := c.Ent.Decision.Query().Where(

+ 1 - 1
tests/bats/06_crowdsec.bats

@@ -31,7 +31,7 @@ declare stderr
     run -1 --separate-stderr "${BIN_DIR}/crowdsec"
     refute_output
     run -0 echo "${stderr}"
-    assert_output --partial "api server init: unable to run local API: unable to init database client: unknown database type 'meh'"
+    assert_output --partial "unable to create database client: unknown database type 'meh'"
 }
 
 @test "${FILE} CS_LAPI_SECRET not strong enough" {