Pārlūkot izejas kodu

log processor: share apiclient in output goroutines

marco 1 gadu atpakaļ
vecāks
revīzija
5ec595805d
3 mainītis faili ar 102 papildinājumiem un 80 dzēšanām
  1. 10 1
      cmd/crowdsec/crowdsec.go
  2. 89 0
      cmd/crowdsec/lapiclient.go
  3. 3 79
      cmd/crowdsec/output.go

+ 10 - 1
cmd/crowdsec/crowdsec.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -99,13 +100,21 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
 	})
 	bucketWg.Wait()
 
+	apiClient, err := AuthenticatedLAPIClient(*cConfig.API.Client.Credentials, hub)
+	if err != nil {
+		return err
+	}
+
+	log.Debugf("Starting HeartBeat service")
+	apiClient.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb)
+
 	outputWg := &sync.WaitGroup{}
 	outputsTomb.Go(func() error {
 		outputWg.Add(1)
 		for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ {
 			outputsTomb.Go(func() error {
 				defer trace.CatchPanic("crowdsec/runOutput")
-				if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, *cConfig.API.Client.Credentials, hub); err != nil {
+				if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, apiClient); err != nil {
 					log.Fatalf("starting outputs error : %s", err)
 					return err
 				}

+ 89 - 0
cmd/crowdsec/lapiclient.go

@@ -0,0 +1,89 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"time"
+
+	"github.com/go-openapi/strfmt"
+
+	"github.com/crowdsecurity/go-cs-lib/version"
+
+	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
+	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
+	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
+	"github.com/crowdsecurity/crowdsec/pkg/models"
+)
+
+func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.Hub) (*apiclient.ApiClient, error) {
+	scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
+	if err != nil {
+		return nil, fmt.Errorf("loading list of installed hub scenarios: %w", err)
+	}
+
+	appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
+	if err != nil {
+		return nil, fmt.Errorf("loading list of installed hub appsec rules: %w", err)
+	}
+
+	installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules))
+	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...)
+	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...)
+
+	apiURL, err := url.Parse(credentials.URL)
+	if err != nil {
+		return nil, fmt.Errorf("parsing api url ('%s'): %w", credentials.URL, err)
+	}
+	papiURL, err := url.Parse(credentials.PapiURL)
+	if err != nil {
+		return nil, fmt.Errorf("parsing polling api url ('%s'): %w", credentials.PapiURL, err)
+	}
+	password := strfmt.Password(credentials.Password)
+
+	client, err := apiclient.NewClient(&apiclient.Config{
+		MachineID:     credentials.Login,
+		Password:      password,
+		Scenarios:     installedScenariosAndAppsecRules,
+		UserAgent:     fmt.Sprintf("crowdsec/%s", version.String()),
+		URL:           apiURL,
+		PapiURL:       papiURL,
+		VersionPrefix: "v1",
+		UpdateScenario: func() ([]string, error) {
+			scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
+			if err != nil {
+				return nil, err
+			}
+			appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
+			if err != nil {
+				return nil, err
+			}
+			ret := make([]string, 0, len(scenarios)+len(appsecRules))
+			ret = append(ret, scenarios...)
+			ret = append(ret, appsecRules...)
+			return ret, nil
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("new client api: %w", err)
+	}
+
+	authResp, _, err := client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
+		MachineID: &credentials.Login,
+		Password:  &password,
+		Scenarios: installedScenariosAndAppsecRules,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("authenticate watcher (%s): %w", credentials.Login, err)
+	}
+
+	var expiration time.Time
+	if err := expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
+		return nil, fmt.Errorf("unable to parse jwt expiration: %w", err)
+	}
+
+	client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
+	client.GetClient().Transport.(*apiclient.JWTTransport).Expiration = expiration
+
+	return client, nil
+}

+ 3 - 79
cmd/crowdsec/output.go

@@ -3,18 +3,12 @@ package main
 import (
 	"context"
 	"fmt"
-	"net/url"
 	"sync"
 	"time"
 
-	"github.com/go-openapi/strfmt"
 	log "github.com/sirupsen/logrus"
 
-	"github.com/crowdsecurity/go-cs-lib/version"
-
 	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
-	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
-	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 	"github.com/crowdsecurity/crowdsec/pkg/parser"
@@ -63,82 +57,12 @@ var bucketOverflows []types.Event
 
 func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets,
 	postOverflowCTX parser.UnixParserCtx, postOverflowNodes []parser.Node,
-	apiConfig csconfig.ApiCredentialsCfg, hub *cwhub.Hub) error {
+	client *apiclient.ApiClient) error {
 
-	var err error
 	ticker := time.NewTicker(1 * time.Second)
 
 	var cache []types.RuntimeAlert
 	var cacheMutex sync.Mutex
-
-	scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
-	if err != nil {
-		return fmt.Errorf("loading list of installed hub scenarios: %w", err)
-	}
-
-	appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
-	if err != nil {
-		return fmt.Errorf("loading list of installed hub appsec rules: %w", err)
-	}
-
-	installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules))
-	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...)
-	installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...)
-
-	apiURL, err := url.Parse(apiConfig.URL)
-	if err != nil {
-		return fmt.Errorf("parsing api url ('%s'): %w", apiConfig.URL, err)
-	}
-	papiURL, err := url.Parse(apiConfig.PapiURL)
-	if err != nil {
-		return fmt.Errorf("parsing polling api url ('%s'): %w", apiConfig.PapiURL, err)
-	}
-	password := strfmt.Password(apiConfig.Password)
-
-	Client, err := apiclient.NewClient(&apiclient.Config{
-		MachineID:     apiConfig.Login,
-		Password:      password,
-		Scenarios:     installedScenariosAndAppsecRules,
-		UserAgent:     fmt.Sprintf("crowdsec/%s", version.String()),
-		URL:           apiURL,
-		PapiURL:       papiURL,
-		VersionPrefix: "v1",
-		UpdateScenario: func() ([]string, error) {
-			scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS)
-			if err != nil {
-				return nil, err
-			}
-			appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES)
-			if err != nil {
-				return nil, err
-			}
-			ret := make([]string, 0, len(scenarios)+len(appsecRules))
-			ret = append(ret, scenarios...)
-			ret = append(ret, appsecRules...)
-			return ret, nil
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("new client api: %w", err)
-	}
-	authResp, _, err := Client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
-		MachineID: &apiConfig.Login,
-		Password:  &password,
-		Scenarios: installedScenariosAndAppsecRules,
-	})
-	if err != nil {
-		return fmt.Errorf("authenticate watcher (%s): %w", apiConfig.Login, err)
-	}
-
-	if err := Client.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
-		return fmt.Errorf("unable to parse jwt expiration: %w", err)
-	}
-
-	Client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token
-
-	//start the heartbeat service
-	log.Debugf("Starting HeartBeat service")
-	Client.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb)
 LOOP:
 	for {
 		select {
@@ -149,7 +73,7 @@ LOOP:
 				newcache := make([]types.RuntimeAlert, 0)
 				cache = newcache
 				cacheMutex.Unlock()
-				if err := PushAlerts(cachecopy, Client); err != nil {
+				if err := PushAlerts(cachecopy, client); err != nil {
 					log.Errorf("while pushing to api : %s", err)
 					//just push back the events to the queue
 					cacheMutex.Lock()
@@ -162,7 +86,7 @@ LOOP:
 				cacheMutex.Lock()
 				cachecopy := cache
 				cacheMutex.Unlock()
-				if err := PushAlerts(cachecopy, Client); err != nil {
+				if err := PushAlerts(cachecopy, client); err != nil {
 					log.Errorf("while pushing leftovers to api : %s", err)
 				}
 			}