From 98560d0cf56d9c56366065b9a40a915844238468 Mon Sep 17 00:00:00 2001 From: mmetc <92726601+mmetc@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:29:10 +0100 Subject: [PATCH] bin/crowdsec: avoid writing errors twice when log_media=stdout (#2876) * bin/crowdsec: avoid writing errors twice when log_media=stdout simpler, correct hook usage * lint --- cmd/crowdsec/api.go | 2 +- cmd/crowdsec/fatalhook.go | 28 +++++++++++++++++++ cmd/crowdsec/hook.go | 43 ------------------------------ cmd/crowdsec/main.go | 21 ++++++++++----- cmd/crowdsec/metrics.go | 6 ++--- cmd/crowdsec/output.go | 8 +++--- cmd/crowdsec/parse.go | 2 +- cmd/crowdsec/pour.go | 19 ++++++++----- cmd/crowdsec/run_in_svc.go | 4 +-- cmd/crowdsec/run_in_svc_windows.go | 4 +-- 10 files changed, 68 insertions(+), 69 deletions(-) create mode 100644 cmd/crowdsec/fatalhook.go delete mode 100644 cmd/crowdsec/hook.go diff --git a/cmd/crowdsec/api.go b/cmd/crowdsec/api.go index 4ac5c3ce9..995345a25 100644 --- a/cmd/crowdsec/api.go +++ b/cmd/crowdsec/api.go @@ -1,11 +1,11 @@ package main import ( + "errors" "fmt" "runtime" "time" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/crowdsecurity/go-cs-lib/trace" diff --git a/cmd/crowdsec/fatalhook.go b/cmd/crowdsec/fatalhook.go new file mode 100644 index 000000000..84a57406a --- /dev/null +++ b/cmd/crowdsec/fatalhook.go @@ -0,0 +1,28 @@ +package main + +import ( + "io" + + log "github.com/sirupsen/logrus" +) + +// FatalHook is used to log fatal messages to stderr when the rest goes to a file +type FatalHook struct { + Writer io.Writer + LogLevels []log.Level +} + +func (hook *FatalHook) Fire(entry *log.Entry) error { + line, err := entry.String() + if err != nil { + return err + } + + _, err = hook.Writer.Write([]byte(line)) + + return err +} + +func (hook *FatalHook) Levels() []log.Level { + return hook.LogLevels +} diff --git a/cmd/crowdsec/hook.go b/cmd/crowdsec/hook.go deleted file mode 100644 index 28515d9e4..000000000 --- a/cmd/crowdsec/hook.go +++ /dev/null @@ -1,43 +0,0 @@ -package main - -import ( - "io" - "os" - - log "github.com/sirupsen/logrus" -) - -type ConditionalHook struct { - Writer io.Writer - LogLevels []log.Level - Enabled bool -} - -func (hook *ConditionalHook) Fire(entry *log.Entry) error { - if hook.Enabled { - line, err := entry.String() - if err != nil { - return err - } - - _, err = hook.Writer.Write([]byte(line)) - - return err - } - - return nil -} - -func (hook *ConditionalHook) Levels() []log.Level { - return hook.LogLevels -} - -// The primal logging hook is set up before parsing config.yaml. -// Once config.yaml is parsed, the primal hook is disabled if the -// configured logger is writing to stderr. Otherwise it's used to -// report fatal errors and panics to stderr in addition to the log file. -var primalHook = &ConditionalHook{ - Writer: os.Stderr, - LogLevels: []log.Level{log.FatalLevel, log.PanicLevel}, - Enabled: true, -} diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 7f3070b5f..70f7d48dc 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -72,7 +72,7 @@ type Flags struct { DisableCAPI bool Transform string OrderEvent bool - CpuProfile string + CPUProfile string } type labelsMap map[string]string @@ -181,7 +181,7 @@ func (f *Flags) Parse() { } flag.StringVar(&dumpFolder, "dump-data", "", "dump parsers/buckets raw outputs") - flag.StringVar(&f.CpuProfile, "cpu-profile", "", "write cpu profile to file") + flag.StringVar(&f.CPUProfile, "cpu-profile", "", "write cpu profile to file") flag.Parse() } @@ -249,7 +249,12 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo return nil, err } - primalHook.Enabled = (cConfig.Common.LogMedia != "stdout") + if cConfig.Common.LogMedia != "stdout" { + log.AddHook(&FatalHook{ + Writer: os.Stderr, + LogLevels: []log.Level{log.FatalLevel, log.PanicLevel}, + }) + } if err := csconfig.LoadFeatureFlagsFile(configFile, log.StandardLogger()); err != nil { return nil, err @@ -323,7 +328,9 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo var crowdsecT0 time.Time func main() { - log.AddHook(primalHook) + // The initial log level is INFO, even if the user provided an -error or -warning flag + // because we need feature flags before parsing cli flags + log.SetFormatter(&log.TextFormatter{TimestampFormat: time.RFC3339, FullTimestamp: true}) if err := fflag.RegisterAllFeatures(); err != nil { log.Fatalf("failed to register features: %s", err) @@ -355,13 +362,13 @@ func main() { os.Exit(0) } - if flags.CpuProfile != "" { - f, err := os.Create(flags.CpuProfile) + if flags.CPUProfile != "" { + f, err := os.Create(flags.CPUProfile) if err != nil { log.Fatalf("could not create CPU profile: %s", err) } - log.Infof("CPU profile will be written to %s", flags.CpuProfile) + log.Infof("CPU profile will be written to %s", flags.CPUProfile) if err := pprof.StartCPUProfile(f); err != nil { f.Close() diff --git a/cmd/crowdsec/metrics.go b/cmd/crowdsec/metrics.go index 563bb56bf..aed43db00 100644 --- a/cmd/crowdsec/metrics.go +++ b/cmd/crowdsec/metrics.go @@ -104,12 +104,12 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // catch panics here because they are not handled by servePrometheus defer trace.CatchPanic("crowdsec/computeDynamicMetrics") - //update cache metrics (stash) + // update cache metrics (stash) cache.UpdateCacheMetrics() - //update cache metrics (regexp) + // update cache metrics (regexp) exprhelpers.UpdateRegexpCacheMetrics() - //decision metrics are only relevant for LAPI + // decision metrics are only relevant for LAPI if dbClient == nil { next.ServeHTTP(w, r) return diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index c4a2c0b6a..ac05b502e 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -27,7 +27,7 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) { } for k, src := range alert.Sources { - refsrc := *alert.Alert //copy + refsrc := *alert.Alert // copy log.Tracef("source[%s]", k) @@ -81,7 +81,7 @@ LOOP: cacheMutex.Unlock() if err := PushAlerts(cachecopy, client); err != nil { log.Errorf("while pushing to api : %s", err) - //just push back the events to the queue + // just push back the events to the queue cacheMutex.Lock() cache = append(cache, cachecopy...) cacheMutex.Unlock() @@ -110,8 +110,8 @@ LOOP: return fmt.Errorf("postoverflow failed: %w", err) } log.Printf("%s", *event.Overflow.Alert.Message) - //if the Alert is nil, it's to signal bucket is ready for GC, don't track this - //dump after postoveflow processing to avoid missing whitelist info + // if the Alert is nil, it's to signal bucket is ready for GC, don't track this + // dump after postoveflow processing to avoid missing whitelist info if dumpStates && event.Overflow.Alert != nil { if bucketOverflows == nil { bucketOverflows = make([]types.Event, 0) diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index c62eeb586..53c9ee65d 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -11,7 +11,6 @@ import ( ) func runParse(input chan types.Event, output chan types.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error { - LOOP: for { select { @@ -56,5 +55,6 @@ LOOP: output <- parsed } } + return nil } diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go index 3f717e397..388c7a6c1 100644 --- a/cmd/crowdsec/pour.go +++ b/cmd/crowdsec/pour.go @@ -4,27 +4,30 @@ import ( "fmt" "time" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" ) func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error { count := 0 + for { - //bucket is now ready + // bucket is now ready select { case <-bucketsTomb.Dying(): log.Infof("Bucket routine exiting") return nil case parsed := <-input: startTime := time.Now() + count++ if count%5000 == 0 { log.Infof("%d existing buckets", leaky.LeakyRoutineCount) - //when in forensics mode, garbage collect buckets + // when in forensics mode, garbage collect buckets if cConfig.Crowdsec.BucketsGCEnabled { if parsed.MarshaledTime != "" { z := &time.Time{} @@ -32,26 +35,30 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea log.Warningf("Failed to unmarshal time from event '%s' : %s", parsed.MarshaledTime, err) } else { log.Warning("Starting buckets garbage collection ...") + if err = leaky.GarbageCollectBuckets(*z, buckets); err != nil { - return fmt.Errorf("failed to start bucket GC : %s", err) + return fmt.Errorf("failed to start bucket GC : %w", err) } } } } } - //here we can bucketify with parsed + // here we can bucketify with parsed poured, err := leaky.PourItemToHolders(parsed, holders, buckets) if err != nil { log.Errorf("bucketify failed for: %v", parsed) continue } + elapsed := time.Since(startTime) globalPourHistogram.With(prometheus.Labels{"type": parsed.Line.Module, "source": parsed.Line.Src}).Observe(elapsed.Seconds()) + if poured { globalBucketPourOk.Inc() } else { globalBucketPourKo.Inc() } + if len(parsed.MarshaledTime) != 0 { if err := lastProcessedItem.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { log.Warningf("failed to unmarshal time from event : %s", err) diff --git a/cmd/crowdsec/run_in_svc.go b/cmd/crowdsec/run_in_svc.go index 5a8bc9a6c..58f4cdf00 100644 --- a/cmd/crowdsec/run_in_svc.go +++ b/cmd/crowdsec/run_in_svc.go @@ -23,8 +23,8 @@ func StartRunSvc() error { defer trace.CatchPanic("crowdsec/StartRunSvc") - //Always try to stop CPU profiling to avoid passing flags around - //It's a noop if profiling is not enabled + // Always try to stop CPU profiling to avoid passing flags around + // It's a noop if profiling is not enabled defer pprof.StopCPUProfile() if cConfig, err = LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false); err != nil { diff --git a/cmd/crowdsec/run_in_svc_windows.go b/cmd/crowdsec/run_in_svc_windows.go index 7845e9c58..c0aa18d7f 100644 --- a/cmd/crowdsec/run_in_svc_windows.go +++ b/cmd/crowdsec/run_in_svc_windows.go @@ -20,8 +20,8 @@ func StartRunSvc() error { defer trace.CatchPanic("crowdsec/StartRunSvc") - //Always try to stop CPU profiling to avoid passing flags around - //It's a noop if profiling is not enabled + // Always try to stop CPU profiling to avoid passing flags around + // It's a noop if profiling is not enabled defer pprof.StopCPUProfile() isRunninginService, err := svc.IsWindowsService()