lint (wsl)
This commit is contained in:
parent
0cb08e1c31
commit
50d99950b6
7 changed files with 80 additions and 17 deletions
|
@ -14,8 +14,8 @@ import (
|
|||
"github.com/crowdsecurity/go-cs-lib/trace"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/appsec"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/appsec"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
|
||||
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
|
||||
|
@ -57,45 +57,55 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
|
|||
|
||||
//start go-routines for parsing, buckets pour and outputs.
|
||||
parserWg := &sync.WaitGroup{}
|
||||
|
||||
parsersTomb.Go(func() error {
|
||||
parserWg.Add(1)
|
||||
|
||||
for i := 0; i < cConfig.Crowdsec.ParserRoutinesCount; i++ {
|
||||
parsersTomb.Go(func() error {
|
||||
defer trace.CatchPanic("crowdsec/runParse")
|
||||
|
||||
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
|
||||
log.Fatalf("starting parse error : %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
parserWg.Done()
|
||||
|
||||
return nil
|
||||
})
|
||||
parserWg.Wait()
|
||||
|
||||
bucketWg := &sync.WaitGroup{}
|
||||
|
||||
bucketsTomb.Go(func() error {
|
||||
bucketWg.Add(1)
|
||||
/*restore previous state as well if present*/
|
||||
if cConfig.Crowdsec.BucketStateFile != "" {
|
||||
log.Warningf("Restoring buckets state from %s", cConfig.Crowdsec.BucketStateFile)
|
||||
|
||||
if err := leaky.LoadBucketsState(cConfig.Crowdsec.BucketStateFile, buckets, holders); err != nil {
|
||||
return fmt.Errorf("unable to restore buckets : %s", err)
|
||||
return fmt.Errorf("unable to restore buckets: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < cConfig.Crowdsec.BucketsRoutinesCount; i++ {
|
||||
bucketsTomb.Go(func() error {
|
||||
defer trace.CatchPanic("crowdsec/runPour")
|
||||
|
||||
if err := runPour(inputEventChan, holders, buckets, cConfig); err != nil {
|
||||
log.Fatalf("starting pour error : %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
bucketWg.Done()
|
||||
|
||||
return nil
|
||||
})
|
||||
bucketWg.Wait()
|
||||
|
@ -109,19 +119,24 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
|
|||
apiClient.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb)
|
||||
|
||||
outputWg := &sync.WaitGroup{}
|
||||
|
||||
outputsTomb.Go(func() error {
|
||||
outputWg.Add(1)
|
||||
|
||||
for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ {
|
||||
outputsTomb.Go(func() error {
|
||||
defer trace.CatchPanic("crowdsec/runOutput")
|
||||
|
||||
if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, apiClient); err != nil {
|
||||
log.Fatalf("starting outputs error : %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
outputWg.Done()
|
||||
|
||||
return nil
|
||||
})
|
||||
outputWg.Wait()
|
||||
|
@ -131,11 +146,12 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
|
|||
if cConfig.Prometheus.Level == "aggregated" {
|
||||
aggregated = true
|
||||
}
|
||||
|
||||
if err := acquisition.GetMetrics(dataSources, aggregated); err != nil {
|
||||
return fmt.Errorf("while fetching prometheus metrics for datasources: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.Info("Starting processing data")
|
||||
|
||||
if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
|
||||
|
@ -148,11 +164,13 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
|
|||
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
|
||||
crowdsecTomb.Go(func() error {
|
||||
defer trace.CatchPanic("crowdsec/serveCrowdsec")
|
||||
|
||||
go func() {
|
||||
defer trace.CatchPanic("crowdsec/runCrowdsec")
|
||||
// this logs every time, even at config reload
|
||||
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
|
||||
agentReady <- true
|
||||
|
||||
if err := runCrowdsec(cConfig, parsers, hub); err != nil {
|
||||
log.Fatalf("unable to start crowdsec routines: %s", err)
|
||||
}
|
||||
|
@ -164,16 +182,20 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
|
|||
*/
|
||||
waitOnTomb()
|
||||
log.Debugf("Shutting down crowdsec routines")
|
||||
|
||||
if err := ShutdownCrowdsecRoutines(); err != nil {
|
||||
log.Fatalf("unable to shutdown crowdsec routines: %s", err)
|
||||
}
|
||||
|
||||
log.Debugf("everything is dead, return crowdsecTomb")
|
||||
|
||||
if dumpStates {
|
||||
dumpParserState()
|
||||
dumpOverflowState()
|
||||
dumpBucketsPour()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -183,55 +205,65 @@ func dumpBucketsPour() {
|
|||
if err != nil {
|
||||
log.Fatalf("open: %s", err)
|
||||
}
|
||||
|
||||
out, err := yaml.Marshal(leaky.BucketPourCache)
|
||||
if err != nil {
|
||||
log.Fatalf("marshal: %s", err)
|
||||
}
|
||||
|
||||
b, err := fd.Write(out)
|
||||
if err != nil {
|
||||
log.Fatalf("write: %s", err)
|
||||
}
|
||||
|
||||
log.Tracef("wrote %d bytes", b)
|
||||
|
||||
if err := fd.Close(); err != nil {
|
||||
log.Fatalf(" close: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpParserState() {
|
||||
|
||||
fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "parser-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
|
||||
if err != nil {
|
||||
log.Fatalf("open: %s", err)
|
||||
}
|
||||
|
||||
out, err := yaml.Marshal(parser.StageParseCache)
|
||||
if err != nil {
|
||||
log.Fatalf("marshal: %s", err)
|
||||
}
|
||||
|
||||
b, err := fd.Write(out)
|
||||
if err != nil {
|
||||
log.Fatalf("write: %s", err)
|
||||
}
|
||||
|
||||
log.Tracef("wrote %d bytes", b)
|
||||
|
||||
if err := fd.Close(); err != nil {
|
||||
log.Fatalf(" close: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpOverflowState() {
|
||||
|
||||
fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "bucket-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
|
||||
if err != nil {
|
||||
log.Fatalf("open: %s", err)
|
||||
}
|
||||
|
||||
out, err := yaml.Marshal(bucketOverflows)
|
||||
if err != nil {
|
||||
log.Fatalf("marshal: %s", err)
|
||||
}
|
||||
|
||||
b, err := fd.Write(out)
|
||||
if err != nil {
|
||||
log.Fatalf("write: %s", err)
|
||||
}
|
||||
|
||||
log.Tracef("wrote %d bytes", b)
|
||||
|
||||
if err := fd.Close(); err != nil {
|
||||
log.Fatalf(" close: %s", err)
|
||||
}
|
||||
|
|
|
@ -35,10 +35,12 @@ func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing api url ('%s'): %w", credentials.URL, err)
|
||||
}
|
||||
|
||||
papiURL, err := url.Parse(credentials.PapiURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing polling api url ('%s'): %w", credentials.PapiURL, err)
|
||||
}
|
||||
|
||||
password := strfmt.Password(credentials.Password)
|
||||
|
||||
client, err := apiclient.NewClient(&apiclient.Config{
|
||||
|
@ -61,6 +63,7 @@ func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.
|
|||
ret := make([]string, 0, len(scenarios)+len(appsecRules))
|
||||
ret = append(ret, scenarios...)
|
||||
ret = append(ret, appsecRules...)
|
||||
|
||||
return ret, nil
|
||||
},
|
||||
})
|
||||
|
|
|
@ -114,13 +114,17 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
|
|||
}
|
||||
|
||||
decisionsFilters := make(map[string][]string, 0)
|
||||
|
||||
decisions, err := dbClient.QueryDecisionCountByScenario(decisionsFilters)
|
||||
if err != nil {
|
||||
log.Errorf("Error querying decisions for metrics: %v", err)
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
globalActiveDecisions.Reset()
|
||||
|
||||
for _, d := range decisions {
|
||||
globalActiveDecisions.With(prometheus.Labels{"reason": d.Scenario, "origin": d.Origin, "action": d.Type}).Set(float64(d.Count))
|
||||
}
|
||||
|
@ -136,6 +140,7 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
|
|||
if err != nil {
|
||||
log.Errorf("Error querying alerts for metrics: %v", err)
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -173,7 +178,6 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
|
|||
globalActiveDecisions, globalAlerts, parser.NodesWlHitsOk, parser.NodesWlHits,
|
||||
cache.CacheMetrics, exprhelpers.RegexpCacheMetrics,
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,6 +192,7 @@ func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client,
|
|||
|
||||
http.Handle("/metrics", computeDynamicMetrics(promhttp.Handler(), dbClient))
|
||||
log.Debugf("serving metrics after %s ms", time.Since(crowdsecT0))
|
||||
|
||||
if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil {
|
||||
log.Warningf("prometheus: %s", err)
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
)
|
||||
|
||||
func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
|
||||
|
||||
var dedupCache []*models.Alert
|
||||
|
||||
for idx, alert := range alerts {
|
||||
|
@ -26,16 +25,21 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
|
|||
dedupCache = append(dedupCache, alert.Alert)
|
||||
continue
|
||||
}
|
||||
|
||||
for k, src := range alert.Sources {
|
||||
refsrc := *alert.Alert //copy
|
||||
|
||||
log.Tracef("source[%s]", k)
|
||||
|
||||
refsrc.Source = &src
|
||||
dedupCache = append(dedupCache, &refsrc)
|
||||
}
|
||||
}
|
||||
|
||||
if len(dedupCache) != len(alerts) {
|
||||
log.Tracef("went from %d to %d alerts", len(alerts), len(dedupCache))
|
||||
}
|
||||
|
||||
return dedupCache, nil
|
||||
}
|
||||
|
||||
|
@ -46,23 +50,25 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to transform alerts for api: %w", err)
|
||||
}
|
||||
|
||||
_, _, err = client.Alerts.Add(ctx, alertsToPush)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed sending alert to LAPI: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var bucketOverflows []types.Event
|
||||
|
||||
func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets,
|
||||
postOverflowCTX parser.UnixParserCtx, postOverflowNodes []parser.Node,
|
||||
client *apiclient.ApiClient) error {
|
||||
func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx,
|
||||
postOverflowNodes []parser.Node, client *apiclient.ApiClient) error {
|
||||
var (
|
||||
cache []types.RuntimeAlert
|
||||
cacheMutex sync.Mutex
|
||||
)
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
|
||||
var cache []types.RuntimeAlert
|
||||
var cacheMutex sync.Mutex
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
|
@ -90,6 +96,7 @@ LOOP:
|
|||
log.Errorf("while pushing leftovers to api : %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
break LOOP
|
||||
case event := <-overflow:
|
||||
/*if alert is empty and mapKey is present, the overflow is just to cleanup bucket*/
|
||||
|
@ -100,7 +107,7 @@ LOOP:
|
|||
/* process post overflow parser nodes */
|
||||
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("postoverflow failed : %s", err)
|
||||
return fmt.Errorf("postoverflow failed: %w", err)
|
||||
}
|
||||
log.Printf("%s", *event.Overflow.Alert.Message)
|
||||
//if the Alert is nil, it's to signal bucket is ready for GC, don't track this
|
||||
|
@ -130,6 +137,6 @@ LOOP:
|
|||
}
|
||||
|
||||
ticker.Stop()
|
||||
return nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ func StartRunSvc() error {
|
|||
dbClient, err = database.NewClient(cConfig.DbConfig)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create database client: %s", err)
|
||||
return fmt.Errorf("unable to create database client: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ func WindowsRun() error {
|
|||
dbClient, err = database.NewClient(cConfig.DbConfig)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create database client: %s", err)
|
||||
return fmt.Errorf("unable to create database client: %w", err)
|
||||
}
|
||||
}
|
||||
registerPrometheus(cConfig.Prometheus)
|
||||
|
|
|
@ -42,7 +42,9 @@ func debugHandler(sig os.Signal, cConfig *csconfig.Config) error {
|
|||
if err := leaky.ShutdownAllBuckets(buckets); err != nil {
|
||||
log.Warningf("Failed to shut down routines : %s", err)
|
||||
}
|
||||
|
||||
log.Printf("Shutdown is finished, buckets are in %s", tmpFile)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -66,8 +68,10 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
|
|||
if !cConfig.DisableAPI {
|
||||
if flags.DisableCAPI {
|
||||
log.Warningf("Communication with CrowdSec Central API disabled from args")
|
||||
|
||||
cConfig.API.Server.OnlineClient = nil
|
||||
}
|
||||
|
||||
apiServer, err := initAPIServer(cConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to init api server: %w", err)
|
||||
|
@ -109,6 +113,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
|
|||
log.Warningf("Failed to delete temp file (%s) : %s", tmpFile, err)
|
||||
}
|
||||
}
|
||||
|
||||
return cConfig, nil
|
||||
}
|
||||
|
||||
|
@ -116,10 +121,12 @@ func ShutdownCrowdsecRoutines() error {
|
|||
var reterr error
|
||||
|
||||
log.Debugf("Shutting down crowdsec sub-routines")
|
||||
|
||||
if len(dataSources) > 0 {
|
||||
acquisTomb.Kill(nil)
|
||||
log.Debugf("waiting for acquisition to finish")
|
||||
drainChan(inputLineChan)
|
||||
|
||||
if err := acquisTomb.Wait(); err != nil {
|
||||
log.Warningf("Acquisition returned error : %s", err)
|
||||
reterr = err
|
||||
|
@ -129,6 +136,7 @@ func ShutdownCrowdsecRoutines() error {
|
|||
log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
|
||||
parsersTomb.Kill(nil)
|
||||
drainChan(inputEventChan)
|
||||
|
||||
if err := parsersTomb.Wait(); err != nil {
|
||||
log.Warningf("Parsers returned error : %s", err)
|
||||
reterr = err
|
||||
|
@ -159,6 +167,7 @@ func ShutdownCrowdsecRoutines() error {
|
|||
log.Warningf("Outputs returned error : %s", err)
|
||||
reterr = err
|
||||
}
|
||||
|
||||
log.Debugf("outputs are done")
|
||||
case <-time.After(3 * time.Second):
|
||||
// this can happen if outputs are stuck in a http retry loop
|
||||
|
@ -180,6 +189,7 @@ func shutdownAPI() error {
|
|||
}
|
||||
|
||||
log.Debugf("done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -192,6 +202,7 @@ func shutdownCrowdsec() error {
|
|||
}
|
||||
|
||||
log.Debugf("done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -291,6 +302,7 @@ func HandleSignals(cConfig *csconfig.Config) error {
|
|||
if err == nil {
|
||||
log.Warning("Crowdsec service shutting down")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -324,6 +336,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
|
|||
|
||||
if cConfig.API.CTI != nil && *cConfig.API.CTI.Enabled {
|
||||
log.Infof("Crowdsec CTI helper enabled")
|
||||
|
||||
if err := exprhelpers.InitCrowdsecCTI(cConfig.API.CTI.Key, cConfig.API.CTI.CacheTimeout, cConfig.API.CTI.CacheSize, cConfig.API.CTI.LogLevel); err != nil {
|
||||
return fmt.Errorf("failed to init crowdsec cti: %w", err)
|
||||
}
|
||||
|
@ -336,6 +349,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
|
|||
|
||||
if flags.DisableCAPI {
|
||||
log.Warningf("Communication with CrowdSec Central API disabled from args")
|
||||
|
||||
cConfig.API.Server.OnlineClient = nil
|
||||
}
|
||||
|
||||
|
@ -394,6 +408,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
|
|||
|
||||
for _, ch := range waitChans {
|
||||
<-ch
|
||||
|
||||
switch ch {
|
||||
case apiTomb.Dead():
|
||||
log.Infof("api shutdown")
|
||||
|
@ -401,5 +416,6 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
|
|||
log.Infof("crowdsec shutdown")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue