This commit is contained in:
Sebastien Blot 2021-04-23 12:04:01 +02:00
parent 430b4bafbc
commit 3287eed931
No known key found for this signature in database
GPG key ID: DFC2902F40449F6A
9 changed files with 153 additions and 273 deletions

View file

@ -142,7 +142,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" {
tmpCfg := acquisition.DataSourceCfg{}
tmpCfg := acquisition.DataSourceCommonCfg{}
tmpCfg.Mode = acquisition.CAT_MODE
tmpCfg.Labels = map[string]string{"type": flags.SingleFileType}

1
go.mod
View file

@ -67,6 +67,7 @@ require (
google.golang.org/genproto v0.0.0-20210114201628-6edceaf6022f // indirect
google.golang.org/grpc v1.35.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.0.3 // indirect

View file

@ -5,6 +5,8 @@ import (
"io"
"os"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
@ -57,52 +59,28 @@ cat mode will return once source has been exhausted.
- how to deal with "file was not present at startup but might appear later" ?
*/
var TAIL_MODE = "tail"
var CAT_MODE = "cat"
type DataSourceCfg struct {
Mode string `yaml:"mode,omitempty"` //tail|cat|...
Filename string `yaml:"filename,omitempty"`
Filenames []string `yaml:"filenames,omitempty"`
JournalctlFilters []string `yaml:"journalctl_filter,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
Profiling bool `yaml:"profiling,omitempty"`
var DataSourceMap = map[string]interface{}{
"file": file_acquisition.FileSource{},
}
// The interface each datasource module must implement
type DataSource interface {
Configure(DataSourceCfg) error
/*the readers must watch the tomb (especially in tail mode) to know when to shutdown.
tomb is as well used to trigger general shutdown when a datasource errors */
StartReading(chan types.Event, *tomb.Tomb) error
Mode() string //return CAT_MODE or TAIL_MODE
//Not sure it makes sense to make those funcs part of the interface.
//While 'cat' and 'tail' are the only two modes we see now, other modes might appear
//StartTail(chan types.Event, *tomb.Tomb) error
//StartCat(chan types.Event, *tomb.Tomb) error
Configure([]byte) error // Configure the datasource
Mode() string // Get the mode (TAIL, CAT or SERVER)
SupportedModes() []string // Returns the mode supported by the datasource
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
LiveAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
}
func DataSourceConfigure(config DataSourceCfg) (DataSource, error) {
if config.Mode == "" { /*default mode is tail*/
config.Mode = TAIL_MODE
func DataSourceConfigure(config configuration.DataSourceCommonCfg) (DataSource, error) {
dataSource := DataSourceMap[config.Type]
if dataSource == nil {
return nil, errors.Errorf("Unknown datasource %s", config.Type)
}
dataSourceInstance := dataSource.New()
dataSourceInstance.Configure([]byte(""))
if len(config.Filename) > 0 || len(config.Filenames) > 0 { /*it's file acquisition*/
fileSrc := new(FileSource)
if err := fileSrc.Configure(config); err != nil {
return nil, errors.Wrap(err, "configuring file datasource")
}
return fileSrc, nil
} else if len(config.JournalctlFilters) > 0 { /*it's journald acquisition*/
journaldSrc := new(JournaldSource)
if err := journaldSrc.Configure(config); err != nil {
return nil, errors.Wrap(err, "configuring journald datasource")
}
return journaldSrc, nil
} else {
return nil, fmt.Errorf("empty filename(s) and journalctl filter, malformed datasource")
}
return nil, nil
}
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
@ -119,7 +97,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
dec := yaml.NewDecoder(yamlFile)
dec.SetStrict(true)
for {
sub := DataSourceCfg{}
var sub interface{}
err = dec.Decode(&sub)
if err != nil {
if err == io.EOF {
@ -128,6 +106,14 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
}
return nil, errors.Wrap(err, fmt.Sprintf("failed to yaml decode %s", acquisFile))
}
// If no type is defined, assume file for backward compatibility
if sub.Type == "" {
sub.Type = "file"
}
// default mode is tail
if sub.Mode == "" {
sub.Mode = configuration.TAIL_MODE
}
src, err := DataSourceConfigure(sub)
if err != nil {
log.Warningf("while configuring datasource : %s", err)
@ -141,13 +127,18 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
}
func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
for i := 0; i < len(sources); i++ {
subsrc := sources[i] //ensure its a copy
log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc)
AcquisTomb.Go(func() error {
defer types.CatchPanic("crowdsec/acquis")
if err := subsrc.StartReading(output, AcquisTomb); err != nil {
var err error
if subsrc.Mode() == configuration.TAIL_MODE {
err = subsrc.LiveAcquisition(output, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(output, AcquisTomb)
}
if err != nil {
return err
}
return nil

View file

@ -0,0 +1,12 @@
package configuration
type DataSourceCommonCfg struct {
Mode string `yaml:"mode,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
Profiling bool `yaml:"profiling,omitempty"`
Type string `yaml:"type,omitempty"`
}
var TAIL_MODE = "tail"
var CAT_MODE = "cat"
var SERVER_MODE = "server" // No difference with tail, just a bit more verbose

View file

@ -1,227 +0,0 @@
package acquisition
import (
"bufio"
"compress/gzip"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/nxadm/tail"
log "github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
tomb "gopkg.in/tomb.v2"
)
type FileSource struct {
Config DataSourceCfg
tails []*tail.Tail
Files []string
}
func (f *FileSource) Configure(Config DataSourceCfg) error {
f.Config = Config
if len(Config.Filename) == 0 && len(Config.Filenames) == 0 {
return fmt.Errorf("no filename or filenames")
}
//let's deal with the array no matter what
if len(Config.Filename) != 0 {
Config.Filenames = append(Config.Filenames, Config.Filename)
}
for _, fexpr := range Config.Filenames {
files, err := filepath.Glob(fexpr)
if err != nil {
return errors.Wrapf(err, "while globbing %s", fexpr)
}
if len(files) == 0 {
log.Warningf("[file datasource] no results for %s", fexpr)
continue
}
for _, file := range files {
/*check that we can read said file*/
if err := unix.Access(file, unix.R_OK); err != nil {
return fmt.Errorf("unable to open %s : %s", file, err)
}
log.Infof("[file datasource] opening file '%s'", file)
if f.Config.Mode == TAIL_MODE {
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}})
if err != nil {
log.Errorf("[file datasource] skipping %s : %v", file, err)
continue
}
f.Files = append(f.Files, file)
f.tails = append(f.tails, tail)
} else if f.Config.Mode == CAT_MODE {
//simply check that the file exists, it will be read differently
if _, err := os.Stat(file); err != nil {
return fmt.Errorf("can't open file %s : %s", file, err)
}
f.Files = append(f.Files, file)
} else {
return fmt.Errorf("unknown mode %s for file acquisition", f.Config.Mode)
}
}
}
if len(f.Files) == 0 {
return fmt.Errorf("no files to read for %+v", Config.Filenames)
}
return nil
}
func (f *FileSource) Mode() string {
return f.Config.Mode
}
func (f *FileSource) StartReading(out chan types.Event, t *tomb.Tomb) error {
if f.Config.Mode == CAT_MODE {
return f.StartCat(out, t)
} else if f.Config.Mode == TAIL_MODE {
return f.StartTail(out, t)
} else {
return fmt.Errorf("unknown mode '%s' for file acquisition", f.Config.Mode)
}
}
/*A tail-mode file reader (tail) */
func (f *FileSource) StartTail(output chan types.Event, AcquisTomb *tomb.Tomb) error {
log.Debugf("starting file tail with %d items", len(f.tails))
for i := 0; i < len(f.tails); i++ {
idx := i
log.Debugf("starting %d", idx)
AcquisTomb.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/tailfile")
return f.TailOneFile(output, AcquisTomb, idx)
})
}
return nil
}
/*A one shot file reader (cat) */
func (f *FileSource) StartCat(output chan types.Event, AcquisTomb *tomb.Tomb) error {
for i := 0; i < len(f.Files); i++ {
idx := i
log.Debugf("starting %d", idx)
AcquisTomb.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/catfile")
return f.CatOneFile(output, AcquisTomb, idx)
})
}
return nil
}
/*A tail-mode file reader (tail) */
func (f *FileSource) TailOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error {
file := f.Files[idx]
tail := f.tails[idx]
clog := log.WithFields(log.Fields{
"acquisition file": f.Files[idx],
})
clog.Debugf("starting")
timeout := time.Tick(1 * time.Second)
for {
l := types.Line{}
select {
case <-AcquisTomb.Dying(): //we are being killed by main
clog.Infof("file datasource %s stopping", file)
if err := tail.Stop(); err != nil {
clog.Errorf("error in stop : %s", err)
}
return nil
case <-tail.Tomb.Dying(): //our tailer is dying
clog.Warningf("File reader of %s died", file)
AcquisTomb.Kill(fmt.Errorf("dead reader for %s", file))
return fmt.Errorf("reader for %s is dead", file)
case line := <-tail.Lines:
if line == nil {
clog.Debugf("Nil line")
return fmt.Errorf("tail for %s is empty", file)
}
if line.Err != nil {
log.Warningf("fetch error : %v", line.Err)
return line.Err
}
if line.Text == "" { //skip empty lines
continue
}
ReaderHits.With(prometheus.Labels{"source": file}).Inc()
l.Raw = line.Text
l.Labels = f.Config.Labels
l.Time = line.Time
l.Src = file
l.Process = true
//we're tailing, it must be real time logs
log.Debugf("pushing %+v", l)
output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
case <-timeout:
//time out, shall we do stuff ?
clog.Debugf("timeout")
}
}
}
/*A one shot file reader (cat) */
func (f *FileSource) CatOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error {
var scanner *bufio.Scanner
log.Infof("reading %s at once", f.Files[idx])
file := f.Files[idx]
clog := log.WithFields(log.Fields{
"file": file,
})
fd, err := os.Open(file)
defer fd.Close()
if err != nil {
clog.Errorf("Failed opening file: %s", err)
return errors.Wrapf(err, "failed opening %s", f.Files[idx])
}
if strings.HasSuffix(file, ".gz") {
gz, err := gzip.NewReader(fd)
if err != nil {
clog.Errorf("Failed to read gz file: %s", err)
return errors.Wrapf(err, "failed to read gz %s", f.Files[idx])
}
defer gz.Close()
scanner = bufio.NewScanner(gz)
} else {
scanner = bufio.NewScanner(fd)
}
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
log.Tracef("line %s", scanner.Text())
l := types.Line{}
l.Raw = scanner.Text()
l.Time = time.Now()
l.Src = file
l.Labels = f.Config.Labels
l.Process = true
ReaderHits.With(prometheus.Labels{"source": file}).Inc()
//we're reading logs at once, it must be time-machine buckets
output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
}
AcquisTomb.Kill(nil)
return nil
}

View file

@ -0,0 +1,103 @@
package file_acquisition
import (
"bufio"
"compress/gzip"
"os"
"strings"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/nxadm/tail"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
type FileConfiguration struct {
Filenames []string
Filename string
}
type FileSource struct {
CommonConfig configuration.DataSourceCommonCfg
FileConfig FileConfiguration
tails []*tail.Tail
Files []string
}
func (f *FileSource) Configure(Config []byte) error {
return nil
}
func (f *FileSource) Mode() string {
return f.CommonConfig.Mode
}
func (f *FileSource) SupportedModes() []string {
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
for _, filename := range f.FileConfig.Filenames {
log.Infof("reading %s at once", filename)
err := f.readFile(filename, out, t)
if err != nil {
return err
}
}
return nil
}
func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
return nil
}
func (f *FileSource) New() *FileSource {
return &FileSource{}
}
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
var scanner *bufio.Scanner
clog := log.WithFields(log.Fields{
"file": filename,
})
fd, err := os.Open(filename)
if err != nil {
clog.Errorf("Failed opening file: %s", err)
return errors.Wrapf(err, "failed opening %s", filename)
}
defer fd.Close()
if strings.HasSuffix(filename, ".gz") {
gz, err := gzip.NewReader(fd)
if err != nil {
clog.Errorf("Failed to read gz file: %s", err)
return errors.Wrapf(err, "failed to read gz %s", filename)
}
defer gz.Close()
scanner = bufio.NewScanner(gz)
} else {
scanner = bufio.NewScanner(fd)
}
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
log.Tracef("line %s", scanner.Text())
l := types.Line{}
l.Raw = scanner.Text()
l.Time = time.Now()
l.Src = filename
l.Labels = f.CommonConfig.Labels
l.Process = true
// FIXME: How to interact with prom metrics ?
//ReaderHits.With(prometheus.Labels{"source": filename}).Inc()
//we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
}
t.Kill(nil)
return nil
}

View file

@ -1,4 +1,4 @@
package acquisition
package file_acquisition
import (
"fmt"

View file

@ -1,4 +1,4 @@
package acquisition
package journalctl_acquisition
import (
"bufio"

View file

@ -1,4 +1,4 @@
package acquisition
package journalctl_acquisition
import (
"fmt"