From 3287eed9317c452df8f9d9497a2df4a8a455ec43 Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Fri, 23 Apr 2021 12:04:01 +0200 Subject: [PATCH] wip --- cmd/crowdsec/main.go | 2 +- go.mod | 1 + pkg/acquisition/acquisition.go | 75 +++--- .../configuration/configuration.go | 12 + pkg/acquisition/file_reader.go | 227 ------------------ pkg/acquisition/modules/file/file.go | 103 ++++++++ .../{ => modules/file}/file_reader_test.go | 2 +- .../journalctl}/journalctl_reader.go | 2 +- .../journalctl}/journalctl_reader_test.go | 2 +- 9 files changed, 153 insertions(+), 273 deletions(-) create mode 100644 pkg/acquisition/configuration/configuration.go delete mode 100644 pkg/acquisition/file_reader.go create mode 100644 pkg/acquisition/modules/file/file.go rename pkg/acquisition/{ => modules/file}/file_reader_test.go (99%) rename pkg/acquisition/{ => modules/journalctl}/journalctl_reader.go (99%) rename pkg/acquisition/{ => modules/journalctl}/journalctl_reader_test.go (99%) diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 253ff4cf6..f7a4f088d 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -142,7 +142,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error { if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" { - tmpCfg := acquisition.DataSourceCfg{} + tmpCfg := acquisition.DataSourceCommonCfg{} tmpCfg.Mode = acquisition.CAT_MODE tmpCfg.Labels = map[string]string{"type": flags.SingleFileType} diff --git a/go.mod b/go.mod index 8b0022045..fcca17625 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( google.golang.org/genproto v0.0.0-20210114201628-6edceaf6022f // indirect google.golang.org/grpc v1.35.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.0.3 // indirect diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 988cdfbf6..178df4622 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -5,6 +5,8 @@ import ( "io" "os" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" @@ -57,52 +59,28 @@ cat mode will return once source has been exhausted. - how to deal with "file was not present at startup but might appear later" ? */ -var TAIL_MODE = "tail" -var CAT_MODE = "cat" - -type DataSourceCfg struct { - Mode string `yaml:"mode,omitempty"` //tail|cat|... - Filename string `yaml:"filename,omitempty"` - Filenames []string `yaml:"filenames,omitempty"` - JournalctlFilters []string `yaml:"journalctl_filter,omitempty"` - Labels map[string]string `yaml:"labels,omitempty"` - Profiling bool `yaml:"profiling,omitempty"` +var DataSourceMap = map[string]interface{}{ + "file": file_acquisition.FileSource{}, } +// The interface each datasource module must implement type DataSource interface { - Configure(DataSourceCfg) error - /*the readers must watch the tomb (especially in tail mode) to know when to shutdown. - tomb is as well used to trigger general shutdown when a datasource errors */ - StartReading(chan types.Event, *tomb.Tomb) error - Mode() string //return CAT_MODE or TAIL_MODE - //Not sure it makes sense to make those funcs part of the interface. - //While 'cat' and 'tail' are the only two modes we see now, other modes might appear - //StartTail(chan types.Event, *tomb.Tomb) error - //StartCat(chan types.Event, *tomb.Tomb) error + Configure([]byte) error // Configure the datasource + Mode() string // Get the mode (TAIL, CAT or SERVER) + SupportedModes() []string // Returns the mode supported by the datasource + OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) + LiveAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) } -func DataSourceConfigure(config DataSourceCfg) (DataSource, error) { - if config.Mode == "" { /*default mode is tail*/ - config.Mode = TAIL_MODE +func DataSourceConfigure(config configuration.DataSourceCommonCfg) (DataSource, error) { + dataSource := DataSourceMap[config.Type] + if dataSource == nil { + return nil, errors.Errorf("Unknown datasource %s", config.Type) } + dataSourceInstance := dataSource.New() + dataSourceInstance.Configure([]byte("")) - if len(config.Filename) > 0 || len(config.Filenames) > 0 { /*it's file acquisition*/ - - fileSrc := new(FileSource) - if err := fileSrc.Configure(config); err != nil { - return nil, errors.Wrap(err, "configuring file datasource") - } - return fileSrc, nil - } else if len(config.JournalctlFilters) > 0 { /*it's journald acquisition*/ - - journaldSrc := new(JournaldSource) - if err := journaldSrc.Configure(config); err != nil { - return nil, errors.Wrap(err, "configuring journald datasource") - } - return journaldSrc, nil - } else { - return nil, fmt.Errorf("empty filename(s) and journalctl filter, malformed datasource") - } + return nil, nil } func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) { @@ -119,7 +97,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, dec := yaml.NewDecoder(yamlFile) dec.SetStrict(true) for { - sub := DataSourceCfg{} + var sub interface{} err = dec.Decode(&sub) if err != nil { if err == io.EOF { @@ -128,6 +106,14 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, } return nil, errors.Wrap(err, fmt.Sprintf("failed to yaml decode %s", acquisFile)) } + // If no type is defined, assume file for backward compatibility + if sub.Type == "" { + sub.Type = "file" + } + // default mode is tail + if sub.Mode == "" { + sub.Mode = configuration.TAIL_MODE + } src, err := DataSourceConfigure(sub) if err != nil { log.Warningf("while configuring datasource : %s", err) @@ -141,13 +127,18 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, } func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error { - for i := 0; i < len(sources); i++ { subsrc := sources[i] //ensure its a copy log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc) AcquisTomb.Go(func() error { defer types.CatchPanic("crowdsec/acquis") - if err := subsrc.StartReading(output, AcquisTomb); err != nil { + var err error + if subsrc.Mode() == configuration.TAIL_MODE { + err = subsrc.LiveAcquisition(output, AcquisTomb) + } else { + err = subsrc.OneShotAcquisition(output, AcquisTomb) + } + if err != nil { return err } return nil diff --git a/pkg/acquisition/configuration/configuration.go b/pkg/acquisition/configuration/configuration.go new file mode 100644 index 000000000..59f1e3369 --- /dev/null +++ b/pkg/acquisition/configuration/configuration.go @@ -0,0 +1,12 @@ +package configuration + +type DataSourceCommonCfg struct { + Mode string `yaml:"mode,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Profiling bool `yaml:"profiling,omitempty"` + Type string `yaml:"type,omitempty"` +} + +var TAIL_MODE = "tail" +var CAT_MODE = "cat" +var SERVER_MODE = "server" // No difference with tail, just a bit more verbose diff --git a/pkg/acquisition/file_reader.go b/pkg/acquisition/file_reader.go deleted file mode 100644 index c525b3188..000000000 --- a/pkg/acquisition/file_reader.go +++ /dev/null @@ -1,227 +0,0 @@ -package acquisition - -import ( - "bufio" - "compress/gzip" - "fmt" - "os" - "path/filepath" - "strings" - "time" - - "github.com/pkg/errors" - "golang.org/x/sys/unix" - - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - - "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/nxadm/tail" - log "github.com/sirupsen/logrus" - - "github.com/prometheus/client_golang/prometheus" - tomb "gopkg.in/tomb.v2" -) - -type FileSource struct { - Config DataSourceCfg - tails []*tail.Tail - Files []string -} - -func (f *FileSource) Configure(Config DataSourceCfg) error { - f.Config = Config - if len(Config.Filename) == 0 && len(Config.Filenames) == 0 { - return fmt.Errorf("no filename or filenames") - } - - //let's deal with the array no matter what - if len(Config.Filename) != 0 { - Config.Filenames = append(Config.Filenames, Config.Filename) - } - - for _, fexpr := range Config.Filenames { - files, err := filepath.Glob(fexpr) - if err != nil { - return errors.Wrapf(err, "while globbing %s", fexpr) - } - if len(files) == 0 { - log.Warningf("[file datasource] no results for %s", fexpr) - continue - } - - for _, file := range files { - /*check that we can read said file*/ - if err := unix.Access(file, unix.R_OK); err != nil { - return fmt.Errorf("unable to open %s : %s", file, err) - } - log.Infof("[file datasource] opening file '%s'", file) - - if f.Config.Mode == TAIL_MODE { - tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}) - if err != nil { - log.Errorf("[file datasource] skipping %s : %v", file, err) - continue - } - f.Files = append(f.Files, file) - f.tails = append(f.tails, tail) - } else if f.Config.Mode == CAT_MODE { - //simply check that the file exists, it will be read differently - if _, err := os.Stat(file); err != nil { - return fmt.Errorf("can't open file %s : %s", file, err) - } - f.Files = append(f.Files, file) - } else { - return fmt.Errorf("unknown mode %s for file acquisition", f.Config.Mode) - } - - } - } - if len(f.Files) == 0 { - return fmt.Errorf("no files to read for %+v", Config.Filenames) - } - - return nil -} - -func (f *FileSource) Mode() string { - return f.Config.Mode -} - -func (f *FileSource) StartReading(out chan types.Event, t *tomb.Tomb) error { - - if f.Config.Mode == CAT_MODE { - return f.StartCat(out, t) - } else if f.Config.Mode == TAIL_MODE { - return f.StartTail(out, t) - } else { - return fmt.Errorf("unknown mode '%s' for file acquisition", f.Config.Mode) - } -} - -/*A tail-mode file reader (tail) */ -func (f *FileSource) StartTail(output chan types.Event, AcquisTomb *tomb.Tomb) error { - log.Debugf("starting file tail with %d items", len(f.tails)) - for i := 0; i < len(f.tails); i++ { - idx := i - log.Debugf("starting %d", idx) - AcquisTomb.Go(func() error { - defer types.CatchPanic("crowdsec/acquis/tailfile") - return f.TailOneFile(output, AcquisTomb, idx) - }) - } - return nil -} - -/*A one shot file reader (cat) */ -func (f *FileSource) StartCat(output chan types.Event, AcquisTomb *tomb.Tomb) error { - for i := 0; i < len(f.Files); i++ { - idx := i - log.Debugf("starting %d", idx) - AcquisTomb.Go(func() error { - defer types.CatchPanic("crowdsec/acquis/catfile") - return f.CatOneFile(output, AcquisTomb, idx) - }) - } - return nil -} - -/*A tail-mode file reader (tail) */ -func (f *FileSource) TailOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error { - - file := f.Files[idx] - tail := f.tails[idx] - - clog := log.WithFields(log.Fields{ - "acquisition file": f.Files[idx], - }) - clog.Debugf("starting") - - timeout := time.Tick(1 * time.Second) - - for { - l := types.Line{} - select { - case <-AcquisTomb.Dying(): //we are being killed by main - clog.Infof("file datasource %s stopping", file) - if err := tail.Stop(); err != nil { - clog.Errorf("error in stop : %s", err) - } - return nil - case <-tail.Tomb.Dying(): //our tailer is dying - clog.Warningf("File reader of %s died", file) - AcquisTomb.Kill(fmt.Errorf("dead reader for %s", file)) - return fmt.Errorf("reader for %s is dead", file) - case line := <-tail.Lines: - if line == nil { - clog.Debugf("Nil line") - return fmt.Errorf("tail for %s is empty", file) - } - if line.Err != nil { - log.Warningf("fetch error : %v", line.Err) - return line.Err - } - if line.Text == "" { //skip empty lines - continue - } - ReaderHits.With(prometheus.Labels{"source": file}).Inc() - - l.Raw = line.Text - l.Labels = f.Config.Labels - l.Time = line.Time - l.Src = file - l.Process = true - //we're tailing, it must be real time logs - log.Debugf("pushing %+v", l) - output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE} - case <-timeout: - //time out, shall we do stuff ? - clog.Debugf("timeout") - } - } -} - -/*A one shot file reader (cat) */ -func (f *FileSource) CatOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error { - var scanner *bufio.Scanner - - log.Infof("reading %s at once", f.Files[idx]) - file := f.Files[idx] - - clog := log.WithFields(log.Fields{ - "file": file, - }) - fd, err := os.Open(file) - defer fd.Close() - if err != nil { - clog.Errorf("Failed opening file: %s", err) - return errors.Wrapf(err, "failed opening %s", f.Files[idx]) - } - - if strings.HasSuffix(file, ".gz") { - gz, err := gzip.NewReader(fd) - if err != nil { - clog.Errorf("Failed to read gz file: %s", err) - return errors.Wrapf(err, "failed to read gz %s", f.Files[idx]) - } - defer gz.Close() - scanner = bufio.NewScanner(gz) - - } else { - scanner = bufio.NewScanner(fd) - } - scanner.Split(bufio.ScanLines) - for scanner.Scan() { - log.Tracef("line %s", scanner.Text()) - l := types.Line{} - l.Raw = scanner.Text() - l.Time = time.Now() - l.Src = file - l.Labels = f.Config.Labels - l.Process = true - ReaderHits.With(prometheus.Labels{"source": file}).Inc() - //we're reading logs at once, it must be time-machine buckets - output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE} - } - AcquisTomb.Kill(nil) - return nil -} diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go new file mode 100644 index 000000000..087785fd5 --- /dev/null +++ b/pkg/acquisition/modules/file/file.go @@ -0,0 +1,103 @@ +package file_acquisition + +import ( + "bufio" + "compress/gzip" + "os" + "strings" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/nxadm/tail" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" +) + +type FileConfiguration struct { + Filenames []string + Filename string +} + +type FileSource struct { + CommonConfig configuration.DataSourceCommonCfg + FileConfig FileConfiguration + tails []*tail.Tail + Files []string +} + +func (f *FileSource) Configure(Config []byte) error { + return nil +} + +func (f *FileSource) Mode() string { + return f.CommonConfig.Mode +} + +func (f *FileSource) SupportedModes() []string { + return []string{configuration.TAIL_MODE, configuration.CAT_MODE} +} + +func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + for _, filename := range f.FileConfig.Filenames { + log.Infof("reading %s at once", filename) + err := f.readFile(filename, out, t) + if err != nil { + return err + } + } + return nil +} + +func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error { + return nil +} + +func (f *FileSource) New() *FileSource { + return &FileSource{} +} + +func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error { + var scanner *bufio.Scanner + + clog := log.WithFields(log.Fields{ + "file": filename, + }) + fd, err := os.Open(filename) + if err != nil { + clog.Errorf("Failed opening file: %s", err) + return errors.Wrapf(err, "failed opening %s", filename) + } + defer fd.Close() + + if strings.HasSuffix(filename, ".gz") { + gz, err := gzip.NewReader(fd) + if err != nil { + clog.Errorf("Failed to read gz file: %s", err) + return errors.Wrapf(err, "failed to read gz %s", filename) + } + defer gz.Close() + scanner = bufio.NewScanner(gz) + + } else { + scanner = bufio.NewScanner(fd) + } + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + log.Tracef("line %s", scanner.Text()) + l := types.Line{} + l.Raw = scanner.Text() + l.Time = time.Now() + l.Src = filename + l.Labels = f.CommonConfig.Labels + l.Process = true + // FIXME: How to interact with prom metrics ? + //ReaderHits.With(prometheus.Labels{"source": filename}).Inc() + //we're reading logs at once, it must be time-machine buckets + out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE} + } + t.Kill(nil) + return nil +} diff --git a/pkg/acquisition/file_reader_test.go b/pkg/acquisition/modules/file/file_reader_test.go similarity index 99% rename from pkg/acquisition/file_reader_test.go rename to pkg/acquisition/modules/file/file_reader_test.go index 6151dd742..539f0b5ef 100644 --- a/pkg/acquisition/file_reader_test.go +++ b/pkg/acquisition/modules/file/file_reader_test.go @@ -1,4 +1,4 @@ -package acquisition +package file_acquisition import ( "fmt" diff --git a/pkg/acquisition/journalctl_reader.go b/pkg/acquisition/modules/journalctl/journalctl_reader.go similarity index 99% rename from pkg/acquisition/journalctl_reader.go rename to pkg/acquisition/modules/journalctl/journalctl_reader.go index 089a907b3..dec903b68 100644 --- a/pkg/acquisition/journalctl_reader.go +++ b/pkg/acquisition/modules/journalctl/journalctl_reader.go @@ -1,4 +1,4 @@ -package acquisition +package journalctl_acquisition import ( "bufio" diff --git a/pkg/acquisition/journalctl_reader_test.go b/pkg/acquisition/modules/journalctl/journalctl_reader_test.go similarity index 99% rename from pkg/acquisition/journalctl_reader_test.go rename to pkg/acquisition/modules/journalctl/journalctl_reader_test.go index 00e43a697..3f0664b6b 100644 --- a/pkg/acquisition/journalctl_reader_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_reader_test.go @@ -1,4 +1,4 @@ -package acquisition +package journalctl_acquisition import ( "fmt"