File overview

Add `transform` configuration option for acquisition (#2144)

blotus 2 years ago
parent
commit
61bea26486

+ 7 - 1
cmd/crowdsec/main.go

@@ -69,6 +69,7 @@ type Flags struct {
 	DisableAPI     bool
 	WinSvc         string
 	DisableCAPI    bool
+	Transform      string
 }
 
 type labelsMap map[string]string
@@ -107,7 +108,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
 		flags.Labels = labels
 		flags.Labels["type"] = flags.SingleFileType
 
-		dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels)
+		dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
 		if err != nil {
 			return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
 		}
@@ -149,6 +150,7 @@ func (f *Flags) Parse() {
 	flag.BoolVar(&f.ErrorLevel, "error", false, "print error-level on stderr")
 	flag.BoolVar(&f.PrintVersion, "version", false, "display version")
 	flag.StringVar(&f.OneShotDSN, "dsn", "", "Process a single data source in time-machine")
+	flag.StringVar(&f.Transform, "transform", "", "transform expression (expr) to apply to the event after acquisition")
 	flag.StringVar(&f.SingleFileType, "type", "", "Labels.type for file in time-machine")
 	flag.Var(&labels, "label", "Additional Labels for file in time-machine")
 	flag.BoolVar(&f.TestMode, "t", false, "only test configs")
@@ -257,6 +259,10 @@ func LoadConfig(cConfig *csconfig.Config) error {
 		return errors.New("-dsn requires a -type argument")
 	}
 
+	if flags.Transform != "" && flags.OneShotDSN == "" {
+		return errors.New("-transform requires a -dsn argument")
+	}
+
 	if flags.SingleFileType != "" && flags.OneShotDSN == "" {
 		return errors.New("-type requires a -dsn argument")
 	}
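
In one-shot mode the new flag rides along with -dsn. A hypothetical invocation (path, type and expression are only illustrative): `crowdsec -dsn file:///var/log/myapp.log -type myapp -transform 'evt.Line.Raw + " [tagged]"'`. The guard added above rejects -transform without -dsn, mirroring the existing -type check.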

+ 104 - 15
pkg/acquisition/acquisition.go

@@ -7,6 +7,9 @@ import (
 	"os"
 	"strings"
 
+	"github.com/antonmedv/expr"
+	"github.com/antonmedv/expr/vm"
+	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 	tomb "gopkg.in/tomb.v2"
@@ -23,6 +26,7 @@ import (
 	s3acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/s3"
 	syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog"
 	wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog"
+	"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
 
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -30,16 +34,17 @@ import (
 
 // The interface each datasource must implement
 type DataSource interface {
-	GetMetrics() []prometheus.Collector                         // Returns pointers to metrics that are managed by the module
-	GetAggregMetrics() []prometheus.Collector                   // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
-	UnmarshalConfig([]byte) error                               // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
-	Configure([]byte, *log.Entry) error                         // Complete the YAML datasource configuration and perform runtime checks.
-	ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource
-	GetMode() string                                            // Get the mode (TAIL, CAT or SERVER)
-	GetName() string                                            // Get the name of the module
-	OneShotAcquisition(chan types.Event, *tomb.Tomb) error      // Start one shot acquisition(eg, cat a file)
-	StreamingAcquisition(chan types.Event, *tomb.Tomb) error    // Start live acquisition (eg, tail a file)
-	CanRun() error                                              // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
+	GetMetrics() []prometheus.Collector                                 // Returns pointers to metrics that are managed by the module
+	GetAggregMetrics() []prometheus.Collector                           // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
+	UnmarshalConfig([]byte) error                                       // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
+	Configure([]byte, *log.Entry) error                                 // Complete the YAML datasource configuration and perform runtime checks.
+	ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
+	GetMode() string                                                    // Get the mode (TAIL, CAT or SERVER)
+	GetName() string                                                    // Get the name of the module
+	OneShotAcquisition(chan types.Event, *tomb.Tomb) error              // Start one-shot acquisition (e.g., cat a file)
+	StreamingAcquisition(chan types.Event, *tomb.Tomb) error            // Start live acquisition (e.g., tail a file)
+	CanRun() error                                                      // Whether the datasource can run or not (e.g., journalctl on BSD makes no sense)
+	GetUuid() string                                                    // Get the unique identifier of the datasource
 	Dump() interface{}
 }
 
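For datasource authors, the interface change amounts to two additions. A minimal sketch of what a third-party implementation now has to provide (MySource is a hypothetical stand-in; only the new parts are shown):

package mysource

import (
	log "github.com/sirupsen/logrus"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
)

// MySource is a hypothetical datasource showing only what this commit
// adds to the DataSource contract.
type MySource struct {
	config configuration.DataSourceCommonCfg
}

// GetUuid exposes the identifier assigned at load time; StartAcquisition
// uses it to look up a compiled transform expression, if any.
func (m *MySource) GetUuid() string { return m.config.UniqueId }

// ConfigureByDSN gains a trailing uuid parameter, which the datasource
// simply stores so GetUuid can return it.
func (m *MySource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
	m.config.Labels = labels
	m.config.UniqueId = uuid
	return nil
}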
@@ -56,6 +61,8 @@ var AcquisitionSources = map[string]func() DataSource{
 	"s3":          func() DataSource { return &s3acquisition.S3Source{} },
 }
 
+var transformRuntimes = map[string]*vm.Program{}
+
 func GetDataSourceIface(dataSourceType string) DataSource {
 	source := AcquisitionSources[dataSourceType]
 	if source == nil {
@@ -115,7 +122,7 @@ func detectBackwardCompatAcquis(sub configuration.DataSourceCommonCfg) string {
 	return ""
 }
 
-func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource, error) {
+func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr string) ([]DataSource, error) {
 	var sources []DataSource
 
 	frags := strings.Split(dsn, ":")
@@ -134,7 +141,15 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource,
 	subLogger := clog.WithFields(log.Fields{
 		"type": dsn,
 	})
-	err := dataSrc.ConfigureByDSN(dsn, labels, subLogger)
+	uniqueId := uuid.NewString()
+	if transformExpr != "" {
+		vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
+		if err != nil {
+			return nil, fmt.Errorf("while compiling transform expression '%s': %w", transformExpr, err)
+		}
+		transformRuntimes[uniqueId] = vm
+	}
+	err := dataSrc.ConfigureByDSN(dsn, labels, subLogger, uniqueId)
 	if err != nil {
		return nil, fmt.Errorf("while configuring datasource for %s: %w", dsn, err)
 	}
@@ -185,10 +200,19 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
 			if GetDataSourceIface(sub.Source) == nil {
 				return nil, fmt.Errorf("unknown data source %s in %s (position: %d)", sub.Source, acquisFile, idx)
 			}
+			uniqueId := uuid.NewString()
+			sub.UniqueId = uniqueId
 			src, err := DataSourceConfigure(sub)
 			if err != nil {
 				return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err)
 			}
+			if sub.TransformExpr != "" {
+				vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
+				if err != nil {
+					return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position: %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
+				}
+				transformRuntimes[uniqueId] = vm
+			}
 			sources = append(sources, *src)
 			idx += 1
 		}
@@ -212,11 +236,60 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
 				// ignore the error
 			}
 		}
-
 	}
 	return nil
 }
 
+func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
+	defer types.CatchPanic("crowdsec/acquis")
+	logger.Infof("transformer started")
+	for {
+		select {
+		case <-AcquisTomb.Dying():
+			logger.Debugf("transformer is dying")
+			return
+		case evt := <-transformChan:
+			logger.Tracef("Received event %s", evt.Line.Raw)
+			out, err := expr.Run(transformRuntime, map[string]interface{}{"evt": &evt})
+			if err != nil {
+				logger.Errorf("while running transform expression: %s, sending event as-is", err)
+				output <- evt
+				continue
+			}
+			if out == nil {
+				logger.Errorf("transform expression returned nil, sending event as-is")
+				output <- evt
+				continue
+			}
+			switch v := out.(type) {
+			case string:
+				logger.Tracef("transform expression returned %s", v)
+				evt.Line.Raw = v
+				output <- evt
+			case []interface{}:
+				logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content
+				for _, line := range v {
+					l, ok := line.(string)
+					if !ok {
+						logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string")
+						output <- evt
+						continue
+					}
+					evt.Line.Raw = l
+					output <- evt
+				}
+			case []string:
+				logger.Tracef("transform expression returned %v", v)
+				for _, line := range v {
+					evt.Line.Raw = line
+					output <- evt
+				}
+			default:
+				logger.Errorf("transform expression returned an invalid type %T, sending event as-is", out)
+				output <- evt
+			}
+		}
+	}
+}
+
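The switch above accepts three result shapes: a string rewrites the line in place, while a slice fans out into one event per element. A self-contained sketch (plain antonmedv/expr, without crowdsec's exprhelpers options) of how each shape is produced:

package main

import (
	"fmt"

	"github.com/antonmedv/expr"
)

// Line and Event mirror just enough of types.Event for the demo.
type Line struct{ Raw string }
type Event struct{ Line Line }

func main() {
	env := map[string]interface{}{"evt": &Event{Line: Line{Raw: "hello"}}}

	// A string result: the transformer rewrites evt.Line.Raw in place.
	one, _ := expr.Compile(`evt.Line.Raw + " world"`, expr.Env(env))
	out, _ := expr.Run(one, env)
	fmt.Printf("%T: %v\n", out, out) // string: hello world

	// An array literal evaluates to []interface{}: the transformer emits
	// one event per string element.
	many, _ := expr.Compile(`[evt.Line.Raw, "second line"]`, expr.Env(env))
	out, _ = expr.Run(many, env)
	fmt.Printf("%T: %v\n", out, out) // []interface {}: [hello second line]
}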
 func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
 	for i := 0; i < len(sources); i++ {
 		subsrc := sources[i] //ensure its a copy
@@ -225,10 +298,26 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb
 		AcquisTomb.Go(func() error {
 			defer types.CatchPanic("crowdsec/acquis")
 			var err error
+
+			outChan := output
+			log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid())
+			if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok {
+				log.Infof("transform expression found for datasource %s", subsrc.GetName())
+				transformChan := make(chan types.Event)
+				outChan = transformChan
+				transformLogger := log.WithFields(log.Fields{
+					"component":  "transform",
+					"datasource": subsrc.GetName(),
+				})
+				AcquisTomb.Go(func() error {
+					transform(outChan, output, AcquisTomb, transformRuntime, transformLogger)
+					return nil
+				})
+			}
 			if subsrc.GetMode() == configuration.TAIL_MODE {
-				err = subsrc.StreamingAcquisition(output, AcquisTomb)
+				err = subsrc.StreamingAcquisition(outChan, AcquisTomb)
 			} else {
-				err = subsrc.OneShotAcquisition(output, AcquisTomb)
+				err = subsrc.OneShotAcquisition(outChan, AcquisTomb)
 			}
 			if err != nil {
			//if one of the acquisitions returns an error, we kill the others to shut down properly

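The channel swap above is the whole integration: the datasource keeps writing to "its" channel, never knowing whether that is the parser's input or a transformer sitting in front of it. A stripped-down sketch of the pattern with plain channels (names are hypothetical; the real code uses a tomb for lifecycle management):

package main

import "fmt"

// stage sits between producer and consumer, rewriting items in flight.
func stage(in <-chan string, out chan<- string) {
	for s := range in {
		out <- s + " (transformed)"
	}
	close(out)
}

func main() {
	output := make(chan string)
	outChan := output

	transform := true // pretend a compiled expression was found
	if transform {
		transformChan := make(chan string)
		outChan = transformChan // the producer now feeds the stage instead
		go stage(transformChan, output)
	}

	go func() {
		outChan <- "line 1"
		close(outChan)
	}()

	for s := range output {
		fmt.Println(s)
	}
}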
+ 9 - 5
pkg/acquisition/acquisition_test.go

@@ -58,9 +58,10 @@ func (f *MockSource) GetMetrics() []prometheus.Collector                      {
 func (f *MockSource) GetAggregMetrics() []prometheus.Collector                { return nil }
 func (f *MockSource) Dump() interface{}                                       { return f }
 func (f *MockSource) GetName() string                                         { return "mock" }
-func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry) error {
+func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
 	return fmt.Errorf("not supported")
 }
+func (f *MockSource) GetUuid() string { return "" }
 
 // copy the mocksource, but this one can't run
 type MockSourceCantRun struct {
@@ -326,9 +327,10 @@ func (f *MockCat) CanRun() error                            { return nil }
 func (f *MockCat) GetMetrics() []prometheus.Collector       { return nil }
 func (f *MockCat) GetAggregMetrics() []prometheus.Collector { return nil }
 func (f *MockCat) Dump() interface{}                        { return f }
-func (f *MockCat) ConfigureByDSN(string, map[string]string, *log.Entry) error {
+func (f *MockCat) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
 	return fmt.Errorf("not supported")
 }
+func (f *MockCat) GetUuid() string { return "" }
 
 //----
 
@@ -367,9 +369,10 @@ func (f *MockTail) CanRun() error                            { return nil }
 func (f *MockTail) GetMetrics() []prometheus.Collector       { return nil }
 func (f *MockTail) GetAggregMetrics() []prometheus.Collector { return nil }
 func (f *MockTail) Dump() interface{}                        { return f }
-func (f *MockTail) ConfigureByDSN(string, map[string]string, *log.Entry) error {
+func (f *MockTail) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
 	return fmt.Errorf("not supported")
 }
+func (f *MockTail) GetUuid() string { return "" }
 
 //func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
 
@@ -490,13 +493,14 @@ func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector
 func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector                { return nil }
 func (f *MockSourceByDSN) Dump() interface{}                                       { return f }
 func (f *MockSourceByDSN) GetName() string                                         { return "mockdsn" }
-func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	dsn = strings.TrimPrefix(dsn, "mockdsn://")
 	if dsn != "test_expect" {
 		return fmt.Errorf("unexpected value")
 	}
 	return nil
 }
+func (f *MockSourceByDSN) GetUuid() string { return "" }
 
 func TestConfigureByDSN(t *testing.T) {
 	tests := []struct {
@@ -529,7 +533,7 @@ func TestConfigureByDSN(t *testing.T) {
 	for _, tc := range tests {
 		tc := tc
 		t.Run(tc.dsn, func(t *testing.T) {
-			srcs, err := LoadAcquisitionFromDSN(tc.dsn, map[string]string{"type": "test_label"})
+			srcs, err := LoadAcquisitionFromDSN(tc.dsn, map[string]string{"type": "test_label"}, "")
 			cstest.RequireErrorContains(t, err, tc.ExpectedError)
 
 			assert.Len(t, srcs, tc.ExpectedResLen)

+ 2 - 0
pkg/acquisition/configuration/configuration.go

@@ -11,6 +11,8 @@ type DataSourceCommonCfg struct {
 	Source         string                 `yaml:"source,omitempty"`
 	Name           string                 `yaml:"name,omitempty"`
 	UseTimeMachine bool                   `yaml:"use_time_machine,omitempty"`
+	UniqueId       string                 `yaml:"unique_id,omitempty"`
+	TransformExpr  string                 `yaml:"transform,omitempty"`
 	Config         map[string]interface{} `yaml:",inline"` //to keep the datasource-specific configuration directives
 }
 
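With the two new common fields in place, an acquisition entry can request a transform directly from YAML. A hypothetical file-datasource entry (the `transform` key follows the yaml tag introduced above; `filenames` and `labels` are the file datasource's usual keys, and `unique_id` is filled in internally at load time, not set by hand):

source: file
filenames:
  - /var/log/myapp/*.log
labels:
  type: myapp
# expr expression run against each event after acquisition; it may return
# a string, a []string, or a []interface{} containing strings
transform: 'evt.Line.Raw + " [myapp]"'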

+ 7 - 1
pkg/acquisition/modules/cloudwatch/cloudwatch.go

@@ -100,6 +100,10 @@ var (
 	def_AwsConfigDir            = ""
 )
 
+func (cw *CloudwatchSource) GetUuid() string {
+	return cw.Config.UniqueId
+}
+
 func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error {
 	cw.Config = CloudwatchSourceConfiguration{}
 	if err := yaml.UnmarshalStrict(yamlConfig, &cw.Config); err != nil {
@@ -509,7 +513,7 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
 	}
 }
 
-func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	cw.logger = logger
 
 	dsn = strings.TrimPrefix(dsn, cw.GetName()+"://")
@@ -524,6 +528,8 @@ func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string,
 	cw.Config.GroupName = frags[0]
 	cw.Config.StreamName = &frags[1]
 	cw.Config.Labels = labels
+	cw.Config.UniqueId = uuid
+
 	u, err := url.ParseQuery(args[1])
 	if err != nil {
 		return errors.Wrapf(err, "while parsing %s", dsn)

+ 2 - 2
pkg/acquisition/modules/cloudwatch/cloudwatch_test.go

@@ -623,7 +623,7 @@ func TestConfigureByDSN(t *testing.T) {
 			dbgLogger := log.New().WithField("test", tc.name)
 			dbgLogger.Logger.SetLevel(log.DebugLevel)
 			cw := CloudwatchSource{}
-			err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger)
+			err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger, "")
 			cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
 		})
 	}
@@ -746,7 +746,7 @@ func TestOneShotAcquisition(t *testing.T) {
 			dbgLogger.Logger.SetLevel(log.DebugLevel)
 			dbgLogger.Infof("starting test")
 			cw := CloudwatchSource{}
-			err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger)
+			err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger, "")
 			cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
 			if tc.expectedCfgErr != "" {
 				return

+ 6 - 1
pkg/acquisition/modules/docker/docker.go

@@ -67,6 +67,10 @@ type ContainerConfig struct {
 	Tty    bool
 }
 
+func (d *DockerSource) GetUuid() string {
+	return d.Config.UniqueId
+}
+
 func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
 	d.Config = DockerConfiguration{
 		FollowStdout:  true, // default
@@ -158,7 +162,7 @@ func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error {
 	return nil
 }
 
-func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	var err error
 
 	if !strings.HasPrefix(dsn, d.GetName()+"://") {
@@ -170,6 +174,7 @@ func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logg
 		FollowStdErr:  true,
 		CheckInterval: "1s",
 	}
+	d.Config.UniqueId = uuid
 	d.Config.ContainerName = make([]string, 0)
 	d.Config.ContainerID = make([]string, 0)
 	d.runningContainerState = make(map[string]*ContainerConfig)

+ 2 - 2
pkg/acquisition/modules/docker/docker_test.go

@@ -107,7 +107,7 @@ func TestConfigureDSN(t *testing.T) {
 	})
 	for _, test := range tests {
 		f := DockerSource{}
-		err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
+		err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "")
 		cstest.AssertErrorContains(t, err, test.expectedErr)
 	}
 }
@@ -303,7 +303,7 @@ func TestOneShot(t *testing.T) {
 		labels := make(map[string]string)
 		labels["type"] = ts.logType
 
-		if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger); err != nil {
+		if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger, ""); err != nil {
 			t.Fatalf("unable to configure dsn '%s': %s", ts.dsn, err)
 		}
 		dockerClient.Client = new(mockDockerCli)

+ 39 - 12
pkg/acquisition/modules/file/file.go

@@ -10,6 +10,7 @@ import (
 	"path"
 	"path/filepath"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -37,6 +38,7 @@ type FileConfiguration struct {
 	ExcludeRegexps                    []string `yaml:"exclude_regexps"`
 	Filename                          string
 	ForceInotify                      bool `yaml:"force_inotify"`
+	MaxBufferSize                     int  `yaml:"max_buffer_size"`
 	configuration.DataSourceCommonCfg `yaml:",inline"`
 }
 
@@ -50,6 +52,10 @@ type FileSource struct {
 	exclude_regexps    []*regexp.Regexp
 }
 
+func (f *FileSource) GetUuid() string {
+	return f.config.UniqueId
+}
+
 func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
 	f.config = FileConfiguration{}
 	err := yaml.UnmarshalStrict(yamlConfig, &f.config)
@@ -163,12 +169,13 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
 	return nil
 }
 
-func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	if !strings.HasPrefix(dsn, "file://") {
 		return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
 	}
 
 	f.logger = logger
+	f.config = FileConfiguration{}
 
 	dsn = strings.TrimPrefix(dsn, "file://")
 
@@ -184,23 +191,34 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger
 			return errors.Wrap(err, "could not parse file args")
 		}
 		for key, value := range params {
-			if key != "log_level" {
-				return fmt.Errorf("unsupported key %s in file DSN", key)
-			}
-			if len(value) != 1 {
-				return errors.New("expected zero or one value for 'log_level'")
-			}
-			lvl, err := log.ParseLevel(value[0])
-			if err != nil {
-				return errors.Wrapf(err, "unknown level %s", value[0])
+			switch key {
+			case "log_level":
+				if len(value) != 1 {
+					return errors.New("expected zero or one value for 'log_level'")
+				}
+				lvl, err := log.ParseLevel(value[0])
+				if err != nil {
+					return errors.Wrapf(err, "unknown level %s", value[0])
+				}
+				f.logger.Logger.SetLevel(lvl)
+			case "max_buffer_size":
+				if len(value) != 1 {
+					return errors.New("expected zero or one value for 'max_buffer_size'")
+				}
+				maxBufferSize, err := strconv.Atoi(value[0])
+				if err != nil {
+					return errors.Wrapf(err, "could not parse max_buffer_size %s", value[0])
+				}
+				f.config.MaxBufferSize = maxBufferSize
+			default:
+				return fmt.Errorf("unknown parameter %s", key)
 			}
-			f.logger.Logger.SetLevel(lvl)
 		}
 	}
 
-	f.config = FileConfiguration{}
 	f.config.Labels = labels
 	f.config.Mode = configuration.CAT_MODE
+	f.config.UniqueId = uuid
 
 	f.logger.Debugf("Will try pattern %s", args[0])
 	files, err := filepath.Glob(args[0])
@@ -491,6 +509,10 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
 		scanner = bufio.NewScanner(fd)
 	}
 	scanner.Split(bufio.ScanLines)
+	if f.config.MaxBufferSize > 0 {
+		buf := make([]byte, 0, 64*1024)
+		scanner.Buffer(buf, f.config.MaxBufferSize)
+	}
 	for scanner.Scan() {
 		if scanner.Text() == "" {
 			continue
@@ -509,6 +531,11 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
		//we're reading all the logs at once, so these must be time-machine buckets
 		out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
 	}
+	if err := scanner.Err(); err != nil {
+		logger.Errorf("Error while reading file: %s", err)
+		t.Kill(err)
+		return err
+	}
 	t.Kill(nil)
 	return nil
 }
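
Both the max_buffer_size knob and the new scanner.Err() check address the same failure mode: bufio.Scanner aborts with bufio.ErrTooLong when a line exceeds its buffer, and without checking Err() the read loop simply ends as if the file were done. A self-contained sketch of that behavior:

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	long := strings.Repeat("x", 100_000) // a single 100kB line

	// Default buffer (64kB max token): Scan stops early and Err says why.
	s := bufio.NewScanner(strings.NewReader(long))
	for s.Scan() {
	}
	fmt.Println("default:", s.Err()) // bufio.Scanner: token too long

	// Raised ceiling, as the max_buffer_size option does above.
	s = bufio.NewScanner(strings.NewReader(long))
	s.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for s.Scan() {
		fmt.Println("read", len(s.Text()), "bytes")
	}
	fmt.Println("raised:", s.Err()) // <nil>
}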

+ 1 - 1
pkg/acquisition/modules/file/file_test.go

@@ -97,7 +97,7 @@ func TestConfigureDSN(t *testing.T) {
 		tc := tc
 		t.Run(tc.dsn, func(t *testing.T) {
 			f := fileacquisition.FileSource{}
-			err := f.ConfigureByDSN(tc.dsn, map[string]string{"type": "testtype"}, subLogger)
+			err := f.ConfigureByDSN(tc.dsn, map[string]string{"type": "testtype"}, subLogger, "")
 			cstest.RequireErrorContains(t, err, tc.expectedErr)
 		})
 	}

+ 6 - 1
pkg/acquisition/modules/journalctl/journalctl.go

@@ -154,6 +154,10 @@ func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) err
 	}
 }
 
+func (j *JournalCtlSource) GetUuid() string {
+	return j.config.UniqueId
+}
+
 func (j *JournalCtlSource) GetMetrics() []prometheus.Collector {
 	return []prometheus.Collector{linesRead}
 }
@@ -200,11 +204,12 @@ func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error
 	return nil
 }
 
-func (j *JournalCtlSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (j *JournalCtlSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	j.logger = logger
 	j.config = JournalCtlConfiguration{}
 	j.config.Mode = configuration.CAT_MODE
 	j.config.Labels = labels
+	j.config.UniqueId = uuid
 
 	//format for the DSN is : journalctl://filters=FILTER1&filters=FILTER2
 	if !strings.HasPrefix(dsn, "journalctl://") {

+ 1 - 1
pkg/acquisition/modules/journalctl/journalctl_test.go

@@ -96,7 +96,7 @@ func TestConfigureDSN(t *testing.T) {
 	})
 	for _, test := range tests {
 		f := JournalCtlSource{}
-		err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
+		err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "")
 		cstest.AssertErrorContains(t, err, test.expectedErr)
 	}
 }

+ 5 - 1
pkg/acquisition/modules/kafka/kafka.go

@@ -54,6 +54,10 @@ type KafkaSource struct {
 	Reader *kafka.Reader
 }
 
+func (k *KafkaSource) GetUuid() string {
+	return k.Config.UniqueId
+}
+
 func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
 	k.Config = KafkaConfiguration{}
 
@@ -102,7 +106,7 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
 	return nil
 }
 
-func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry) error {
+func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
 	return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
 }
 

+ 5 - 1
pkg/acquisition/modules/kinesis/kinesis.go

@@ -74,6 +74,10 @@ var linesReadShards = prometheus.NewCounterVec(
 	[]string{"stream", "shard"},
 )
 
+func (k *KinesisSource) GetUuid() string {
+	return k.Config.UniqueId
+}
+
 func (k *KinesisSource) newClient() error {
 	var sess *session.Session
 
@@ -161,7 +165,7 @@ func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
 	return nil
 }
 
-func (k *KinesisSource) ConfigureByDSN(string, map[string]string, *log.Entry) error {
+func (k *KinesisSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
 	return fmt.Errorf("kinesis datasource does not support command-line acquisition")
 }
 

+ 5 - 1
pkg/acquisition/modules/kubernetesaudit/k8s_audit.go

@@ -48,6 +48,10 @@ var requestCount = prometheus.NewCounterVec(
 	},
 	[]string{"source"})
 
+func (ka *KubernetesAuditSource) GetUuid() string {
+	return ka.config.UniqueId
+}
+
 func (ka *KubernetesAuditSource) GetMetrics() []prometheus.Collector {
 	return []prometheus.Collector{eventCount, requestCount}
 }
@@ -105,7 +109,7 @@ func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry) err
 	return nil
 }
 
-func (ka *KubernetesAuditSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (ka *KubernetesAuditSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	return fmt.Errorf("k8s-audit datasource does not support command-line acquisition")
 }
 

+ 62 - 19
pkg/acquisition/modules/s3/s3.go

@@ -2,12 +2,15 @@ package s3acquisition
 
 import (
 	"bufio"
+	"bytes"
 	"compress/gzip"
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/url"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -22,7 +25,6 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
 	log "github.com/sirupsen/logrus"
 	"gopkg.in/tomb.v2"
 	"gopkg.in/yaml.v2"
@@ -40,6 +42,7 @@ type S3Configuration struct {
 	PollingInterval                   int     `yaml:"polling_interval"`
 	SQSName                           string  `yaml:"sqs_name"`
 	SQSFormat                         string  `yaml:"sqs_format"`
+	MaxBufferSize                     int     `yaml:"max_buffer_size"`
 }
 
 type S3Source struct {
@@ -138,10 +141,12 @@ func (s *S3Source) newS3Client() error {
 	if s.Config.AwsEndpoint != "" {
 		config = config.WithEndpoint(s.Config.AwsEndpoint)
 	}
+
 	s.s3Client = s3.New(sess, config)
 	if s.s3Client == nil {
 		return fmt.Errorf("failed to create S3 client")
 	}
+
 	return nil
 }
 
@@ -372,7 +377,7 @@ func (s *S3Source) readFile(bucket string, key string) error {
 	//TODO: Handle SSE-C
 	var scanner *bufio.Scanner
 
-	logger := s.logger.WithFields(logrus.Fields{
+	logger := s.logger.WithFields(log.Fields{
 		"method": "readFile",
 		"bucket": bucket,
 		"key":    key,
@@ -382,20 +387,36 @@ func (s *S3Source) readFile(bucket string, key string) error {
 		Bucket: aws.String(bucket),
 		Key:    aws.String(key),
 	})
+
 	if err != nil {
 		return fmt.Errorf("failed to get object %s/%s: %w", bucket, key, err)
 	}
 	defer output.Body.Close()
+
 	if strings.HasSuffix(key, ".gz") {
-		gzReader, err := gzip.NewReader(output.Body)
+		//This *might* be a gzipped file, but the SDK sometimes decompresses the body for us (it's unclear when; we have only seen it with CloudTrail logs), so sniff the magic bytes instead of trusting the suffix
+		header := make([]byte, 2)
+		//use io.ReadFull: a bare Read may legally return fewer than 2 bytes
+		_, err := io.ReadFull(output.Body, header)
 		if err != nil {
-			return fmt.Errorf("failed to read gzip object %s/%s: %w", bucket, key, err)
+			return fmt.Errorf("failed to read header of object %s/%s: %w", bucket, key, err)
+		}
+		if header[0] == 0x1f && header[1] == 0x8b {
+			gz, err := gzip.NewReader(io.MultiReader(bytes.NewReader(header), output.Body))
+			if err != nil {
+				return fmt.Errorf("failed to create gzip reader for object %s/%s: %w", bucket, key, err)
+			}
+			scanner = bufio.NewScanner(gz)
+		} else {
+			scanner = bufio.NewScanner(io.MultiReader(bytes.NewReader(header), output.Body))
 		}
-		defer gzReader.Close()
-		scanner = bufio.NewScanner(gzReader)
 	} else {
 		scanner = bufio.NewScanner(output.Body)
 	}
+	if s.Config.MaxBufferSize > 0 {
+		s.logger.Infof("Setting max buffer size to %d", s.Config.MaxBufferSize)
+		buf := make([]byte, 0, bufio.MaxScanTokenSize)
+		scanner.Buffer(buf, s.Config.MaxBufferSize)
+	}
 	for scanner.Scan() {
 		text := scanner.Text()
 		logger.Tracef("Read line %s", text)
@@ -422,6 +443,10 @@ func (s *S3Source) readFile(bucket string, key string) error {
 	return nil
 }
 
+func (s *S3Source) GetUuid() string {
+	return s.Config.UniqueId
+}
+
 func (s *S3Source) GetMetrics() []prometheus.Collector {
 	return []prometheus.Collector{linesRead, objectsRead, sqsMessagesReceived}
 }
@@ -446,6 +471,10 @@ func (s *S3Source) UnmarshalConfig(yamlConfig []byte) error {
 		s.Config.PollingInterval = 60
 	}
 
+	if s.Config.MaxBufferSize == 0 {
+		s.Config.MaxBufferSize = bufio.MaxScanTokenSize
+	}
+
 	if s.Config.PollingMethod != PollMethodList && s.Config.PollingMethod != PollMethodSQS {
 		return fmt.Errorf("invalid polling method %s", s.Config.PollingMethod)
 	}
@@ -509,16 +538,16 @@ func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry) error {
 	return nil
 }
 
-func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	if !strings.HasPrefix(dsn, "s3://") {
 		return fmt.Errorf("invalid DSN %s for S3 source, must start with s3://", dsn)
 	}
 
+	s.Config = S3Configuration{}
 	s.logger = logger.WithFields(log.Fields{
 		"bucket": s.Config.BucketName,
 		"prefix": s.Config.Prefix,
 	})
-
 	dsn = strings.TrimPrefix(dsn, "s3://")
 	args := strings.Split(dsn, "?")
 	if len(args[0]) == 0 {
@@ -531,23 +560,35 @@ func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger *
 			return errors.Wrap(err, "could not parse s3 args")
 		}
 		for key, value := range params {
-			if key != "log_level" {
-				return fmt.Errorf("unsupported key %s in s3 DSN", key)
-			}
-			if len(value) != 1 {
-				return errors.New("expected zero or one value for 'log_level'")
-			}
-			lvl, err := log.ParseLevel(value[0])
-			if err != nil {
-				return errors.Wrapf(err, "unknown level %s", value[0])
+			switch key {
+			case "log_level":
+				if len(value) != 1 {
+					return errors.New("expected zero or one value for 'log_level'")
+				}
+				lvl, err := log.ParseLevel(value[0])
+				if err != nil {
+					return errors.Wrapf(err, "unknown level %s", value[0])
+				}
+				s.logger.Logger.SetLevel(lvl)
+			case "max_buffer_size":
+				if len(value) != 1 {
+					return errors.New("expected zero or one value for 'max_buffer_size'")
+				}
+				maxBufferSize, err := strconv.Atoi(value[0])
+				if err != nil {
+					return errors.Wrapf(err, "invalid value for 'max_buffer_size'")
+				}
+				s.logger.Debugf("Setting max buffer size to %d", maxBufferSize)
+				s.Config.MaxBufferSize = maxBufferSize
+			default:
+				return fmt.Errorf("unknown parameter %s", key)
 			}
-			s.logger.Logger.SetLevel(lvl)
 		}
 	}
 
-	s.Config = S3Configuration{}
 	s.Config.Labels = labels
 	s.Config.Mode = configuration.CAT_MODE
+	s.Config.UniqueId = uuid
 
 	pathParts := strings.Split(args[0], "/")
 	s.logger.Debugf("pathParts: %v", pathParts)
@@ -587,6 +628,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error
 	s.logger.Infof("starting acquisition of %s/%s/%s", s.Config.BucketName, s.Config.Prefix, s.Config.Key)
 	s.out = out
 	s.ctx, s.cancel = context.WithCancel(context.Background())
+	s.Config.UseTimeMachine = true
 	if s.Config.Key != "" {
 		err := s.readFile(s.Config.BucketName, s.Config.Key)
 		if err != nil {
@@ -605,6 +647,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error
 			}
 		}
 	}
+	t.Kill(nil)
 	return nil
 }
 
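The magic-byte check above is the standard way to decide between gzip and plain content when the filename can't be trusted. A standalone sketch of the same sniffing (sniffReader is a hypothetical helper name):

package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// sniffReader returns a line scanner over r, transparently ungzipping
// when the stream starts with the gzip magic bytes 0x1f 0x8b.
func sniffReader(r io.Reader) (*bufio.Scanner, error) {
	header := make([]byte, 2)
	if _, err := io.ReadFull(r, header); err != nil {
		return nil, err
	}
	// Re-prepend the consumed header before handing the stream on.
	rest := io.MultiReader(bytes.NewReader(header), r)
	if header[0] == 0x1f && header[1] == 0x8b {
		gz, err := gzip.NewReader(rest)
		if err != nil {
			return nil, err
		}
		return bufio.NewScanner(gz), nil
	}
	return bufio.NewScanner(rest), nil
}

func main() {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	w.Write([]byte("compressed line\n"))
	w.Close()

	for _, r := range []io.Reader{&buf, strings.NewReader("plain line\n")} {
		s, _ := sniffReader(r)
		for s.Scan() {
			fmt.Println(s.Text())
		}
	}
}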

+ 3 - 2
pkg/acquisition/modules/s3/s3_test.go

@@ -234,7 +234,7 @@ func TestDSNAcquis(t *testing.T) {
 			linesRead := 0
 			f := S3Source{}
 			logger := log.NewEntry(log.New())
-			err := f.ConfigureByDSN(test.dsn, map[string]string{"foo": "bar"}, logger)
+			err := f.ConfigureByDSN(test.dsn, map[string]string{"foo": "bar"}, logger, "")
 			if err != nil {
 				t.Fatalf("unexpected error: %s", err.Error())
 			}
@@ -257,7 +257,8 @@ func TestDSNAcquis(t *testing.T) {
 			}()
 
 			f.s3Client = mockS3Client{}
-			err = f.OneShotAcquisition(out, nil)
+			tmb := tomb.Tomb{}
+			err = f.OneShotAcquisition(out, &tmb)
 			if err != nil {
 				t.Fatalf("unexpected error: %s", err.Error())
 			}

+ 5 - 1
pkg/acquisition/modules/syslog/syslog.go

@@ -48,6 +48,10 @@ var linesParsed = prometheus.NewCounterVec(
 	},
 	[]string{"source", "type"})
 
+func (s *SyslogSource) GetUuid() string {
+	return s.config.UniqueId
+}
+
 func (s *SyslogSource) GetName() string {
 	return "syslog"
 }
@@ -72,7 +76,7 @@ func (s *SyslogSource) GetAggregMetrics() []prometheus.Collector {
 	return []prometheus.Collector{linesReceived, linesParsed}
 }
 
-func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	return fmt.Errorf("syslog datasource does not support one shot acquisition")
 }
 

+ 5 - 1
pkg/acquisition/modules/wineventlog/wineventlog.go

@@ -15,6 +15,10 @@ import (
 
 type WinEventLogSource struct{}
 
+func (w *WinEventLogSource) GetUuid() string {
+	return ""
+}
+
 func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
 	return nil
 }
@@ -23,7 +27,7 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro
 	return nil
 }
 
-func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	return nil
 }
 

+ 5 - 1
pkg/acquisition/modules/wineventlog/wineventlog_windows.go

@@ -228,6 +228,10 @@ func (w *WinEventLogSource) generateConfig(query string) (*winlog.SubscribeConfi
 	return &config, nil
 }
 
+func (w *WinEventLogSource) GetUuid() string {
+	return w.config.UniqueId
+}
+
 func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
 	w.config = WinEventLogConfiguration{}
 
@@ -280,7 +284,7 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro
 	return nil
 }
 
-func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
 	return nil
 }