Prechádzať zdrojové kódy

log processor: share apiclient in output goroutines (#2836)

mmetc 1 rok pred
rodič
commit
d34fb7e8a8

+ 7 - 3
.golangci.yml

@@ -11,7 +11,7 @@ run:
 linters-settings:
   cyclop:
     # lower this after refactoring
-    max-complexity: 70
+    max-complexity: 53
 
   gci:
     sections:
@@ -26,7 +26,7 @@ linters-settings:
 
   gocyclo:
     # lower this after refactoring
-    min-complexity: 70
+    min-complexity: 49
 
   funlen:
     # Checks the number of lines in a function.
@@ -46,7 +46,7 @@ linters-settings:
 
   maintidx:
     # raise this after refactoring
-    under: 9
+    under: 11
 
   misspell:
     locale: US
@@ -263,6 +263,10 @@ issues:
         - perfsprint
       text: "fmt.Sprintf can be replaced .*"
 
+    - linters:
+        - perfsprint
+      text: "fmt.Errorf can be replaced with errors.New"
+
     #
     # Will fix, easy but some neurons required
     #

+ 3 - 1
cmd/crowdsec/api.go

@@ -56,7 +56,8 @@ func initAPIServer(cConfig *csconfig.Config) (*apiserver.APIServer, error) {
 	return apiServer, nil
 }
 
-func serveAPIServer(apiServer *apiserver.APIServer, apiReady chan bool) {
+func serveAPIServer(apiServer *apiserver.APIServer) {
+	apiReady := make(chan bool, 1)
 	apiTomb.Go(func() error {
 		defer trace.CatchPanic("crowdsec/serveAPIServer")
 		go func() {
@@ -80,6 +81,7 @@ func serveAPIServer(apiServer *apiserver.APIServer, apiReady chan bool) {
 		}
 		return nil
 	})
+	<-apiReady
 }
 
 func hasPlugins(profiles []*csconfig.ProfileCfg) bool {

+ 48 - 8
cmd/crowdsec/crowdsec.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -13,8 +14,8 @@ import (
 	"github.com/crowdsecurity/go-cs-lib/trace"
 
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
-	"github.com/crowdsecurity/crowdsec/pkg/appsec"
 	"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
+	"github.com/crowdsecurity/crowdsec/pkg/appsec"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
@@ -56,63 +57,86 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
 
 	//start go-routines for parsing, buckets pour and outputs.
 	parserWg := &sync.WaitGroup{}
+
 	parsersTomb.Go(func() error {
 		parserWg.Add(1)
+
 		for i := 0; i < cConfig.Crowdsec.ParserRoutinesCount; i++ {
 			parsersTomb.Go(func() error {
 				defer trace.CatchPanic("crowdsec/runParse")
+
 				if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
 					log.Fatalf("starting parse error : %s", err)
 					return err
 				}
+
 				return nil
 			})
 		}
 		parserWg.Done()
+
 		return nil
 	})
 	parserWg.Wait()
 
 	bucketWg := &sync.WaitGroup{}
+
 	bucketsTomb.Go(func() error {
 		bucketWg.Add(1)
 		/*restore previous state as well if present*/
 		if cConfig.Crowdsec.BucketStateFile != "" {
 			log.Warningf("Restoring buckets state from %s", cConfig.Crowdsec.BucketStateFile)
+
 			if err := leaky.LoadBucketsState(cConfig.Crowdsec.BucketStateFile, buckets, holders); err != nil {
-				return fmt.Errorf("unable to restore buckets : %s", err)
+				return fmt.Errorf("unable to restore buckets: %w", err)
 			}
 		}
 
 		for i := 0; i < cConfig.Crowdsec.BucketsRoutinesCount; i++ {
 			bucketsTomb.Go(func() error {
 				defer trace.CatchPanic("crowdsec/runPour")
+
 				if err := runPour(inputEventChan, holders, buckets, cConfig); err != nil {
 					log.Fatalf("starting pour error : %s", err)
 					return err
 				}
+
 				return nil
 			})
 		}
 		bucketWg.Done()
+
 		return nil
 	})
 	bucketWg.Wait()
 
+	apiClient, err := AuthenticatedLAPIClient(*cConfig.API.Client.Credentials, hub)
+	if err != nil {
+		return err
+	}
+
+	log.Debugf("Starting HeartBeat service")
+	apiClient.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb)
+
 	outputWg := &sync.WaitGroup{}
+
 	outputsTomb.Go(func() error {
 		outputWg.Add(1)
+
 		for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ {
 			outputsTomb.Go(func() error {
 				defer trace.CatchPanic("crowdsec/runOutput")
-				if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, *cConfig.API.Client.Credentials, hub); err != nil {
+
+				if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, apiClient); err != nil {
 					log.Fatalf("starting outputs error : %s", err)
 					return err
 				}
+
 				return nil
 			})
 		}
 		outputWg.Done()
+
 		return nil
 	})
 	outputWg.Wait()
@@ -122,16 +146,16 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
 		if cConfig.Prometheus.Level == "aggregated" {
 			aggregated = true
 		}
+
 		if err := acquisition.GetMetrics(dataSources, aggregated); err != nil {
 			return fmt.Errorf("while fetching prometheus metrics for datasources: %w", err)
 		}
-
 	}
+
 	log.Info("Starting processing data")
 
 	if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
-		log.Fatalf("starting acquisition error : %s", err)
-		return err
+		return fmt.Errorf("starting acquisition error: %w", err)
 	}
 
 	return nil
@@ -140,11 +164,13 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
 func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
 	crowdsecTomb.Go(func() error {
 		defer trace.CatchPanic("crowdsec/serveCrowdsec")
+
 		go func() {
 			defer trace.CatchPanic("crowdsec/runCrowdsec")
 			// this logs every time, even at config reload
 			log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
 			agentReady <- true
+
 			if err := runCrowdsec(cConfig, parsers, hub); err != nil {
 				log.Fatalf("unable to start crowdsec routines: %s", err)
 			}
@@ -156,16 +182,20 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
 		*/
 		waitOnTomb()
 		log.Debugf("Shutting down crowdsec routines")
+
 		if err := ShutdownCrowdsecRoutines(); err != nil {
 			log.Fatalf("unable to shutdown crowdsec routines: %s", err)
 		}
+
 		log.Debugf("everything is dead, return crowdsecTomb")
+
 		if dumpStates {
 			dumpParserState()
 			dumpOverflowState()
 			dumpBucketsPour()
 			os.Exit(0)
 		}
+
 		return nil
 	})
 }
@@ -175,55 +205,65 @@ func dumpBucketsPour() {
 	if err != nil {
 		log.Fatalf("open: %s", err)
 	}
+
 	out, err := yaml.Marshal(leaky.BucketPourCache)
 	if err != nil {
 		log.Fatalf("marshal: %s", err)
 	}
+
 	b, err := fd.Write(out)
 	if err != nil {
 		log.Fatalf("write: %s", err)
 	}
+
 	log.Tracef("wrote %d bytes", b)
+
 	if err := fd.Close(); err != nil {
 		log.Fatalf(" close: %s", err)
 	}
 }
 
 func dumpParserState() {
-
 	fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "parser-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
 	if err != nil {
 		log.Fatalf("open: %s", err)
 	}
+
 	out, err := yaml.Marshal(parser.StageParseCache)
 	if err != nil {
 		log.Fatalf("marshal: %s", err)
 	}
+
 	b, err := fd.Write(out)
 	if err != nil {
 		log.Fatalf("write: %s", err)
 	}
+
 	log.Tracef("wrote %d bytes", b)
+
 	if err := fd.Close(); err != nil {
 		log.Fatalf(" close: %s", err)
 	}
 }
 
 func dumpOverflowState() {
-
 	fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "bucket-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
 	if err != nil {
 		log.Fatalf("open: %s", err)
 	}
+
 	out, err := yaml.Marshal(bucketOverflows)
 	if err != nil {
 		log.Fatalf("marshal: %s", err)
 	}
+
 	b, err := fd.Write(out)
 	if err != nil {
 		log.Fatalf("write: %s", err)
 	}
+
 	log.Tracef("wrote %d bytes", b)
+
 	if err := fd.Close(); err != nil {
 		log.Fatalf(" close: %s", err)
 	}

+ 92 - 0
cmd/crowdsec/lapiclient.go

@@ -0,0 +1,92 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"time"
+
+	"github.com/go-openapi/strfmt"
+
+	"github.com/crowdsecurity/go-cs-lib/version"
+
+	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
+	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
+	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
+	"github.com/crowdsecurity/crowdsec/pkg/models"
+)
+
+func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.Hub) (*apiclient.ApiClient, error) {
+	scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
+	if err != nil {
+		return nil, fmt.Errorf("loading list of installed hub scenarios: %w", err)
+	}
+
+	appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
+	if err != nil {
+		return nil, fmt.Errorf("loading list of installed hub appsec rules: %w", err)
+	}
+
+	installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules))
+	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...)
+	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...)
+
+	apiURL, err := url.Parse(credentials.URL)
+	if err != nil {
+		return nil, fmt.Errorf("parsing api url ('%s'): %w", credentials.URL, err)
+	}
+
+	papiURL, err := url.Parse(credentials.PapiURL)
+	if err != nil {
+		return nil, fmt.Errorf("parsing polling api url ('%s'): %w", credentials.PapiURL, err)
+	}
+
+	password := strfmt.Password(credentials.Password)
+
+	client, err := apiclient.NewClient(&apiclient.Config{
+		MachineID:     credentials.Login,
+		Password:      password,
+		Scenarios:     installedScenariosAndAppsecRules,
+		UserAgent:     fmt.Sprintf("crowdsec/%s", version.String()),
+		URL:           apiURL,
+		PapiURL:       papiURL,
+		VersionPrefix: "v1",
+		UpdateScenario: func() ([]string, error) {
+			scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
+			if err != nil {
+				return nil, err
+			}
+			appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
+			if err != nil {
+				return nil, err
+			}
+			ret := make([]string, 0, len(scenarios)+len(appsecRules))
+			ret = append(ret, scenarios...)
+			ret = append(ret, appsecRules...)
+
+			return ret, nil
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("new client api: %w", err)
+	}
+
+	authResp, _, err := client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
+		MachineID: &credentials.Login,
+		Password:  &password,
+		Scenarios: installedScenariosAndAppsecRules,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("authenticate watcher (%s): %w", credentials.Login, err)
+	}
+
+	var expiration time.Time
+	if err := expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
+		return nil, fmt.Errorf("unable to parse jwt expiration: %w", err)
+	}
+
+	client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
+	client.GetClient().Transport.(*apiclient.JWTTransport).Expiration = expiration
+
+	return client, nil
+}

+ 9 - 4
cmd/crowdsec/metrics.go

@@ -114,13 +114,17 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
 		}
 
 		decisionsFilters := make(map[string][]string, 0)
+
 		decisions, err := dbClient.QueryDecisionCountByScenario(decisionsFilters)
 		if err != nil {
 			log.Errorf("Error querying decisions for metrics: %v", err)
 			next.ServeHTTP(w, r)
+
 			return
 		}
+
 		globalActiveDecisions.Reset()
+
 		for _, d := range decisions {
 			globalActiveDecisions.With(prometheus.Labels{"reason": d.Scenario, "origin": d.Origin, "action": d.Type}).Set(float64(d.Count))
 		}
@@ -136,6 +140,7 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
 		if err != nil {
 			log.Errorf("Error querying alerts for metrics: %v", err)
 			next.ServeHTTP(w, r)
+
 			return
 		}
 
@@ -173,11 +178,12 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {
 			globalActiveDecisions, globalAlerts, parser.NodesWlHitsOk, parser.NodesWlHits,
 			cache.CacheMetrics, exprhelpers.RegexpCacheMetrics,
 		)
-
 	}
 }
 
-func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client, apiReady chan bool, agentReady chan bool) {
+func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client, agentReady chan bool) {
+	<-agentReady
+
 	if !config.Enabled {
 		return
 	}
@@ -185,9 +191,8 @@ func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client,
 	defer trace.CatchPanic("crowdsec/servePrometheus")
 
 	http.Handle("/metrics", computeDynamicMetrics(promhttp.Handler(), dbClient))
-	<-apiReady
-	<-agentReady
 	log.Debugf("serving metrics after %s ms", time.Since(crowdsecT0))
+
 	if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil {
 		log.Warningf("prometheus: %s", err)
 	}

+ 18 - 87
cmd/crowdsec/output.go

@@ -3,18 +3,12 @@ package main
 import (
 	"context"
 	"fmt"
-	"net/url"
 	"sync"
 	"time"
 
-	"github.com/go-openapi/strfmt"
 	log "github.com/sirupsen/logrus"
 
-	"github.com/crowdsecurity/go-cs-lib/version"
-
 	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
-	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
-	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 	"github.com/crowdsecurity/crowdsec/pkg/parser"
@@ -22,7 +16,6 @@ import (
 )
 
 func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
-
 	var dedupCache []*models.Alert
 
 	for idx, alert := range alerts {
@@ -32,16 +25,21 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
 			dedupCache = append(dedupCache, alert.Alert)
 			continue
 		}
+
 		for k, src := range alert.Sources {
 			refsrc := *alert.Alert //copy
+
 			log.Tracef("source[%s]", k)
+
 			refsrc.Source = &src
 			dedupCache = append(dedupCache, &refsrc)
 		}
 	}
+
 	if len(dedupCache) != len(alerts) {
 		log.Tracef("went from %d to %d alerts", len(alerts), len(dedupCache))
 	}
+
 	return dedupCache, nil
 }
 
@@ -52,93 +50,25 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
 	if err != nil {
 		return fmt.Errorf("failed to transform alerts for api: %w", err)
 	}
+
 	_, _, err = client.Alerts.Add(ctx, alertsToPush)
 	if err != nil {
 		return fmt.Errorf("failed sending alert to LAPI: %w", err)
 	}
+
 	return nil
 }
 
 var bucketOverflows []types.Event
 
-func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets,
-	postOverflowCTX parser.UnixParserCtx, postOverflowNodes []parser.Node,
-	apiConfig csconfig.ApiCredentialsCfg, hub *cwhub.Hub) error {
+func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx,
+	postOverflowNodes []parser.Node, client *apiclient.ApiClient) error {
+	var (
+		cache      []types.RuntimeAlert
+		cacheMutex sync.Mutex
+	)
 
-	var err error
 	ticker := time.NewTicker(1 * time.Second)
-
-	var cache []types.RuntimeAlert
-	var cacheMutex sync.Mutex
-
-	scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
-	if err != nil {
-		return fmt.Errorf("loading list of installed hub scenarios: %w", err)
-	}
-
-	appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
-	if err != nil {
-		return fmt.Errorf("loading list of installed hub appsec rules: %w", err)
-	}
-
-	installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules))
-	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...)
-	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...)
-
-	apiURL, err := url.Parse(apiConfig.URL)
-	if err != nil {
-		return fmt.Errorf("parsing api url ('%s'): %w", apiConfig.URL, err)
-	}
-	papiURL, err := url.Parse(apiConfig.PapiURL)
-	if err != nil {
-		return fmt.Errorf("parsing polling api url ('%s'): %w", apiConfig.PapiURL, err)
-	}
-	password := strfmt.Password(apiConfig.Password)
-
-	Client, err := apiclient.NewClient(&apiclient.Config{
-		MachineID:     apiConfig.Login,
-		Password:      password,
-		Scenarios:     installedScenariosAndAppsecRules,
-		UserAgent:     fmt.Sprintf("crowdsec/%s", version.String()),
-		URL:           apiURL,
-		PapiURL:       papiURL,
-		VersionPrefix: "v1",
-		UpdateScenario: func() ([]string, error) {
-			scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
-			if err != nil {
-				return nil, err
-			}
-			appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
-			if err != nil {
-				return nil, err
-			}
-			ret := make([]string, 0, len(scenarios)+len(appsecRules))
-			ret = append(ret, scenarios...)
-			ret = append(ret, appsecRules...)
-			return ret, nil
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("new client api: %w", err)
-	}
-	authResp, _, err := Client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
-		MachineID: &apiConfig.Login,
-		Password:  &password,
-		Scenarios: installedScenariosAndAppsecRules,
-	})
-	if err != nil {
-		return fmt.Errorf("authenticate watcher (%s): %w", apiConfig.Login, err)
-	}
-
-	if err := Client.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
-		return fmt.Errorf("unable to parse jwt expiration: %w", err)
-	}
-
-	Client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
-
-	//start the heartbeat service
-	log.Debugf("Starting HeartBeat service")
-	Client.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb)
 LOOP:
 	for {
 		select {
@@ -149,7 +79,7 @@ LOOP:
 				newcache := make([]types.RuntimeAlert, 0)
 				cache = newcache
 				cacheMutex.Unlock()
-				if err := PushAlerts(cachecopy, Client); err != nil {
+				if err := PushAlerts(cachecopy, client); err != nil {
 					log.Errorf("while pushing to api : %s", err)
 					//just push back the events to the queue
 					cacheMutex.Lock()
@@ -162,10 +92,11 @@ LOOP:
 				cacheMutex.Lock()
 				cachecopy := cache
 				cacheMutex.Unlock()
-				if err := PushAlerts(cachecopy, Client); err != nil {
+				if err := PushAlerts(cachecopy, client); err != nil {
 					log.Errorf("while pushing leftovers to api : %s", err)
 				}
 			}
+
 			break LOOP
 		case event := <-overflow:
 			/*if alert is empty and mapKey is present, the overflow is just to cleanup bucket*/
@@ -176,7 +107,7 @@ LOOP:
 			/* process post overflow parser nodes */
 			event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
 			if err != nil {
-				return fmt.Errorf("postoverflow failed : %s", err)
+				return fmt.Errorf("postoverflow failed: %w", err)
 			}
 			log.Printf("%s", *event.Overflow.Alert.Message)
 			//if the Alert is nil, it's to signal bucket is ready for GC, don't track this
@@ -206,6 +137,6 @@ LOOP:
 	}
 
 	ticker.Stop()
-	return nil
 
+	return nil
 }

+ 8 - 4
cmd/crowdsec/run_in_svc.go

@@ -33,7 +33,6 @@ func StartRunSvc() error {
 
 	log.Infof("Crowdsec %s", version.String())
 
-	apiReady := make(chan bool, 1)
 	agentReady := make(chan bool, 1)
 
 	// Enable profiling early
@@ -46,14 +45,19 @@ func StartRunSvc() error {
 			dbClient, err = database.NewClient(cConfig.DbConfig)
 
 			if err != nil {
-				return fmt.Errorf("unable to create database client: %s", err)
+				return fmt.Errorf("unable to create database client: %w", err)
 			}
 		}
 
 		registerPrometheus(cConfig.Prometheus)
 
-		go servePrometheus(cConfig.Prometheus, dbClient, apiReady, agentReady)
+		go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
+	} else {
+		// avoid leaking the channel
+		go func() {
+			<-agentReady
+		}()
 	}
 
-	return Serve(cConfig, apiReady, agentReady)
+	return Serve(cConfig, agentReady)
 }

+ 3 - 4
cmd/crowdsec/run_in_svc_windows.go

@@ -73,7 +73,6 @@ func WindowsRun() error {
 
 	log.Infof("Crowdsec %s", version.String())
 
-	apiReady := make(chan bool, 1)
 	agentReady := make(chan bool, 1)
 
 	// Enable profiling early
@@ -85,11 +84,11 @@ func WindowsRun() error {
 			dbClient, err = database.NewClient(cConfig.DbConfig)
 
 			if err != nil {
-				return fmt.Errorf("unable to create database client: %s", err)
+				return fmt.Errorf("unable to create database client: %w", err)
 			}
 		}
 		registerPrometheus(cConfig.Prometheus)
-		go servePrometheus(cConfig.Prometheus, dbClient, apiReady, agentReady)
+		go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
 	}
-	return Serve(cConfig, apiReady, agentReady)
+	return Serve(cConfig, agentReady)
 }

+ 21 - 6
cmd/crowdsec/serve.go

@@ -42,7 +42,9 @@ func debugHandler(sig os.Signal, cConfig *csconfig.Config) error {
 	if err := leaky.ShutdownAllBuckets(buckets); err != nil {
 		log.Warningf("Failed to shut down routines : %s", err)
 	}
+
 	log.Printf("Shutdown is finished, buckets are in %s", tmpFile)
+
 	return nil
 }
 
@@ -66,15 +68,16 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
 	if !cConfig.DisableAPI {
 		if flags.DisableCAPI {
 			log.Warningf("Communication with CrowdSec Central API disabled from args")
+
 			cConfig.API.Server.OnlineClient = nil
 		}
+
 		apiServer, err := initAPIServer(cConfig)
 		if err != nil {
 			return nil, fmt.Errorf("unable to init api server: %w", err)
 		}
 
-		apiReady := make(chan bool, 1)
-		serveAPIServer(apiServer, apiReady)
+		serveAPIServer(apiServer)
 	}
 
 	if !cConfig.DisableAgent {
@@ -110,6 +113,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
 			log.Warningf("Failed to delete temp file (%s) : %s", tmpFile, err)
 		}
 	}
+
 	return cConfig, nil
 }
 
@@ -117,10 +121,12 @@ func ShutdownCrowdsecRoutines() error {
 	var reterr error
 
 	log.Debugf("Shutting down crowdsec sub-routines")
+
 	if len(dataSources) > 0 {
 		acquisTomb.Kill(nil)
 		log.Debugf("waiting for acquisition to finish")
 		drainChan(inputLineChan)
+
 		if err := acquisTomb.Wait(); err != nil {
 			log.Warningf("Acquisition returned error : %s", err)
 			reterr = err
@@ -130,6 +136,7 @@ func ShutdownCrowdsecRoutines() error {
 	log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
 	parsersTomb.Kill(nil)
 	drainChan(inputEventChan)
+
 	if err := parsersTomb.Wait(); err != nil {
 		log.Warningf("Parsers returned error : %s", err)
 		reterr = err
@@ -160,6 +167,7 @@ func ShutdownCrowdsecRoutines() error {
 			log.Warningf("Outputs returned error : %s", err)
 			reterr = err
 		}
+
 		log.Debugf("outputs are done")
 	case <-time.After(3 * time.Second):
 		// this can happen if outputs are stuck in a http retry loop
@@ -181,6 +189,7 @@ func shutdownAPI() error {
 	}
 
 	log.Debugf("done")
+
 	return nil
 }
 
@@ -193,6 +202,7 @@ func shutdownCrowdsec() error {
 	}
 
 	log.Debugf("done")
+
 	return nil
 }
 
@@ -292,10 +302,11 @@ func HandleSignals(cConfig *csconfig.Config) error {
 	if err == nil {
 		log.Warning("Crowdsec service shutting down")
 	}
+
 	return err
 }
 
-func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) error {
+func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
 	acquisTomb = tomb.Tomb{}
 	parsersTomb = tomb.Tomb{}
 	bucketsTomb = tomb.Tomb{}
@@ -325,6 +336,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 
 	if cConfig.API.CTI != nil && *cConfig.API.CTI.Enabled {
 		log.Infof("Crowdsec CTI helper enabled")
+
 		if err := exprhelpers.InitCrowdsecCTI(cConfig.API.CTI.Key, cConfig.API.CTI.CacheTimeout, cConfig.API.CTI.CacheSize, cConfig.API.CTI.LogLevel); err != nil {
 			return fmt.Errorf("failed to init crowdsec cti: %w", err)
 		}
@@ -337,6 +349,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 
 		if flags.DisableCAPI {
 			log.Warningf("Communication with CrowdSec Central API disabled from args")
+
 			cConfig.API.Server.OnlineClient = nil
 		}
 
@@ -346,10 +359,8 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 		}
 
 		if !flags.TestMode {
-			serveAPIServer(apiServer, apiReady)
+			serveAPIServer(apiServer)
 		}
-	} else {
-		apiReady <- true
 	}
 
 	if !cConfig.DisableAgent {
@@ -366,6 +377,8 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 		// if it's just linting, we're done
 		if !flags.TestMode {
 			serveCrowdsec(csParsers, cConfig, hub, agentReady)
+		} else {
+			agentReady <- true
 		}
 	} else {
 		agentReady <- true
@@ -395,6 +408,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 
 	for _, ch := range waitChans {
 		<-ch
+
 		switch ch {
 		case apiTomb.Dead():
 			log.Infof("api shutdown")
@@ -402,5 +416,6 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e
 			log.Infof("crowdsec shutdown")
 		}
 	}
+
 	return nil
 }

+ 3 - 0
test/bats/01_crowdsec.bats

@@ -75,6 +75,9 @@ teardown() {
 
     rune -0 ./instance-crowdsec start-pid
     PID="$output"
+
+    sleep .5
+
     assert_file_exists "$log_old"
     assert_file_contains "$log_old" "Starting processing data"
 

+ 17 - 4
test/bats/40_live-ban.bats

@@ -41,10 +41,23 @@ teardown() {
     echo -e "---\nfilename: ${tmpfile}\nlabels:\n  type: syslog\n" >>"${ACQUIS_YAML}"
 
     ./instance-crowdsec start
+
+    sleep 0.2
+
     fake_log >>"${tmpfile}"
-    sleep 2
+
+    sleep 0.2
+
     rm -f -- "${tmpfile}"
-    rune -0 cscli decisions list -o json
-    rune -0 jq -r '.[].decisions[0].value' <(output)
-    assert_output '1.1.1.172'
+
+    found=0
+    # this may take some time in CI
+    for _ in $(seq 1 10); do
+        if cscli decisions list -o json | jq -r '.[].decisions[0].value' | grep -q '1.1.1.172'; then
+            found=1
+            break
+        fi
+        sleep 0.2
+    done
+    assert_equal 1 "${found}"
 }