Browse Source

adding shutdown and tombs for proper shutdown

Thibault bui Koechlin 5 years ago
parent
commit
94d80c97ae
6 changed files with 61 additions and 4 deletions
  1. 4 1
      cmd/crowdsec/output.go
  2. 3 1
      cmd/crowdsec/serve.go
  3. 19 1
      pkg/cwapi/auth.go
  4. 3 0
      pkg/cwapi/signals.go
  5. 13 1
      pkg/cwplugin/backend.go
  6. 19 0
      pkg/outputs/ouputs.go

+ 4 - 1
cmd/crowdsec/output.go

@@ -22,8 +22,11 @@ LOOP:
 	for {
 	for {
 		select {
 		select {
 		case <-bucketsTomb.Dying():
 		case <-bucketsTomb.Dying():
-			log.Infof("Exiting output processing")
+			log.Infof("Flushing outputs")
 			output.FlushAll()
 			output.FlushAll()
+			log.Infof("Shuting down output routines")
+			output.Shutdown()
+			log.Infof("Done shutdown down output")
 			break LOOP
 			break LOOP
 		case event := <-overflow:
 		case event := <-overflow:
 			if cConfig.Profiling {
 			if cConfig.Profiling {

+ 3 - 1
cmd/crowdsec/serve.go

@@ -19,10 +19,11 @@ func reloadHandler(sig os.Signal) error {
 	if err := ShutdownRoutines(); err != nil {
 	if err := ShutdownRoutines(); err != nil {
 		log.Errorf("Failed to shut down routines: %s", err)
 		log.Errorf("Failed to shut down routines: %s", err)
 	}
 	}
-	//dump buckets state
+	//todo : properly stop acquis with the tail readers
 	if err := leaky.DumpBucketsStateAt("buckets_state.json", time.Now(), buckets); err != nil {
 	if err := leaky.DumpBucketsStateAt("buckets_state.json", time.Now(), buckets); err != nil {
 		log.Fatalf("Failed dumping bucket state : %s", err)
 		log.Fatalf("Failed dumping bucket state : %s", err)
 	}
 	}
+
 	//reload configurations
 	//reload configurations
 
 
 	//restore bucket state
 	//restore bucket state
@@ -62,6 +63,7 @@ func ShutdownRoutines() error {
 		reterr = err
 		reterr = err
 
 
 	}
 	}
+	log.Infof("outputs are done")
 	return reterr
 	return reterr
 }
 }
 
 

+ 19 - 1
pkg/cwapi/auth.go

@@ -15,6 +15,7 @@ import (
 	"gopkg.in/yaml.v2"
 	"gopkg.in/yaml.v2"
 
 
 	"github.com/dghubble/sling"
 	"github.com/dghubble/sling"
+	"gopkg.in/tomb.v2"
 )
 )
 
 
 type ApiCtx struct {
 type ApiCtx struct {
@@ -37,6 +38,7 @@ type ApiCtx struct {
 	tokenExpired bool          `yaml:"-"`
 	tokenExpired bool          `yaml:"-"`
 	toPush       []types.Event `yaml:"-"`
 	toPush       []types.Event `yaml:"-"`
 	Http         *sling.Sling  `yaml:"-"`
 	Http         *sling.Sling  `yaml:"-"`
+	PusherTomb   tomb.Tomb
 }
 }
 
 
 type ApiCreds struct {
 type ApiCreds struct {
@@ -113,7 +115,23 @@ func (ctx *ApiCtx) Init(cfg string, profile string) error {
 		return err
 		return err
 	}
 	}
 	//start the background go-routine
 	//start the background go-routine
-	go ctx.pushLoop() //nolint:errcheck // runs into the background, we can't check error with chan or such
+	ctx.PusherTomb.Go(func() error {
+		err := ctx.pushLoop()
+		if err != nil {
+			log.Errorf("api push error : %s", err)
+			return err
+		}
+		return nil
+	})
+	return nil
+}
+
+func (ctx *ApiCtx) Shutdown() error {
+	ctx.PusherTomb.Kill(nil)
+	log.Infof("Waiting for API routine to finish")
+	if err := ctx.PusherTomb.Wait(); err != nil {
+		return fmt.Errorf("API routine returned error : %s", err)
+	}
 	return nil
 	return nil
 }
 }
 
 

+ 3 - 0
pkg/cwapi/signals.go

@@ -105,6 +105,9 @@ func (ctx *ApiCtx) pushLoop() error {
 			if err != nil {
 			if err != nil {
 				log.Errorf("api push loop: %s", err.Error())
 				log.Errorf("api push loop: %s", err.Error())
 			}
 			}
+		case <-ctx.PusherTomb.Dying(): //we are being killed by main
+			log.Infof("Killing api routine")
+			return nil
 		}
 		}
 	}
 	}
 
 

+ 13 - 1
pkg/cwplugin/backend.go

@@ -20,6 +20,7 @@ type Backend interface {
 	Delete(string) (int, error)
 	Delete(string) (int, error)
 	Init(map[string]string) error
 	Init(map[string]string) error
 	Flush() error
 	Flush() error
+	Shutdown() error
 	DeleteAll() error
 	DeleteAll() error
 }
 }
 
 
@@ -82,7 +83,7 @@ func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) {
 		plugNew := symNew()
 		plugNew := symNew()
 		bInterface, ok := plugNew.(Backend)
 		bInterface, ok := plugNew.(Backend)
 		if !ok {
 		if !ok {
-			return nil, fmt.Errorf("unexpected '%s' type, skipping", newPlugin.Name)
+			return nil, fmt.Errorf("unexpected '%s' type (%T), skipping", newPlugin.Name, plugNew)
 		}
 		}
 
 
 		// Add the interface and Init()
 		// Add the interface and Init()
@@ -120,6 +121,17 @@ func (b *BackendManager) Delete(target string) (int, error) {
 	return nbDel, nil
 	return nbDel, nil
 }
 }
 
 
+func (b *BackendManager) Shutdown() error {
+	var err error
+	for _, plugin := range b.backendPlugins {
+		err = plugin.funcs.Shutdown()
+		if err != nil {
+			return fmt.Errorf("failed to shutdown : %s", err)
+		}
+	}
+	return nil
+}
+
 func (b *BackendManager) DeleteAll() error {
 func (b *BackendManager) DeleteAll() error {
 	var err error
 	var err error
 	for _, plugin := range b.backendPlugins {
 	for _, plugin := range b.backendPlugins {

+ 19 - 0
pkg/outputs/ouputs.go

@@ -81,6 +81,25 @@ func OvflwToOrder(sig types.SignalOccurence, prof types.Profile) (*types.BanOrde
 	return &ordr, nil, warn
 	return &ordr, nil, warn
 }
 }
 
 
+func (o *Output) Shutdown() error {
+	var reterr error
+	if o.API != nil {
+		if err := o.API.Shutdown(); err != nil {
+			log.Warningf("error while shutting down API : %s", err)
+			reterr = err
+		}
+	}
+	if o.bManager != nil {
+		if err := o.bManager.Shutdown(); err != nil {
+			log.Warningf("error while shutting down backend : %s", err)
+			reterr = err
+		}
+	}
+	//bManager
+	//TBD : the backend(s) should be stopped in the same way
+	return reterr
+}
+
 func (o *Output) FlushAll() {
 func (o *Output) FlushAll() {
 	if o.API != nil {
 	if o.API != nil {
 		if err := o.API.Flush(); err != nil {
 		if err := o.API.Flush(); err != nil {