(wip) lp metrics

This commit is contained in:
marco 2024-02-13 15:08:17 +01:00
parent 8315daf85a
commit 48f1a1a03a
19 changed files with 310 additions and 35 deletions

View file

@ -23,35 +23,36 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, error) {
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, []acquisition.DataSource, error) {
var err error
if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading context: %w", err)
return nil, nil, fmt.Errorf("while loading context: %w", err)
}
// Start loading configs
csParsers := parser.NewParsers(hub)
if csParsers, err = parser.LoadParsers(cConfig, csParsers); err != nil {
return nil, fmt.Errorf("while loading parsers: %w", err)
return nil, nil, fmt.Errorf("while loading parsers: %w", err)
}
if err := LoadBuckets(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading scenarios: %w", err)
return nil, nil, fmt.Errorf("while loading scenarios: %w", err)
}
if err := appsec.LoadAppsecRules(hub); err != nil {
return nil, fmt.Errorf("while loading appsec rules: %w", err)
return nil, nil, fmt.Errorf("while loading appsec rules: %w", err)
}
if err := LoadAcquisition(cConfig); err != nil {
return nil, fmt.Errorf("while loading acquisition config: %w", err)
datasources, err := LoadAcquisition(cConfig)
if err != nil {
return nil, nil, fmt.Errorf("while loading acquisition config: %w", err)
}
return csParsers, nil
return csParsers, datasources, nil
}
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub) error {
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
inputEventChan = make(chan types.Event)
inputLineChan = make(chan types.Event)
@ -141,6 +142,12 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
})
outputWg.Wait()
lpMetricsTomb.Go(func() error {
// in case of reload, we send a new startup time
// (use crowdsecT0 as a reference for the first startup time)
return lpMetrics(apiClient, cConfig.API.Server.ConsoleConfig, datasources)
})
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
aggregated := false
if cConfig.Prometheus.Level == "aggregated" {
@ -161,7 +168,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
return nil
}
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")
@ -171,7 +178,7 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true
if err := runCrowdsec(cConfig, parsers, hub); err != nil {
if err := runCrowdsec(cConfig, parsers, hub, datasources); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()

113
cmd/crowdsec/lpmetrics.go Normal file
View file

@ -0,0 +1,113 @@
package main
import (
"context"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/blackfireio/osinfo"
"time"
"github.com/crowdsecurity/go-cs-lib/ptr"
"github.com/crowdsecurity/go-cs-lib/trace"
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
"github.com/crowdsecurity/crowdsec/pkg/fflag"
"github.com/crowdsecurity/crowdsec/pkg/models"
)
func detectOs() (string, string) {
if cwversion.System == "docker" {
return "docker", ""
}
osInfo, err := osinfo.GetOSInfo()
if err != nil {
return cwversion.System, "???"
}
return osInfo.Name, osInfo.Version
}
func lpMetricsPayload(consoleCfg *csconfig.ConsoleConfig, datasources []acquisition.DataSource, windowSize int, utcStartupTimestamp int64) models.AllMetrics {
meta := &models.MetricsMeta{
UtcStartupTimestamp: float64(utcStartupTimestamp),
WindowSizeSeconds: int64(windowSize),
}
osName, osVersion := detectOs()
os := &models.OSversion{
Name: osName,
Version: osVersion,
}
features := fflag.Crowdsec.GetEnabledFeatures()
datasourceMap := map[string]int64{}
for _, ds := range datasources {
datasourceMap[ds.GetName()] += 1
}
return models.AllMetrics{
LogProcessors: []models.LogProcessorsMetrics{
{
&models.LogProcessorsMetricsItems0{
BaseMetrics: models.BaseMetrics{
Meta: meta,
Os: os,
Version: ptr.Of(cwversion.VersionStr()),
FeatureFlags: features,
},
ConsoleOptions: consoleCfg.EnabledOptions(),
Datasources: datasourceMap,
},
},
},
}
}
// lpMetrics collects metrics from the LP and sends them to the LAPI
func lpMetrics(client *apiclient.ApiClient, consoleCfg *csconfig.ConsoleConfig, datasources []acquisition.DataSource) error {
defer trace.CatchPanic("crowdsec/runLpMetrics")
log.Trace("Starting lpMetrics goroutine")
windowSize := 6
utcStartupEpoch := time.Now().Unix()
met := lpMetricsPayload(consoleCfg, datasources, windowSize, utcStartupEpoch)
ticker := time.NewTicker(time.Duration(windowSize) * time.Second)
log.Tracef("Sending lp metrics every %d seconds", windowSize)
LOOP:
for {
select {
case <-ticker.C:
met.LogProcessors[0][0].Meta.UtcNowTimestamp = float64(time.Now().Unix())
_, resp, err := client.UsageMetrics.Add(context.Background(), &met)
if err != nil {
log.Errorf("failed to send lp metrics: %s", err)
continue
}
if resp.Response.StatusCode != http.StatusCreated {
log.Errorf("failed to send lp metrics: %s", resp.Response.Status)
continue
}
log.Tracef("lp usage metrics sent")
case <-lpMetricsTomb.Dying():
break LOOP
}
}
ticker.Stop()
return nil
}

View file

@ -27,13 +27,14 @@ import (
var (
/*tombs for the parser, buckets and outputs.*/
acquisTomb tomb.Tomb
parsersTomb tomb.Tomb
bucketsTomb tomb.Tomb
outputsTomb tomb.Tomb
apiTomb tomb.Tomb
crowdsecTomb tomb.Tomb
pluginTomb tomb.Tomb
acquisTomb tomb.Tomb
parsersTomb tomb.Tomb
bucketsTomb tomb.Tomb
outputsTomb tomb.Tomb
apiTomb tomb.Tomb
crowdsecTomb tomb.Tomb
pluginTomb tomb.Tomb
lpMetricsTomb tomb.Tomb
flags *Flags
@ -107,7 +108,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
return nil
}
func LoadAcquisition(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error
if flags.SingleFileType != "" && flags.OneShotDSN != "" {
@ -116,20 +117,20 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
return nil, errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
return err
return nil, err
}
}
if len(dataSources) == 0 {
return fmt.Errorf("no datasource enabled")
return nil, fmt.Errorf("no datasource enabled")
}
return nil
return dataSources, nil
}
var (

View file

@ -86,7 +86,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
return nil, fmt.Errorf("while loading hub index: %w", err)
}
csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return nil, fmt.Errorf("unable to init crowdsec: %w", err)
}
@ -103,7 +103,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
}
agentReady := make(chan bool, 1)
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
}
log.Printf("Reload is finished")
@ -369,14 +369,14 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
return fmt.Errorf("while loading hub index: %w", err)
}
csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return fmt.Errorf("crowdsec init: %w", err)
}
// if it's just linting, we're done
if !flags.TestMode {
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
} else {
agentReady <- true
}

View file

@ -39,6 +39,7 @@ type ApiClient struct {
Metrics *MetricsService
Signal *SignalService
HeartBeat *HeartBeatService
UsageMetrics *UsageMetricsService
}
func (a *ApiClient) GetClient() *http.Client {
@ -95,6 +96,7 @@ func NewClient(config *Config) (*ApiClient, error) {
c.Signal = (*SignalService)(&c.common)
c.DecisionDelete = (*DecisionDeleteService)(&c.common)
c.HeartBeat = (*HeartBeatService)(&c.common)
c.UsageMetrics = (*UsageMetricsService)(&c.common)
return c, nil
}

View file

@ -0,0 +1,32 @@
package apiclient
import (
"context"
"fmt"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/crowdsecurity/crowdsec/pkg/models"
)
type UsageMetricsService service
func (s *UsageMetricsService) Add(ctx context.Context, metrics *models.AllMetrics) (interface{}, *Response, error) {
log.Warnf("prefix: %s", s.client.URLPrefix)
u := fmt.Sprintf("%s/usage-metrics/", s.client.URLPrefix)
req, err := s.client.NewRequest(http.MethodPost, u, &metrics)
if err != nil {
return nil, nil, err
}
var response interface{}
resp, err := s.client.Do(ctx, req, &response)
if err != nil {
return nil, resp, err
}
return &response, resp, nil
}

View file

@ -106,6 +106,7 @@ func (c *Controller) NewV1() error {
jwtAuth.DELETE("/decisions", c.HandlerV1.DeleteDecisions)
jwtAuth.DELETE("/decisions/:decision_id", c.HandlerV1.DeleteDecisionById)
jwtAuth.GET("/heartbeat", c.HandlerV1.HeartBeat)
jwtAuth.POST("/usage-metrics", c.HandlerV1.UsageMetrics)
}
apiKeyAuth := groupV1.Group("")
@ -115,6 +116,7 @@ func (c *Controller) NewV1() error {
apiKeyAuth.HEAD("/decisions", c.HandlerV1.GetDecision)
apiKeyAuth.GET("/decisions/stream", c.HandlerV1.StreamDecision)
apiKeyAuth.HEAD("/decisions/stream", c.HandlerV1.StreamDecision)
// apiKeyAuth.POST("/usage-metrics", c.HandlerV1.UsageMetrics)
}
return nil

View file

@ -0,0 +1,42 @@
package v1
import (
"net/http"
jwt "github.com/appleboy/gin-jwt/v2"
"github.com/gin-gonic/gin"
// "github.com/sanity-io/litter"
"github.com/go-openapi/strfmt"
log "github.com/sirupsen/logrus"
"github.com/crowdsecurity/crowdsec/pkg/models"
)
// UsageMetrics receives metrics from log processors and remediation components
func (c *Controller) UsageMetrics(gctx *gin.Context) {
var input models.AllMetrics
claims := jwt.ExtractClaims(gctx)
// TBD: use defined rather than hardcoded key to find back owner
machineID := claims["id"].(string)
if err := gctx.ShouldBindJSON(&input); err != nil {
log.Errorf("Failed to bind json: %s", err)
gctx.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
return
}
if err := input.Validate(strfmt.Default); err != nil {
log.Errorf("Failed to validate input: %s", err)
c.HandleDBErrors(gctx, err)
return
}
log.Infof("Received all metrics from %s", machineID)
// inputStr := litter.Sdump(input)
// log.Trace(inputStr)
// empty body
gctx.Status(http.StatusCreated)
}

View file

@ -37,6 +37,29 @@ type ConsoleConfig struct {
ShareContext *bool `yaml:"share_context"`
}
func (c *ConsoleConfig) EnabledOptions() []string {
ret := []string{}
if c == nil {
return ret
}
if c.ShareCustomScenarios != nil && *c.ShareCustomScenarios {
ret = append(ret, SEND_CUSTOM_SCENARIOS)
}
if c.ShareTaintedScenarios != nil && *c.ShareTaintedScenarios {
ret = append(ret, SEND_TAINTED_SCENARIOS)
}
if c.ShareManualDecisions != nil && *c.ShareManualDecisions {
ret = append(ret, SEND_MANUAL_SCENARIOS)
}
if c.ConsoleManagement != nil && *c.ConsoleManagement {
ret = append(ret, CONSOLE_MANAGEMENT)
}
if c.ShareContext != nil && *c.ShareContext {
ret = append(ret, SEND_CONTEXT)
}
return ret
}
func (c *ConsoleConfig) IsPAPIEnabled() bool {
if c == nil || c.ConsoleManagement == nil {
return false

View file

@ -66,7 +66,7 @@ teardown() {
}
@test "simulate one bouncer request with a valid cert" {
rune -0 curl -s --cert "${tmpdir}/bouncer.pem" --key "${tmpdir}/bouncer-key.pem" --cacert "${tmpdir}/bundle.pem" https://localhost:8080/v1/decisions\?ip=42.42.42.42
rune -0 curl -f -s --cert "${tmpdir}/bouncer.pem" --key "${tmpdir}/bouncer-key.pem" --cacert "${tmpdir}/bundle.pem" https://localhost:8080/v1/decisions\?ip=42.42.42.42
assert_output "null"
rune -0 cscli bouncers list -o json
rune -0 jq '. | length' <(output)

View file

@ -90,3 +90,40 @@ teardown() {
rune -0 jq '. | length' <(output)
assert_output 1
}
@test "usage metrics" {
# a registered log processor can send metrics for the console
token=$(lp_login)
usage_metrics="http://localhost:8080/v1/usage-metrics"
payload=$(cat <<-EOT
remediation_components: []
log_processors:
-
- version: "v1.0"
feature_flags:
- marshmallows
meta:
window_size_seconds: 600
utc_startup_timestamp: 1707399316
utc_now_timestamp: 1707485349
os:
name: CentOS
version: "8"
metrics:
- name: logs_parsed
value: 5000
unit: count
labels: {}
console_options:
- share_context
datasources:
syslog: 1
file: 4
EOT
)
echo -e "$payload" >/tmp/bbb
rune -0 curl -f -sS -H "Authorization: Bearer ${token}" -X POST "$usage_metrics" --data "$(echo "$payload" | yq -o j)"
refute_output
}

View file

@ -24,7 +24,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
#----------

View file

@ -24,7 +24,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
#----------

View file

@ -24,7 +24,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
#----------

View file

@ -24,7 +24,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
#----------

View file

@ -26,7 +26,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key:${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key:${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
output_new_decisions() {

View file

@ -25,7 +25,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
@test "adding decisions for multiple scopes" {

View file

@ -25,7 +25,7 @@ setup() {
api() {
URI="$1"
curl -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
curl -f -s -H "X-Api-Key: ${API_KEY}" "${CROWDSEC_API_URL}${URI}"
}
@test "adding decisions for multiple ips" {

View file

@ -276,3 +276,19 @@ rune() {
run --separate-stderr "$@"
}
export -f rune
# as a log processor, connect to lapi and get a token
lp_login() {
local cred
cred=$(config_get .api.client.credentials_path)
local url
url="$(yq '.url' < "$cred")/v1/watchers/login"
local resp
resp=$(yq -oj -I0 '{"machine_id":.login,"password":.password}' < "$cred" | curl -s -X POST "$url" --data-binary @-)
if [[ "$(yq -e '.code' <<<"$resp")" != 200 ]]; then
echo "login_lp: failed to login" >&3
return 1
fi
echo "$resp" | yq -r '.token'
}
export -f lp_login