bin/crowdsec: avoid writing errors twice when log_media=stdout (#2876)
* bin/crowdsec: avoid writing errors twice when log_media=stdout
  simpler, correct hook usage
* lint
parent e611d01c90
commit 98560d0cf5
10 changed files with 68 additions and 69 deletions
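In short: instead of registering a ConditionalHook at startup and toggling it once config.yaml is parsed, the commit registers a FatalHook only when the configured log_media is not stdout, so fatal errors are mirrored to stderr only when the main logger writes to a file. Below is a minimal standalone sketch of the resulting wiring, not the actual crowdsec main(); it assumes the FatalHook type added in cmd/crowdsec/fatalhook.go further down, and uses a placeholder logMedia variable standing in for cConfig.Common.LogMedia.

```go
package main

import (
    "os"

    log "github.com/sirupsen/logrus"
)

func main() {
    // Placeholder for cConfig.Common.LogMedia, which is only known after
    // config.yaml has been parsed.
    logMedia := "file"

    if logMedia != "stdout" {
        // The main logger writes to a file, so mirror fatal errors and
        // panics to stderr to keep them visible on the terminal.
        log.AddHook(&FatalHook{
            Writer:    os.Stderr,
            LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
        })
    }

    // With log_media=stdout no hook is added, so the message below is
    // written exactly once instead of twice.
    log.Fatal("fatal configuration error")
}
```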
@@ -1,11 +1,11 @@
 package main

 import (
+    "errors"
     "fmt"
     "runtime"
     "time"

-    "github.com/pkg/errors"
     log "github.com/sirupsen/logrus"

     "github.com/crowdsecurity/go-cs-lib/trace"
cmd/crowdsec/fatalhook.go (new file, 28 lines)
@@ -0,0 +1,28 @@
+package main
+
+import (
+    "io"
+
+    log "github.com/sirupsen/logrus"
+)
+
+// FatalHook is used to log fatal messages to stderr when the rest goes to a file
+type FatalHook struct {
+    Writer    io.Writer
+    LogLevels []log.Level
+}
+
+func (hook *FatalHook) Fire(entry *log.Entry) error {
+    line, err := entry.String()
+    if err != nil {
+        return err
+    }
+
+    _, err = hook.Writer.Write([]byte(line))
+
+    return err
+}
+
+func (hook *FatalHook) Levels() []log.Level {
+    return hook.LogLevels
+}
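For reference, a hypothetical test (not part of this commit) that exercises the hook's Fire method against a buffer instead of stderr; FatalHook is the type introduced just above, and the entry is built with the standard logrus API.

```go
package main

import (
    "bytes"
    "strings"
    "testing"

    log "github.com/sirupsen/logrus"
)

// Hypothetical test, not included in the commit: Fire should format the
// entry and forward it to the configured writer.
func TestFatalHookFire(t *testing.T) {
    buf := &bytes.Buffer{}

    hook := &FatalHook{
        Writer:    buf,
        LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
    }

    entry := log.NewEntry(log.New())
    entry.Level = log.FatalLevel
    entry.Message = "fatal error, mirrored to the hook writer"

    if err := hook.Fire(entry); err != nil {
        t.Fatalf("Fire returned an error: %s", err)
    }

    if !strings.Contains(buf.String(), "fatal error, mirrored to the hook writer") {
        t.Fatalf("expected the message in the hook output, got %q", buf.String())
    }
}
```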
(deleted file, 43 lines)
@@ -1,43 +0,0 @@
-package main
-
-import (
-    "io"
-    "os"
-
-    log "github.com/sirupsen/logrus"
-)
-
-type ConditionalHook struct {
-    Writer    io.Writer
-    LogLevels []log.Level
-    Enabled   bool
-}
-
-func (hook *ConditionalHook) Fire(entry *log.Entry) error {
-    if hook.Enabled {
-        line, err := entry.String()
-        if err != nil {
-            return err
-        }
-
-        _, err = hook.Writer.Write([]byte(line))
-
-        return err
-    }
-
-    return nil
-}
-
-func (hook *ConditionalHook) Levels() []log.Level {
-    return hook.LogLevels
-}
-
-// The primal logging hook is set up before parsing config.yaml.
-// Once config.yaml is parsed, the primal hook is disabled if the
-// configured logger is writing to stderr. Otherwise it's used to
-// report fatal errors and panics to stderr in addition to the log file.
-var primalHook = &ConditionalHook{
-    Writer:    os.Stderr,
-    LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
-    Enabled:   true,
-}
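The removed ConditionalHook was added to the standard logger unconditionally in main() and only had its Enabled flag flipped after the configuration was parsed (see the main.go hunks below). The following illustration, which is not crowdsec code, shows the failure mode named in the commit title by reusing the ConditionalHook definition above: when the logger already writes to the terminal, an enabled stderr hook duplicates every matching message.

```go
package main

import (
    "os"

    log "github.com/sirupsen/logrus"
)

// Illustration only: a terminal-bound logger plus an enabled hook that also
// writes to stderr prints each matching message twice.
func main() {
    log.SetOutput(os.Stderr) // roughly what the user sees with log_media=stdout

    hook := &ConditionalHook{
        Writer:    os.Stderr,
        LogLevels: []log.Level{log.ErrorLevel}, // ErrorLevel so the demo keeps running; the real hook used Fatal/Panic
        Enabled:   true,
    }
    log.AddHook(hook)

    log.Error("hook enabled: this line shows up on stderr twice")

    hook.Enabled = false
    log.Error("hook disabled: this one shows up once")
}
```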
@@ -72,7 +72,7 @@ type Flags struct {
     DisableCAPI bool
     Transform   string
     OrderEvent  bool
-    CpuProfile  string
+    CPUProfile  string
 }

 type labelsMap map[string]string

@@ -181,7 +181,7 @@ func (f *Flags) Parse() {
     }

     flag.StringVar(&dumpFolder, "dump-data", "", "dump parsers/buckets raw outputs")
-    flag.StringVar(&f.CpuProfile, "cpu-profile", "", "write cpu profile to file")
+    flag.StringVar(&f.CPUProfile, "cpu-profile", "", "write cpu profile to file")
     flag.Parse()
 }

@@ -249,7 +249,12 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
         return nil, err
     }

-    primalHook.Enabled = (cConfig.Common.LogMedia != "stdout")
+    if cConfig.Common.LogMedia != "stdout" {
+        log.AddHook(&FatalHook{
+            Writer:    os.Stderr,
+            LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
+        })
+    }

     if err := csconfig.LoadFeatureFlagsFile(configFile, log.StandardLogger()); err != nil {
         return nil, err

@@ -323,7 +328,9 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
 var crowdsecT0 time.Time

 func main() {
-    log.AddHook(primalHook)
+    // The initial log level is INFO, even if the user provided an -error or -warning flag
+    // because we need feature flags before parsing cli flags
+    log.SetFormatter(&log.TextFormatter{TimestampFormat: time.RFC3339, FullTimestamp: true})

     if err := fflag.RegisterAllFeatures(); err != nil {
         log.Fatalf("failed to register features: %s", err)

@@ -355,13 +362,13 @@ func main() {
         os.Exit(0)
     }

-    if flags.CpuProfile != "" {
-        f, err := os.Create(flags.CpuProfile)
+    if flags.CPUProfile != "" {
+        f, err := os.Create(flags.CPUProfile)
         if err != nil {
             log.Fatalf("could not create CPU profile: %s", err)
         }

-        log.Infof("CPU profile will be written to %s", flags.CpuProfile)
+        log.Infof("CPU profile will be written to %s", flags.CPUProfile)

         if err := pprof.StartCPUProfile(f); err != nil {
             f.Close()
@@ -104,12 +104,12 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
     return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         // catch panics here because they are not handled by servePrometheus
         defer trace.CatchPanic("crowdsec/computeDynamicMetrics")
-        //update cache metrics (stash)
+        // update cache metrics (stash)
         cache.UpdateCacheMetrics()
-        //update cache metrics (regexp)
+        // update cache metrics (regexp)
         exprhelpers.UpdateRegexpCacheMetrics()

-        //decision metrics are only relevant for LAPI
+        // decision metrics are only relevant for LAPI
         if dbClient == nil {
             next.ServeHTTP(w, r)
             return
@@ -27,7 +27,7 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
         }

         for k, src := range alert.Sources {
-            refsrc := *alert.Alert //copy
+            refsrc := *alert.Alert // copy

             log.Tracef("source[%s]", k)

@@ -81,7 +81,7 @@ LOOP:
             cacheMutex.Unlock()
             if err := PushAlerts(cachecopy, client); err != nil {
                 log.Errorf("while pushing to api : %s", err)
-                //just push back the events to the queue
+                // just push back the events to the queue
                 cacheMutex.Lock()
                 cache = append(cache, cachecopy...)
                 cacheMutex.Unlock()

@@ -110,8 +110,8 @@ LOOP:
                 return fmt.Errorf("postoverflow failed: %w", err)
             }
             log.Printf("%s", *event.Overflow.Alert.Message)
-            //if the Alert is nil, it's to signal bucket is ready for GC, don't track this
-            //dump after postoveflow processing to avoid missing whitelist info
+            // if the Alert is nil, it's to signal bucket is ready for GC, don't track this
+            // dump after postoveflow processing to avoid missing whitelist info
             if dumpStates && event.Overflow.Alert != nil {
                 if bucketOverflows == nil {
                     bucketOverflows = make([]types.Event, 0)
@@ -11,7 +11,6 @@ import (
 )

 func runParse(input chan types.Event, output chan types.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error {
-
 LOOP:
     for {
         select {

@@ -56,5 +55,6 @@ LOOP:
             output <- parsed
         }
     }
+
     return nil
 }
@@ -4,27 +4,30 @@ import (
     "fmt"
     "time"

+    "github.com/prometheus/client_golang/prometheus"
+    log "github.com/sirupsen/logrus"
+
     "github.com/crowdsecurity/crowdsec/pkg/csconfig"
     leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
     "github.com/crowdsecurity/crowdsec/pkg/types"
-    "github.com/prometheus/client_golang/prometheus"
-    log "github.com/sirupsen/logrus"
 )

 func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error {
     count := 0

     for {
-        //bucket is now ready
+        // bucket is now ready
         select {
         case <-bucketsTomb.Dying():
             log.Infof("Bucket routine exiting")
             return nil
         case parsed := <-input:
             startTime := time.Now()

             count++
             if count%5000 == 0 {
                 log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
-                //when in forensics mode, garbage collect buckets
+                // when in forensics mode, garbage collect buckets
                 if cConfig.Crowdsec.BucketsGCEnabled {
                     if parsed.MarshaledTime != "" {
                         z := &time.Time{}

@@ -32,26 +35,30 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea
                             log.Warningf("Failed to unmarshal time from event '%s' : %s", parsed.MarshaledTime, err)
                         } else {
                             log.Warning("Starting buckets garbage collection ...")
+
                             if err = leaky.GarbageCollectBuckets(*z, buckets); err != nil {
-                                return fmt.Errorf("failed to start bucket GC : %s", err)
+                                return fmt.Errorf("failed to start bucket GC : %w", err)
                             }
                         }
                     }
                 }
             }
-            //here we can bucketify with parsed
+            // here we can bucketify with parsed
             poured, err := leaky.PourItemToHolders(parsed, holders, buckets)
             if err != nil {
                 log.Errorf("bucketify failed for: %v", parsed)
                 continue
             }

             elapsed := time.Since(startTime)
             globalPourHistogram.With(prometheus.Labels{"type": parsed.Line.Module, "source": parsed.Line.Src}).Observe(elapsed.Seconds())

             if poured {
                 globalBucketPourOk.Inc()
             } else {
                 globalBucketPourKo.Inc()
             }

             if len(parsed.MarshaledTime) != 0 {
                 if err := lastProcessedItem.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
                     log.Warningf("failed to unmarshal time from event : %s", err)
@@ -23,8 +23,8 @@ func StartRunSvc() error {

     defer trace.CatchPanic("crowdsec/StartRunSvc")

-    //Always try to stop CPU profiling to avoid passing flags around
-    //It's a noop if profiling is not enabled
+    // Always try to stop CPU profiling to avoid passing flags around
+    // It's a noop if profiling is not enabled
     defer pprof.StopCPUProfile()

     if cConfig, err = LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false); err != nil {
@@ -20,8 +20,8 @@ func StartRunSvc() error {

     defer trace.CatchPanic("crowdsec/StartRunSvc")

-    //Always try to stop CPU profiling to avoid passing flags around
-    //It's a noop if profiling is not enabled
+    // Always try to stop CPU profiling to avoid passing flags around
+    // It's a noop if profiling is not enabled
     defer pprof.StopCPUProfile()

     isRunninginService, err := svc.IsWindowsService()