Преглед изворни кода

randomize pull, push and metric intervals; reload crowdsec only when hub changed (#1846)

mmetc пре 2 година
родитељ
комит
df88f4e1e9
5 измењених фајлова са 99 додато и 49 уклоњено
  1. 9 2
      config/crowdsec.cron.daily
  2. 66 42
      pkg/apiserver/apic.go
  3. 10 5
      pkg/apiserver/apic_test.go
  4. 11 0
      pkg/cwhub/download.go
  5. 3 0
      pkg/cwhub/helpers.go

+ 9 - 2
config/crowdsec.cron.daily

@@ -1,5 +1,12 @@
 #!/bin/sh
+
 test -x /usr/bin/cscli || exit 0
-/usr/bin/cscli --error hub update && /usr/bin/cscli --error hub upgrade
-systemctl reload crowdsec
+
+/usr/bin/cscli --error hub update
+
+upgraded=$(/usr/bin/cscli --error hub upgrade)
+if [ -n "$upgraded" ]; then
+    systemctl reload crowdsec
+fi
+
 exit 0

+ 66 - 42
pkg/apiserver/apic.go

@@ -3,12 +3,18 @@ package apiserver
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"net/url"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/go-openapi/strfmt"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
+
 	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
@@ -17,17 +23,15 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
-	"github.com/go-openapi/strfmt"
-	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
-
-	"gopkg.in/tomb.v2"
 )
 
 var (
-	PullInterval    = time.Hour * 2
-	PushInterval    = time.Second * 30
-	MetricsInterval = time.Minute * 30
+	pullIntervalDefault    = time.Hour * 2
+	pullIntervalDelta      = 5 * time.Minute
+	pushIntervalDefault    = time.Second * 30
+	pushIntervalDelta      = time.Second * 15
+	metricsIntervalDefault = time.Minute * 30
+	metricsIntervalDelta   = time.Minute * 15
 )
 
 var SCOPE_CAPI string = "CAPI"
@@ -35,20 +39,30 @@ var SCOPE_CAPI_ALIAS string = "crowdsecurity/community-blocklist" //we don't use
 var SCOPE_LISTS string = "lists"
 
 type apic struct {
-	pullInterval    time.Duration
-	pushInterval    time.Duration
-	metricsInterval time.Duration
-	dbClient        *database.Client
-	apiClient       *apiclient.ApiClient
-	alertToPush     chan []*models.Alert
-	mu              sync.Mutex
-	pushTomb        tomb.Tomb
-	pullTomb        tomb.Tomb
-	metricsTomb     tomb.Tomb
-	startup         bool
-	credentials     *csconfig.ApiCredentialsCfg
-	scenarioList    []string
-	consoleConfig   *csconfig.ConsoleConfig
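+	// The *First fields hold the delay before the first tick of each loop;
+	// the plain fields are used for every tick after it.
+	// When changing the intervals in tests, always set *First too,
+	// or they can be negative.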
+	pullInterval         time.Duration
+	pullIntervalFirst    time.Duration
+	pushInterval         time.Duration
+	pushIntervalFirst    time.Duration
+	metricsInterval      time.Duration
+	metricsIntervalFirst time.Duration
+	dbClient             *database.Client
+	apiClient            *apiclient.ApiClient
+	alertToPush          chan []*models.Alert
+	mu                   sync.Mutex
+	pushTomb             tomb.Tomb
+	pullTomb             tomb.Tomb
+	metricsTomb          tomb.Tomb
+	startup              bool
+	credentials          *csconfig.ApiCredentialsCfg
+	scenarioList         []string
+	consoleConfig        *csconfig.ConsoleConfig
+}
+
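+// randomDuration returns a uniformly distributed random duration in the
+// half-open range [d-delta, d+delta). The result is only guaranteed to be
+// positive when delta < d; callers pass it to time.NewTicker, which panics
+// on a non-positive duration.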
+func randomDuration(d time.Duration, delta time.Duration) time.Duration {
+	return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
 }
 
 func (a *apic) FetchScenariosListFromDB() ([]string, error) {
@@ -89,19 +103,22 @@ func alertToSignal(alert *models.Alert, scenarioTrust string) *models.AddSignals
 func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig) (*apic, error) {
 	var err error
 	ret := &apic{
-		alertToPush:     make(chan []*models.Alert),
-		dbClient:        dbClient,
-		mu:              sync.Mutex{},
-		startup:         true,
-		credentials:     config.Credentials,
-		pullTomb:        tomb.Tomb{},
-		pushTomb:        tomb.Tomb{},
-		metricsTomb:     tomb.Tomb{},
-		scenarioList:    make([]string, 0),
-		consoleConfig:   consoleConfig,
-		pullInterval:    PullInterval,
-		pushInterval:    PushInterval,
-		metricsInterval: MetricsInterval,
+		alertToPush:          make(chan []*models.Alert),
+		dbClient:             dbClient,
+		mu:                   sync.Mutex{},
+		startup:              true,
+		credentials:          config.Credentials,
+		pullTomb:             tomb.Tomb{},
+		pushTomb:             tomb.Tomb{},
+		metricsTomb:          tomb.Tomb{},
+		scenarioList:         make([]string, 0),
+		consoleConfig:        consoleConfig,
+		pullInterval:         pullIntervalDefault,
+		pullIntervalFirst:    randomDuration(pullIntervalDefault, pullIntervalDelta),
+		pushInterval:         pushIntervalDefault,
+		pushIntervalFirst:    randomDuration(pushIntervalDefault, pushIntervalDelta),
+		metricsInterval:      metricsIntervalDefault,
+		metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta),
 	}
 
 	password := strfmt.Password(config.Credentials.Password)
@@ -130,8 +147,9 @@ func (a *apic) Push() error {
 	defer types.CatchPanic("lapi/pushToAPIC")
 
 	var cache models.AddSignalsRequest
-	ticker := time.NewTicker(a.pushInterval)
-	log.Infof("Start push to CrowdSec Central API (interval: %s)", PushInterval)
+	ticker := time.NewTicker(a.pushIntervalFirst)
+
+	log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)
 
 	for {
 		select {
@@ -145,6 +163,7 @@ func (a *apic) Push() error {
 			go a.Send(&cache)
 			return nil
 		case <-ticker.C:
+			ticker.Reset(a.pushInterval)
 			if len(cache) > 0 {
 				a.mu.Lock()
 				cacheCopy := cache
@@ -469,7 +488,6 @@ func setAlertScenario(add_counters map[string]map[string]int, delete_counters ma
 
 func (a *apic) Pull() error {
 	defer types.CatchPanic("lapi/pullFromAPIC")
-	log.Infof("Start pull from CrowdSec Central API (interval: %s)", PullInterval)
 
 	toldOnce := false
 	for {
@@ -489,10 +507,14 @@ func (a *apic) Pull() error {
 	if err := a.PullTop(); err != nil {
 		log.Errorf("capi pull top: %s", err)
 	}
-	ticker := time.NewTicker(a.pullInterval)
+
+	log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
+	ticker := time.NewTicker(a.pullIntervalFirst)
+
 	for {
 		select {
 		case <-ticker.C:
+			ticker.Reset(a.pullInterval)
 			if err := a.PullTop(); err != nil {
 				log.Errorf("capi pull top: %s", err)
 				continue
@@ -550,8 +572,10 @@ func (a *apic) GetMetrics() (*models.Metrics, error) {
 func (a *apic) SendMetrics(stop chan (bool)) {
 	defer types.CatchPanic("lapi/metricsToAPIC")
 
-	log.Infof("Start send metrics to CrowdSec Central API (interval: %s)", a.metricsInterval)
-	ticker := time.NewTicker(a.metricsInterval)
+	ticker := time.NewTicker(a.metricsIntervalFirst)
+
+	log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
+
 	for {
 		metrics, err := a.GetMetrics()
 		if err != nil {
@@ -568,7 +592,7 @@ func (a *apic) SendMetrics(stop chan (bool)) {
 		case <-stop:
 			return
 		case <-ticker.C:
-			continue
+			ticker.Reset(a.metricsInterval)
 		case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
 			a.pullTomb.Kill(nil)
 			a.pushTomb.Kill(nil)

+ 10 - 5
pkg/apiserver/apic_test.go

@@ -12,6 +12,12 @@ import (
 	"testing"
 	"time"
 
+	"github.com/jarcoal/httpmock"
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gopkg.in/tomb.v2"
+
 	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cstest"
@@ -21,11 +27,6 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
 	"github.com/crowdsecurity/crowdsec/pkg/models"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
-	"github.com/jarcoal/httpmock"
-	"github.com/sirupsen/logrus"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"gopkg.in/tomb.v2"
 )
 
 func getDBClient(t *testing.T) *database.Client {
@@ -669,6 +670,7 @@ func TestAPICPush(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			api := getAPIC(t)
 			api.pushInterval = time.Millisecond
+			api.pushIntervalFirst = time.Millisecond
 			url, err := url.ParseRequestURI("http://api.crowdsec.net/")
 			require.NoError(t, err)
 
@@ -759,8 +761,10 @@ func TestAPICSendMetrics(t *testing.T) {
 
 			api := getAPIC(t)
 			api.pushInterval = time.Millisecond
+			api.pushIntervalFirst = time.Millisecond
 			api.apiClient = apiClient
 			api.metricsInterval = tc.metricsInterval
+			api.metricsIntervalFirst = tc.metricsInterval
 			tc.setUp(api)
 
 			stop := make(chan bool)
@@ -810,6 +814,7 @@ func TestAPICPull(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			api = getAPIC(t)
 			api.pullInterval = time.Millisecond
+			api.pullIntervalFirst = time.Millisecond
 			url, err := url.ParseRequestURI("http://api.crowdsec.net/")
 			require.NoError(t, err)
 			httpmock.Activate()

+ 11 - 0
pkg/cwhub/download.go

@@ -55,6 +55,17 @@ func DownloadHubIdx(hub *csconfig.Hub) ([]byte, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to read request answer for hub index")
 	}
+
+	oldContent, err := os.ReadFile(hub.HubIndexFile)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			log.Warningf("failed to read hub index: %s", err)
+		}
+	} else if bytes.Equal(body, oldContent) {
+		log.Info("hub index is up to date")
+		// write it anyway, can't hurt
+	}
+
 	file, err := os.OpenFile(hub.HubIndexFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
 
 	if err != nil {

+ 3 - 0
pkg/cwhub/helpers.go

@@ -197,6 +197,9 @@ func UpgradeConfig(csConfig *csconfig.Config, itemType string, name string, forc
 				log.Infof("%v %s is local", emoji.Prohibited, v.Name)
 			}
 		} else {
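+			// Printed on stdout (in addition to the log line below) so that scripts
+			// can tell that something was upgraded and a configuration reload is
+			// required: the daily cron job reloads crowdsec only when the output
+			// of "cscli hub upgrade" is non-empty.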
+			fmt.Printf("updated %s\n", v.Name)
 			log.Infof("%v %s : updated", emoji.Package, v.Name)
 			updated++
 		}