Sebastien Blot 1 рік тому
батько
коміт
dbdf3ad1bb

+ 37 - 36
cmd/crowdsec/lpmetrics.go

@@ -4,39 +4,40 @@ import (
 	"context"
 	"errors"
 	"net/http"
-        "github.com/sirupsen/logrus"
-        "github.com/blackfireio/osinfo"
 	"time"
 
+	"github.com/blackfireio/osinfo"
+	"github.com/sirupsen/logrus"
+
 	"gopkg.in/tomb.v2"
 
-        "github.com/crowdsecurity/go-cs-lib/ptr"
-        "github.com/crowdsecurity/go-cs-lib/trace"
+	"github.com/crowdsecurity/go-cs-lib/ptr"
+	"github.com/crowdsecurity/go-cs-lib/trace"
 
-        "github.com/crowdsecurity/crowdsec/pkg/acquisition"
-        "github.com/crowdsecurity/crowdsec/pkg/apiclient"
-        "github.com/crowdsecurity/crowdsec/pkg/cwhub"
-        "github.com/crowdsecurity/crowdsec/pkg/cwversion"
-        "github.com/crowdsecurity/crowdsec/pkg/fflag"
-        "github.com/crowdsecurity/crowdsec/pkg/models"
+	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
+	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
+	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
+	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/fflag"
+	"github.com/crowdsecurity/crowdsec/pkg/models"
 )
 
 // MetricsProvider collects metrics from the LP and sends them to the LAPI
 type MetricsProvider struct {
-	apic *apiclient.ApiClient
+	apic     *apiclient.ApiClient
 	interval time.Duration
-	static staticMetrics
-	logger *logrus.Entry
+	static   staticMetrics
+	logger   *logrus.Entry
 }
 
 type staticMetrics struct {
-	osName            string
-	osVersion         string
-	startupTS         int64
-	featureFlags      []string
-	consoleOptions    []string
-	datasourceMap     map[string]int64
-	hubState	  models.HubItems
+	osName         string
+	osVersion      string
+	startupTS      int64
+	featureFlags   []string
+	consoleOptions []string
+	datasourceMap  map[string]int64
+	hubState       models.HubItems
 }
 
 func getHubState(hub *cwhub.Hub) models.HubItems {
@@ -96,50 +97,48 @@ func detectOS() (string, string) {
 	return osInfo.Name, osInfo.Version
 }
 
-
 func NewMetricsProvider(apic *apiclient.ApiClient, interval time.Duration, logger *logrus.Entry,
-		consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) *MetricsProvider {
+	consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) *MetricsProvider {
 	return &MetricsProvider{
-		apic: apic,
+		apic:     apic,
 		interval: interval,
-		logger: logger,
-		static: newStaticMetrics(consoleOptions, datasources, hub),
+		logger:   logger,
+		static:   newStaticMetrics(consoleOptions, datasources, hub),
 	}
 }
 
 func (m *MetricsProvider) metricsPayload() *models.AllMetrics {
 	meta := &models.MetricsMeta{
 		UtcStartupTimestamp: m.static.startupTS,
-		WindowSizeSeconds: int64(m.interval.Seconds()),
+		WindowSizeSeconds:   int64(m.interval.Seconds()),
 	}
 
 	os := &models.OSversion{
-		Name: m.static.osName,
+		Name:    m.static.osName,
 		Version: m.static.osVersion,
 	}
 
 	base := models.BaseMetrics{
-		Meta: meta,
-		Os: os,
-		Version: ptr.Of(cwversion.VersionStr()),
+		Meta:         meta,
+		Os:           os,
+		Version:      ptr.Of(cwversion.VersionStr()),
 		FeatureFlags: m.static.featureFlags,
 	}
 
 	item0 := &models.LogProcessorsMetricsItems0{
-		BaseMetrics: base,
+		BaseMetrics:    base,
 		ConsoleOptions: m.static.consoleOptions,
-		Datasources: m.static.datasourceMap,
-		HubItems: m.static.hubState,
+		Datasources:    m.static.datasourceMap,
+		HubItems:       m.static.hubState,
 	}
 
 	// TODO: more metric details... ?
 
 	return &models.AllMetrics{
-                LogProcessors: []models.LogProcessorsMetrics{{item0}},
+		LogProcessors: []models.LogProcessorsMetrics{{item0}},
 	}
 }
 
-
 func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
 	defer trace.CatchPanic("crowdsec/MetricsProvider.Run")
 
@@ -149,7 +148,7 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
 
 	met := m.metricsPayload()
 
-	ticker := time.NewTicker(m.interval)
+	ticker := time.NewTicker(1) //Send on start
 
 	for {
 		select {
@@ -177,6 +176,8 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
 				continue
 			}
 
+			ticker.Reset(m.interval)
+
 			m.logger.Tracef("lp usage metrics sent")
 		case <-myTomb.Dying():
 			ticker.Stop()

+ 87 - 1
pkg/apiserver/apic_metrics.go

@@ -2,11 +2,15 @@ package apiserver
 
 import (
 	"context"
+	"encoding/json"
+	"strings"
 	"time"
 
-	log "github.com/sirupsen/logrus"
 	"slices"
 
+	"github.com/davecgh/go-spew/spew"
+	log "github.com/sirupsen/logrus"
+
 	"github.com/crowdsecurity/go-cs-lib/ptr"
 	"github.com/crowdsecurity/go-cs-lib/trace"
 	"github.com/crowdsecurity/go-cs-lib/version"
@@ -14,6 +18,67 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 )
 
+func (a *apic) GetUsageMetrics() (*models.AllMetrics, error) {
+	lpsMetrics, err := a.dbClient.GetLPsUsageMetrics()
+
+	if err != nil {
+		return nil, err
+	}
+
+	spew.Dump(lpsMetrics)
+
+	bouncersMetrics, err := a.dbClient.GetBouncersUsageMetrics()
+	if err != nil {
+		return nil, err
+	}
+
+	spew.Dump(bouncersMetrics)
+
+	allMetrics := &models.AllMetrics{}
+
+	allLps := a.dbClient.ListMachines()
+	allBouncers := a.dbClient.ListBouncers()
+
+	for _, lpsMetric := range lpsMetrics {
+		lpName := lpsMetric.GeneratedBy
+		metrics := models.LogProcessorsMetricsItems0{}
+
+		err := json.Unmarshal([]byte(lpsMetric.Payload), &metrics)
+
+		if err != nil {
+			log.Errorf("unable to unmarshal LPs metrics (%s)", err)
+			continue
+		}
+
+		lp, err := a.dbClient.QueryMachineByID(lpName)
+
+		if err != nil {
+			log.Errorf("unable to get LP information for %s: %s", lpName, err)
+			continue
+		}
+
+		if lp.Hubstate != nil {
+			metrics.HubItems = *lp.Hubstate
+		}
+
+		metrics.Os = &models.OSversion{
+			Name:    lp.Osname,
+			Version: lp.Osversion,
+		}
+
+		metrics.FeatureFlags = strings.Split(lp.Featureflags, ",")
+		metrics.Version = &lp.Version
+		//TODO: meta
+
+	}
+
+	//bouncerInfos := make(map[string]string)
+
+	//TODO: add LAPI metrics
+
+	return allMetrics, nil
+}
+
 func (a *apic) GetMetrics() (*models.Metrics, error) {
 	machines, err := a.dbClient.ListMachines()
 	if err != nil {
@@ -160,3 +225,24 @@ func (a *apic) SendMetrics(stop chan (bool)) {
 		}
 	}
 }
+
+func (a *apic) SendUsageMetrics() {
+	defer trace.CatchPanic("lapi/usageMetricsToAPIC")
+
+	ticker := time.NewTicker(5 * time.Second)
+
+	for {
+		select {
+		case <-a.metricsTomb.Dying():
+			//The normal metrics routine also kills push/pull tombs, does that make sense ?
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			_, err := a.GetUsageMetrics()
+			if err != nil {
+				log.Errorf("unable to get usage metrics (%s)", err)
+			}
+
+		}
+	}
+}

+ 11 - 1
pkg/apiserver/apiserver.go

@@ -25,6 +25,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/csplugin"
 	"github.com/crowdsecurity/crowdsec/pkg/database"
+	"github.com/crowdsecurity/crowdsec/pkg/fflag"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 )
 
@@ -360,6 +361,15 @@ func (s *APIServer) Run(apiReady chan bool) error {
 			s.apic.SendMetrics(make(chan bool))
 			return nil
 		})
+
+		if fflag.CAPIUsageMetrics.IsEnabled() {
+			log.Infof("CAPI_USAGE_METRICS flag is enabled, starting usage metrics routine")
+			s.apic.metricsTomb.Go(func() error {
+				s.apic.SendUsageMetrics()
+				return nil
+			})
+		}
+
 	}
 
 	s.httpServerTomb.Go(func() error {
@@ -368,7 +378,7 @@ func (s *APIServer) Run(apiReady chan bool) error {
 
 	if err := s.httpServerTomb.Wait(); err != nil {
 		return fmt.Errorf("local API server stopped with error: %w", err)
-        }
+	}
 
 	return nil
 }

+ 32 - 1
pkg/database/metrics.go

@@ -15,7 +15,6 @@ import (
 // RemoveOldMetrics
 // avoid errors.Wrapf
 
-
 func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy string, collectedAt time.Time, payload string) (*ent.Metric, error) {
 	metric, err := c.Ent.Metric.
 		Create().
@@ -38,3 +37,35 @@ func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy st
 
 	return metric, nil
 }
+
+func (c *Client) GetLPsUsageMetrics() ([]*ent.Metric, error) {
+	metrics, err := c.Ent.Metric.Query().
+		Where(
+			metric.GeneratedTypeEQ(metric.GeneratedTypeLP),
+			metric.PushedAtIsNil(),
+		).
+		Order(ent.Desc(metric.FieldCollectedAt)).
+		All(c.CTX)
+	if err != nil {
+		c.Log.Warningf("GetLPsUsageMetrics: %s", err)
+		return nil, fmt.Errorf("getting LPs usage metrics: %w", err)
+	}
+
+	return metrics, nil
+}
+
+func (c *Client) GetBouncersUsageMetrics() ([]*ent.Metric, error) {
+	metrics, err := c.Ent.Metric.Query().
+		Where(
+			metric.GeneratedTypeEQ(metric.GeneratedTypeRC),
+			metric.PushedAtIsNil(),
+		).
+		Order(ent.Desc(metric.FieldCollectedAt)).
+		All(c.CTX)
+	if err != nil {
+		c.Log.Warningf("GetBouncersUsageMetrics: %s", err)
+		return nil, fmt.Errorf("getting bouncers usage metrics: %w", err)
+	}
+
+	return metrics, nil
+}

+ 6 - 0
pkg/fflag/crowdsec.go

@@ -8,6 +8,7 @@ var ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Descript
 var PapiClient = &Feature{Name: "papi_client", Description: "Enable Polling API client", State: DeprecatedState}
 var Re2GrokSupport = &Feature{Name: "re2_grok_support", Description: "Enable RE2 support for GROK patterns"}
 var Re2RegexpInfileSupport = &Feature{Name: "re2_regexp_in_file_support", Description: "Enable RE2 support for RegexpInFile expr helper"}
+var CAPIUsageMetrics = &Feature{Name: "capi_usage_metrics", Description: "Enable usage metrics push to CAPI"}
 
 func RegisterAllFeatures() error {
 	err := Crowdsec.RegisterFeature(CscliSetup)
@@ -40,5 +41,10 @@ func RegisterAllFeatures() error {
 		return err
 	}
 
+	err = Crowdsec.RegisterFeature(CAPIUsageMetrics)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }