From 94d80c97ae94e85e34dbb2cc2d4c3f5702bacb7b Mon Sep 17 00:00:00 2001 From: Thibault bui Koechlin Date: Wed, 17 Jun 2020 01:13:19 +0200 Subject: [PATCH] adding shutdown and tombs for proper shutdown --- cmd/crowdsec/output.go | 5 ++++- cmd/crowdsec/serve.go | 4 +++- pkg/cwapi/auth.go | 20 +++++++++++++++++++- pkg/cwapi/signals.go | 3 +++ pkg/cwplugin/backend.go | 14 +++++++++++++- pkg/outputs/ouputs.go | 19 +++++++++++++++++++ 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index af7c738ea..8a7c98d40 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -22,8 +22,11 @@ LOOP: for { select { case <-bucketsTomb.Dying(): - log.Infof("Exiting output processing") + log.Infof("Flushing outputs") output.FlushAll() + log.Infof("Shuting down output routines") + output.Shutdown() + log.Infof("Done shutdown down output") break LOOP case event := <-overflow: if cConfig.Profiling { diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 9477a60c4..3baa7793b 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -19,10 +19,11 @@ func reloadHandler(sig os.Signal) error { if err := ShutdownRoutines(); err != nil { log.Errorf("Failed to shut down routines: %s", err) } - //dump buckets state + //todo : properly stop acquis with the tail readers if err := leaky.DumpBucketsStateAt("buckets_state.json", time.Now(), buckets); err != nil { log.Fatalf("Failed dumping bucket state : %s", err) } + //reload configurations //restore bucket state @@ -62,6 +63,7 @@ func ShutdownRoutines() error { reterr = err } + log.Infof("outputs are done") return reterr } diff --git a/pkg/cwapi/auth.go b/pkg/cwapi/auth.go index 208664220..eabdafc54 100644 --- a/pkg/cwapi/auth.go +++ b/pkg/cwapi/auth.go @@ -15,6 +15,7 @@ import ( "gopkg.in/yaml.v2" "github.com/dghubble/sling" + "gopkg.in/tomb.v2" ) type ApiCtx struct { @@ -37,6 +38,7 @@ type ApiCtx struct { tokenExpired bool `yaml:"-"` toPush []types.Event `yaml:"-"` Http *sling.Sling `yaml:"-"` + PusherTomb tomb.Tomb } type ApiCreds struct { @@ -113,7 +115,23 @@ func (ctx *ApiCtx) Init(cfg string, profile string) error { return err } //start the background go-routine - go ctx.pushLoop() //nolint:errcheck // runs into the background, we can't check error with chan or such + ctx.PusherTomb.Go(func() error { + err := ctx.pushLoop() + if err != nil { + log.Errorf("api push error : %s", err) + return err + } + return nil + }) + return nil +} + +func (ctx *ApiCtx) Shutdown() error { + ctx.PusherTomb.Kill(nil) + log.Infof("Waiting for API routine to finish") + if err := ctx.PusherTomb.Wait(); err != nil { + return fmt.Errorf("API routine returned error : %s", err) + } return nil } diff --git a/pkg/cwapi/signals.go b/pkg/cwapi/signals.go index bdc416735..9d088e8fa 100644 --- a/pkg/cwapi/signals.go +++ b/pkg/cwapi/signals.go @@ -105,6 +105,9 @@ func (ctx *ApiCtx) pushLoop() error { if err != nil { log.Errorf("api push loop: %s", err.Error()) } + case <-ctx.PusherTomb.Dying(): //we are being killed by main + log.Infof("Killing api routine") + return nil } } diff --git a/pkg/cwplugin/backend.go b/pkg/cwplugin/backend.go index 07f3fb372..4cc59196e 100644 --- a/pkg/cwplugin/backend.go +++ b/pkg/cwplugin/backend.go @@ -20,6 +20,7 @@ type Backend interface { Delete(string) (int, error) Init(map[string]string) error Flush() error + Shutdown() error DeleteAll() error } @@ -82,7 +83,7 @@ func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) { plugNew := symNew() bInterface, ok := plugNew.(Backend) if !ok { - return nil, fmt.Errorf("unexpected '%s' type, skipping", newPlugin.Name) + return nil, fmt.Errorf("unexpected '%s' type (%T), skipping", newPlugin.Name, plugNew) } // Add the interface and Init() @@ -120,6 +121,17 @@ func (b *BackendManager) Delete(target string) (int, error) { return nbDel, nil } +func (b *BackendManager) Shutdown() error { + var err error + for _, plugin := range b.backendPlugins { + err = plugin.funcs.Shutdown() + if err != nil { + return fmt.Errorf("failed to shutdown : %s", err) + } + } + return nil +} + func (b *BackendManager) DeleteAll() error { var err error for _, plugin := range b.backendPlugins { diff --git a/pkg/outputs/ouputs.go b/pkg/outputs/ouputs.go index f114400ca..94ec701e7 100644 --- a/pkg/outputs/ouputs.go +++ b/pkg/outputs/ouputs.go @@ -81,6 +81,25 @@ func OvflwToOrder(sig types.SignalOccurence, prof types.Profile) (*types.BanOrde return &ordr, nil, warn } +func (o *Output) Shutdown() error { + var reterr error + if o.API != nil { + if err := o.API.Shutdown(); err != nil { + log.Warningf("error while shutting down API : %s", err) + reterr = err + } + } + if o.bManager != nil { + if err := o.bManager.Shutdown(); err != nil { + log.Warningf("error while shutting down backend : %s", err) + reterr = err + } + } + //bManager + //TBD : the backend(s) should be stopped in the same way + return reterr +} + func (o *Output) FlushAll() { if o.API != nil { if err := o.API.Flush(); err != nil {