properly update the time structure within event (#2122)

* properly update the time structure within event to ensure it works in time-machine

* move LIVE and TIMEMACHINE to pkg/types : less code needs to import leakybucket package, and we avoid duplicating constants
This commit is contained in:
Thibault "bui" Koechlin 2023-03-16 16:25:50 +01:00 committed by GitHub
parent c77fe16943
commit 618be9ff68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 74 additions and 69 deletions

View file

@ -20,7 +20,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -43,7 +42,7 @@ var linesRead = prometheus.NewCounterVec(
[]string{"group", "stream"},
)
//CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
// CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
type CloudwatchSource struct {
Config CloudwatchSourceConfiguration
/*runtime stuff*/
@ -54,7 +53,7 @@ type CloudwatchSource struct {
streamIndexes map[string]string
}
//CloudwatchSourceConfiguration allows user to define one or more streams to monitor within a cloudwatch log group
// CloudwatchSourceConfiguration allows user to define one or more streams to monitor within a cloudwatch log group
type CloudwatchSourceConfiguration struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
GroupName string `yaml:"group_name"` //the group name to be monitored
@ -74,7 +73,7 @@ type CloudwatchSourceConfiguration struct {
AwsRegion *string `yaml:"aws_region,omitempty"`
}
//LogStreamTailConfig is the configuration for one given stream within one group
// LogStreamTailConfig is the configuration for one given stream within one group
type LogStreamTailConfig struct {
GroupName string
StreamName string
@ -317,9 +316,9 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig
//the stream has been updated recently, check if we should monitor it
var expectMode int
if !cw.Config.UseTimeMachine {
expectMode = leaky.LIVE
expectMode = types.LIVE
} else {
expectMode = leaky.TIMEMACHINE
expectMode = types.TIMEMACHINE
}
monitorStream := LogStreamTailConfig{
GroupName: cw.Config.GroupName,
@ -351,7 +350,7 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(out chan LogStreamTailConfig
}
}
//LogStreamManager receives the potential streams to monitor, and starts a go routine when needed
// LogStreamManager receives the potential streams to monitor, and starts a go routine when needed
func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outChan chan types.Event) error {
cw.logger.Debugf("starting to monitor streams for %s", cw.Config.GroupName)
@ -617,7 +616,7 @@ func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tom
"stream": *cw.Config.StreamName,
}),
Labels: cw.Config.Labels,
ExpectMode: leaky.TIMEMACHINE,
ExpectMode: types.TIMEMACHINE,
}
return cw.CatLogStream(&config, out)
}

View file

@ -21,7 +21,6 @@ import (
"github.com/crowdsecurity/dlog"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -271,12 +270,12 @@ func (d *DockerSource) GetMode() string {
return d.Config.Mode
}
//SupportedModes returns the supported modes by the acquisition module
// SupportedModes returns the supported modes by the acquisition module
func (d *DockerSource) SupportedModes() []string {
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
//OneShotAcquisition reads a set of file and returns when done
// OneShotAcquisition reads a set of file and returns when done
func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
d.logger.Debug("In oneshot")
runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
@ -319,7 +318,7 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
l.Process = true
l.Module = d.GetName()
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
@ -518,9 +517,9 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types
l.Module = d.GetName()
var evt types.Event
if !d.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
outChan <- evt

View file

@ -22,7 +22,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -228,12 +227,12 @@ func (f *FileSource) GetMode() string {
return f.config.Mode
}
//SupportedModes returns the supported modes by the acquisition module
// SupportedModes returns the supported modes by the acquisition module
func (f *FileSource) SupportedModes() []string {
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
//OneShotAcquisition reads a set of file and returns when done
// OneShotAcquisition reads a set of file and returns when done
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("In oneshot")
for _, file := range f.files {
@ -460,9 +459,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
//we're tailing, it must be real time logs
logger.Debugf("pushing %+v", l)
expectMode := leaky.LIVE
expectMode := types.LIVE
if f.config.UseTimeMachine {
expectMode = leaky.TIMEMACHINE
expectMode = types.TIMEMACHINE
}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
}
@ -508,7 +507,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
linesRead.With(prometheus.Labels{"source": filename}).Inc()
//we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
t.Kill(nil)
return nil

View file

@ -16,7 +16,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -134,9 +133,9 @@ func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) err
linesRead.With(prometheus.Labels{"source": j.src}).Inc()
var evt types.Event
if !j.config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
out <- evt
case stderrLine := <-stderrChan:

View file

@ -18,7 +18,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -158,9 +157,9 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
var evt types.Event
if !k.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.LIVE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.TIMEMACHINE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
out <- evt
}

View file

@ -20,7 +20,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -313,9 +312,9 @@ func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan
}
var evt types.Event
if !k.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.LIVE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.TIMEMACHINE}
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
out <- evt
}

View file

@ -9,7 +9,6 @@ import (
"strings"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -194,7 +193,7 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R
Line: l,
Process: true,
Type: types.LOG,
ExpectMode: leakybucket.LIVE,
ExpectMode: types.LIVE,
}
}
}

View file

@ -16,7 +16,6 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc3164"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc5424"
syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -221,9 +220,9 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha
l.Src = syslogLine.Client
l.Process = true
if !s.config.UseTimeMachine {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
}
}

View file

@ -18,7 +18,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -73,7 +72,7 @@ func logLevelToInt(logLevel string) ([]string, error) {
}
}
//This is lifted from winops/winlog, but we only want to render the basic XML string, we don't need the extra fluff
// This is lifted from winops/winlog, but we only want to render the basic XML string, we don't need the extra fluff
func (w *WinEventLogSource) getXMLEvents(config *winlog.SubscribeConfig, publisherCache map[string]windows.Handle, resultSet windows.Handle, maxEvents int) ([]string, error) {
var events = make([]windows.Handle, maxEvents)
var returned uint32
@ -196,9 +195,9 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error
l.Src = w.name
l.Process = true
if !w.config.UseTimeMachine {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
}
}

View file

@ -6,24 +6,20 @@ import (
"sync/atomic"
"time"
//"log"
"github.com/crowdsecurity/crowdsec/pkg/time/rate"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/mohae/deepcopy"
"gopkg.in/tomb.v2"
//rate "time/rate"
"github.com/davecgh/go-spew/spew"
"github.com/mohae/deepcopy"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
//"golang.org/x/time/rate"
"gopkg.in/tomb.v2"
)
const (
LIVE = iota
TIMEMACHINE
)
// those constants are now defined in types/constants
// const (
// LIVE = iota
// TIMEMACHINE
// )
// Leaky represents one instance of a bucket
type Leaky struct {
@ -171,7 +167,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
Pour: Pour,
Reprocess: bucketFactory.Reprocess,
Profiling: bucketFactory.Profiling,
Mode: LIVE,
Mode: types.LIVE,
scopeType: bucketFactory.ScopeType,
scenarioVersion: bucketFactory.ScenarioVersion,
hash: bucketFactory.hash,

View file

@ -174,7 +174,7 @@ func testFile(t *testing.T, file string, bs string, holders []BucketFactory, res
latest_ts = ts
}
in.ExpectMode = TIMEMACHINE
in.ExpectMode = types.TIMEMACHINE
log.Infof("Buckets input : %s", spew.Sdump(in))
ok, err := PourItemToHolders(in, holders, buckets)
if err != nil {

View file

@ -158,7 +158,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error {
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets) ([]BucketFactory, chan types.Event, error) {
var (
ret = []BucketFactory{}
ret = []BucketFactory{}
response chan types.Event
)
@ -408,9 +408,9 @@ func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFac
if h.Name == v.Name {
log.Debugf("found factory %s/%s -> %s", h.Author, h.Name, h.Description)
//check in which mode the bucket was
if v.Mode == TIMEMACHINE {
if v.Mode == types.TIMEMACHINE {
tbucket = NewTimeMachine(h)
} else if v.Mode == LIVE {
} else if v.Mode == types.LIVE {
tbucket = NewLeaky(h)
} else {
log.Errorf("Unknown bucket type : %d", v.Mode)

View file

@ -22,9 +22,11 @@ var serialized map[string]Leaky
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
/*The leaky routines lifecycle are based on "real" time.
/*
The leaky routines lifecycle are based on "real" time.
But when we are running in time-machine mode, the reference time is in logs and not "real" time.
Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
Thus we need to garbage collect them to avoid a skyrocketing memory usage.
*/
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
buckets.wgPour.Wait()
buckets.wgDumpState.Add(1)
@ -192,7 +194,7 @@ func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, par
}
/*let's see if this time-bucket should have expired */
if bucket.Mode == TIMEMACHINE {
if bucket.Mode == types.TIMEMACHINE {
bucket.mutex.Lock()
firstTs := bucket.First_ts
lastTs := bucket.Last_ts
@ -250,10 +252,10 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
var fresh_bucket *Leaky
switch expectMode {
case TIMEMACHINE:
case types.TIMEMACHINE:
fresh_bucket = NewTimeMachine(holder)
holder.logger.Debugf("Creating TimeMachine bucket")
case LIVE:
case types.LIVE:
fresh_bucket = NewLeaky(holder)
holder.logger.Debugf("Creating Live bucket")
default:

View file

@ -51,6 +51,6 @@ func NewTimeMachine(g BucketFactory) *Leaky {
l := NewLeaky(g)
g.logger.Tracef("Instantiating timeMachine bucket")
l.Pour = TimeMachinePour
l.Mode = TIMEMACHINE
l.Mode = types.TIMEMACHINE
return l
}

View file

@ -15,7 +15,7 @@ func (t *Trigger) OnBucketPour(b *BucketFactory) func(types.Event, *Leaky) *type
// Pour makes the bucket overflow all the time
// TriggerPour unconditionally overflows
return func(msg types.Event, l *Leaky) *types.Event {
if l.Mode == TIMEMACHINE {
if l.Mode == types.TIMEMACHINE {
var d time.Time
err := d.UnmarshalText([]byte(msg.MarshaledTime))
if err != nil {

View file

@ -72,13 +72,22 @@ func ParseDate(in string, p *types.Event, x interface{}, plog *log.Entry) (map[s
strDate, parsedDate = GenDateParse(in)
if !parsedDate.IsZero() {
ret["MarshaledTime"] = strDate
//In time machine, we take the time parsed from the event. In live mode, we keep the timestamp collected at acquisition
if p.ExpectMode == types.TIMEMACHINE {
p.Time = parsedDate
}
return ret, nil
}
strDate = expr.ParseUnix(in)
if strDate != "" {
ret["MarshaledTime"] = strDate
timeobj, err := expr.ParseUnixTime(in)
if err == nil {
ret["MarshaledTime"] = timeobj.Format(time.RFC3339)
//In time machine, we take the time parsed from the event. In live mode, we keep the timestamp collected at acquisition
if p.ExpectMode == types.TIMEMACHINE {
p.Time = timeobj
}
return ret, nil
}
}
plog.Debugf("no suitable date format found for '%s', falling back to now", in)
now := time.Now().UTC()

View file

@ -30,3 +30,9 @@ func GetOrigins() []string {
CAPIOrigin,
}
}
// Leakybucket can be in mode LIVE or TIMEMACHINE
const (
LIVE = iota
TIMEMACHINE
)

View file

@ -14,11 +14,11 @@ const (
OVFLW
)
//Event is the structure representing a runtime event (log or overflow)
// Event is the structure representing a runtime event (log or overflow)
type Event struct {
/* is it a log or an overflow */
Type int `yaml:"Type,omitempty" json:"Type,omitempty"` //Can be types.LOG (0) or types.OVFLOW (1)
ExpectMode int `yaml:"ExpectMode,omitempty" json:"ExpectMode,omitempty"` //how to buckets should handle event : leaky.TIMEMACHINE or leaky.LIVE
ExpectMode int `yaml:"ExpectMode,omitempty" json:"ExpectMode,omitempty"` //how to buckets should handle event : types.TIMEMACHINE or types.LIVE
Whitelisted bool `yaml:"Whitelisted,omitempty" json:"Whitelisted,omitempty"`
WhitelistReason string `yaml:"WhitelistReason,omitempty" json:"whitelist_reason,omitempty"`
//should add whitelist reason ?
@ -73,7 +73,7 @@ func (e *Event) GetMeta(key string) string {
return ""
}
//Move in leakybuckets
// Move in leakybuckets
const (
Undefined = ""
Ip = "Ip"
@ -83,7 +83,7 @@ const (
AS = "AS"
)
//Move in leakybuckets
// Move in leakybuckets
type ScopeType struct {
Scope string `yaml:"type"`
Filter string `yaml:"expression"`

View file

@ -4,7 +4,7 @@ line: Sep 19 18:33:22 scw-d95986 sshd[24347]: pam_unix(sshd:auth): authenticatio
├ s01-parse
| └ 🟢 crowdsecurity/sshd-logs (+8 ~1)
├ s02-enrich
| ├ 🟢 crowdsecurity/dateparse-enrich (+2 ~1)
| ├ 🟢 crowdsecurity/dateparse-enrich (+2 ~2)
| └ 🟢 crowdsecurity/geoip-enrich (+10)
├-------- parser success 🟢
├ Scenarios
@ -12,3 +12,4 @@ line: Sep 19 18:33:22 scw-d95986 sshd[24347]: pam_unix(sshd:auth): authenticatio
├ 🟢 crowdsecurity/ssh-bf_user-enum
├ 🟢 crowdsecurity/ssh-slow-bf
└ 🟢 crowdsecurity/ssh-slow-bf_user-enum