This commit is contained in:
Sebastien Blot 2021-04-26 11:52:39 +02:00
parent 3287eed931
commit cb8955238d
No known key found for this signature in database
GPG key ID: DFC2902F40449F6A
7 changed files with 38 additions and 715 deletions

View file

@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"os"
"strings"
_ "net/http/pprof"
"time"
@ -12,6 +11,7 @@ import (
"sort"
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
@ -142,24 +142,24 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" {
tmpCfg := acquisition.DataSourceCommonCfg{}
tmpCfg.Mode = acquisition.CAT_MODE
tmpCfg := configuration.DataSourceCommonCfg{}
tmpCfg.Mode = configuration.CAT_MODE
tmpCfg.Labels = map[string]string{"type": flags.SingleFileType}
if flags.SingleFilePath != "" {
/*if flags.SingleFilePath != "" {
tmpCfg.Filename = flags.SingleFilePath
} else if flags.SingleJournalctlFilter != "" {
tmpCfg.JournalctlFilters = strings.Split(flags.SingleJournalctlFilter, " ")
}
}*/
datasrc, err := acquisition.DataSourceConfigure(tmpCfg)
datasrc, err := acquisition.DataSourceConfigure([]byte(""), "file")
if err != nil {
return fmt.Errorf("while configuring specified file datasource : %s", err)
}
if dataSources == nil {
dataSources = make([]acquisition.DataSource, 0)
}
dataSources = append(dataSources, datasrc)
dataSources = append(dataSources, *datasrc)
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {

View file

@ -6,7 +6,6 @@ import (
"os"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
@ -59,11 +58,7 @@ cat mode will return once source has been exhausted.
- how to deal with "file was not present at startup but might appear later" ?
*/
var DataSourceMap = map[string]interface{}{
"file": file_acquisition.FileSource{},
}
// The interface each datasource module must implement
// The interface each datasource must implement
type DataSource interface {
Configure([]byte) error // Configure the datasource
Mode() string // Get the mode (TAIL, CAT or SERVER)
@ -72,15 +67,16 @@ type DataSource interface {
LiveAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
}
func DataSourceConfigure(config configuration.DataSourceCommonCfg) (DataSource, error) {
dataSource := DataSourceMap[config.Type]
func DataSourceConfigure(yamlConfig []byte, dataSourceType string) (*DataSource, error) {
datasourceMap := DataSourceMap{}
dataSource := datasourceMap.GetDataSource(dataSourceType)
if dataSource == nil {
return nil, errors.Errorf("Unknown datasource %s", config.Type)
return nil, errors.Errorf("Unknown datasource %s", dataSourceType)
}
dataSourceInstance := dataSource.New()
dataSourceInstance.Configure([]byte(""))
//dataSourceInstance := *dataSource.New()
err := dataSource.Configure([]byte(""))
return nil, nil
return dataSource, nil
}
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
@ -95,9 +91,9 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
return nil, errors.Wrapf(err, "can't open %s", acquisFile)
}
dec := yaml.NewDecoder(yamlFile)
dec.SetStrict(true)
dec.SetStrict(false)
for {
var sub interface{}
var sub configuration.DataSourceCommonCfg
err = dec.Decode(&sub)
if err != nil {
if err == io.EOF {
@ -114,12 +110,12 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
if sub.Mode == "" {
sub.Mode = configuration.TAIL_MODE
}
src, err := DataSourceConfigure(sub)
src, err := DataSourceConfigure([]byte(""), sub.Type)
if err != nil {
log.Warningf("while configuring datasource : %s", err)
continue
}
sources = append(sources, src)
sources = append(sources, *src)
}
}

View file

@ -1,138 +1 @@
package acquisition
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
tomb "gopkg.in/tomb.v2"
)
// TestConfigLoading exercises LoadAcquisitionFromFile with three inputs: a
// path that does not exist, a file that is expected to fail YAML decoding,
// and a valid acquisition file that must yield exactly one datasource.
func TestConfigLoading(t *testing.T) {
	// bad filename: the error must report that the file cannot be opened
	cfg := csconfig.CrowdsecServiceCfg{
		AcquisitionFiles: []string{"./tests/xxx.yaml"},
	}
	_, err := LoadAcquisitionFromFile(&cfg)
	// fmt.Sprintf is used instead of err.Error() so that a nil error produces
	// a failed assertion rather than a nil-pointer panic.
	assert.Contains(t, fmt.Sprintf("%s", err), "can't open ./tests/xxx.yaml: open ./tests/xxx.yaml: no such file or directory")

	// bad config file: the error must report a YAML decoding failure
	cfg = csconfig.CrowdsecServiceCfg{
		AcquisitionFiles: []string{"./tests/test.log"},
	}
	_, err = LoadAcquisitionFromFile(&cfg)
	assert.Contains(t, fmt.Sprintf("%s", err), "failed to yaml decode ./tests/test.log: yaml: unmarshal errors")

	// correct config file
	cfg = csconfig.CrowdsecServiceCfg{
		AcquisitionFiles: []string{"./tests/acquis_test.yaml"},
	}
	srcs, err := LoadAcquisitionFromFile(&cfg)
	if err != nil {
		t.Fatalf("unexpected error : %s", err)
	}
	// testify takes (expected, actual); the reverse order yields misleading
	// failure messages.
	assert.Equal(t, 1, len(srcs))
}
// TestDataSourceConfigure runs DataSourceConfigure over a table of
// configurations. Cases with a config_error only check the returned error.
// The remaining cases check the resulting mode, start acquisition in the
// background, kill the tomb right away and verify the tomb's final error.
func TestDataSourceConfigure(t *testing.T) {
	tests := []struct {
		cfg DataSourceCfg
		//tombState
		config_error string
		read_error   string
		tomb_error   string
		lines        int
	}{
		{ //missing filename(s)
			cfg: DataSourceCfg{
				Mode: CAT_MODE,
			},
			config_error: "empty filename(s) and journalctl filter, malformed datasource",
		},
		{ //missing filename(s)
			cfg: DataSourceCfg{
				Mode: TAIL_MODE,
			},
			config_error: "empty filename(s) and journalctl filter, malformed datasource",
		},
		{ //bad mode(s)
			cfg: DataSourceCfg{
				Filename: "./tests/test.log",
				Mode:     "ratata",
			},
			config_error: "configuring file datasource: unknown mode ratata for file acquisition",
		},
		{ //ok test
			cfg: DataSourceCfg{
				Mode:     CAT_MODE,
				Filename: "./tests/test.log",
			},
		},
		{ //missing mode, expected to default to TAIL_MODE (see the mode check below)
			cfg: DataSourceCfg{
				Filename: "./tests/test.log",
			},
		},
		{ //ok test for journalctl
			cfg: DataSourceCfg{
				Mode:              CAT_MODE,
				JournalctlFilters: []string{"-test.run=TestSimJournalctlCatOneLine", "--"},
			},
		},
	}

	for tidx, test := range tests {
		srcs, err := DataSourceConfigure(test.cfg)
		if test.config_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", err), test.config_error)
			log.Infof("expected config error ok : %s", test.config_error)
			continue
		}
		if err != nil {
			t.Fatalf("%d/%d unexpected config error %s", tidx, len(tests), err)
		}

		//check we got the expected mode
		expectedMode := test.cfg.Mode
		if expectedMode == "" {
			expectedMode = TAIL_MODE
		}
		assert.Equal(t, expectedMode, srcs.Mode())

		out := make(chan types.Event)
		// named tb so the local does not shadow the tomb package
		tb := tomb.Tomb{}

		// The goroutine receives its own copies of the loop values and keeps
		// its error local: writing to the outer err would race with the loop,
		// and before Go 1.22 the loop variables are shared across iterations.
		go func(idx int, readError string) {
			err := StartAcquisition([]DataSource{srcs}, out, &tb)
			if readError != "" {
				assert.Contains(t, fmt.Sprintf("%s", err), readError)
				log.Infof("expected read error ok : %s", readError)
				return
			}
			if err != nil {
				log.Fatalf("%d/%d unexpected read error %s", idx, len(tests), err)
			}
		}(tidx, test.read_error)

		log.Printf("kill iiittt")
		//we're actually not interested in the result :)
		tb.Kill(nil)
		time.Sleep(1 * time.Second)

		if test.tomb_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", tb.Err()), test.tomb_error)
			log.Infof("expected tomb error ok : %s", test.tomb_error)
			continue
		}
		if tb.Err() != nil {
			t.Fatalf("%d/%d unexpected tomb error %s", tidx, len(tests), tb.Err())
		}
	}
}

View file

@ -0,0 +1,17 @@
package acquisition
import file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
// DataSourceMap is a registry of datasource implementations, keyed by the
// datasource type name (for example "file").
type DataSourceMap struct {
// Datasources maps a datasource type name to its implementation.
Datasources map[string]DataSource
}
// GetDataSource returns a pointer to the datasource registered under dstype,
// or nil when no datasource is registered for that type.
//
// Returning nil for an unknown type lets callers detect it with a plain nil
// check; taking the address of the map's zero value would always give a
// non-nil pointer to a nil interface. Looking up a key in a nil map is safe
// in Go, so this also works before the registry has been populated.
func (d *DataSourceMap) GetDataSource(dstype string) *DataSource {
	datasource, ok := d.Datasources[dstype]
	if !ok {
		return nil
	}
	return &datasource
}
// New (re)initialises the registry with the built-in datasources.
//
// The map is stored on the receiver so that later lookups through
// d.Datasources can find the registered entries.
func (d *DataSourceMap) New() {
	d.Datasources = map[string]DataSource{
		"file": &file_acquisition.FileSource{},
	}
}

View file

@ -29,6 +29,7 @@ type FileSource struct {
}
// Configure is currently a stub: it logs a warning and returns nil.
// The Config bytes are accepted but not read.
func (f *FileSource) Configure(Config []byte) error {
log.Warn("Configuring FileSource")
return nil
}
@ -56,6 +57,7 @@ func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
}
// New logs a warning and returns a pointer to a fresh zero-value FileSource.
// The receiver is not used.
func (f *FileSource) New() *FileSource {
log.Warn("Creating new FileSource")
return &FileSource{}
}

View file

@ -1,383 +1 @@
package file_acquisition
import (
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
tomb "gopkg.in/tomb.v2"
)
// TestAcquisCat configures a FileSource in CAT_MODE for each table entry,
// starts reading, counts the events received until one second passes with no
// event, and checks the line count and the tomb's final error.
func TestAcquisCat(t *testing.T) {
	tests := []struct {
		cfg DataSourceCfg
		//tombState
		config_error string
		read_error   string
		tomb_error   string
		lines        int
	}{
		{ //missing filename(s)
			cfg: DataSourceCfg{
				Mode: CAT_MODE,
			},
			config_error: "no filename or filenames",
		},
		{ //forbidden file
			cfg: DataSourceCfg{
				Mode:     CAT_MODE,
				Filename: "/etc/shadow",
			},
			config_error: "unable to open /etc/shadow : permission denied",
		},
		{ //bad glob pattern
			cfg: DataSourceCfg{
				Filename: "[a-",
				Mode:     CAT_MODE,
			},
			config_error: "while globbing [a-: syntax error in pattern",
		},
		{ //nonexistent file
			cfg: DataSourceCfg{
				Filename: "/does/not/exists",
				Mode:     CAT_MODE,
			},
			config_error: "no files to read for [/does/not/exists]",
		},
		{ //ok file
			cfg: DataSourceCfg{
				Filename: "./tests/test.log",
				Mode:     CAT_MODE,
			},
			lines: 1,
		},
		{ //invalid gz
			cfg: DataSourceCfg{
				Filename: "./tests/badlog.gz",
				Mode:     CAT_MODE,
			},
			lines:      0,
			tomb_error: "failed to read gz ./tests/badlog.gz: EOF",
		},
		{ //good gz
			cfg: DataSourceCfg{
				Filename: "./tests/test.log.gz",
				Mode:     CAT_MODE,
			},
			lines: 1,
		},
	}

	for tidx, test := range tests {
		fileSrc := new(FileSource)
		err := fileSrc.Configure(test.cfg)
		if test.config_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", err), test.config_error)
			log.Infof("expected config error ok : %s", test.config_error)
			continue
		}
		if err != nil {
			t.Fatalf("%d/%d unexpected config error %s", tidx, len(tests), err)
		}

		out := make(chan types.Event)
		// named tb so the local does not shadow the tomb package
		tb := tomb.Tomb{}
		count := 0

		err = fileSrc.StartReading(out, &tb)
		if test.read_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", err), test.read_error)
			log.Infof("expected read error ok : %s", test.read_error)
			continue
		}
		if err != nil {
			t.Fatalf("%d/%d unexpected read error %s", tidx, len(tests), err)
		}

		// Drain events until the source stays silent for one second.
	READLOOP:
		for {
			select {
			case <-out:
				count++
			case <-time.After(1 * time.Second):
				break READLOOP
			}
		}

		if count != test.lines {
			t.Fatalf("%d/%d expected %d line read, got %d", tidx, len(tests), test.lines, count)
		}

		if test.tomb_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", tb.Err()), test.tomb_error)
			// log the tomb expectation that was actually checked
			log.Infof("expected tomb error ok : %s", test.tomb_error)
			continue
		}
		if tb.Err() != nil {
			t.Fatalf("%d/%d unexpected tomb error %s", tidx, len(tests), tb.Err())
		}
	}
}
func TestTailKill(t *testing.T) {
cfg := DataSourceCfg{
Filename: "./tests/test.log",
Mode: TAIL_MODE,
}
fileSrc := new(FileSource)
err := fileSrc.Configure(cfg)
if err != nil {
t.Fatalf("unexpected config error %s", err)
}
out := make(chan types.Event)
tb := tomb.Tomb{}
err = fileSrc.StartReading(out, &tb)
if err != nil {
t.Fatalf("unexpected read error %s", err)
}
time.Sleep(1 * time.Second)
if tb.Err() != tomb.ErrStillAlive {
t.Fatalf("unexpected tomb error %s (should be alive)", tb.Err())
}
//kill it :>
tb.Kill(nil)
time.Sleep(1 * time.Second)
if tb.Err() != nil {
t.Fatalf("unexpected tomb error %s (should be dead)", tb.Err())
}
}
func TestTailKillBis(t *testing.T) {
cfg := DataSourceCfg{
Filename: "./tests/test.log",
Mode: TAIL_MODE,
}
fileSrc := new(FileSource)
err := fileSrc.Configure(cfg)
if err != nil {
t.Fatalf("unexpected config error %s", err)
}
out := make(chan types.Event)
tb := tomb.Tomb{}
err = fileSrc.StartReading(out, &tb)
if err != nil {
t.Fatalf("unexpected read error %s", err)
}
time.Sleep(1 * time.Second)
if tb.Err() != tomb.ErrStillAlive {
t.Fatalf("unexpected tomb error %s (should be alive)", tb.Err())
}
//kill the underlying tomb of tailer
fileSrc.tails[0].Kill(fmt.Errorf("ratata"))
time.Sleep(1 * time.Second)
//it can be two errors :
if !strings.Contains(fmt.Sprintf("%s", tb.Err()), "dead reader for ./tests/test.log") &&
!strings.Contains(fmt.Sprintf("%s", tb.Err()), "tail for ./tests/test.log is empty") {
t.Fatalf("unexpected error : %s", tb.Err())
}
}
// TestTailRuntime tails ./tests/test.log, appends five lines to it while the
// tailer is running, and checks that exactly five events are received and
// that the tomb is still alive. The fixture file is then restored to a
// single line.
func TestTailRuntime(t *testing.T) {
	//log.SetLevel(log.TraceLevel)
	cfg := DataSourceCfg{
		Filename: "./tests/test.log",
		Mode:     TAIL_MODE,
	}

	fileSrc := new(FileSource)
	err := fileSrc.Configure(cfg)
	if err != nil {
		t.Fatalf("unexpected config error %s", err)
	}

	out := make(chan types.Event)
	tb := tomb.Tomb{}
	count := 0

	err = fileSrc.StartReading(out, &tb)
	if err != nil {
		t.Fatalf("unexpected read error %s", err)
	}

	time.Sleep(1 * time.Second)

	//write data
	f, err := os.OpenFile(cfg.Filename, os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 5; i++ {
		if _, err := fmt.Fprintf(f, "ratata%d\n", i); err != nil {
			f.Close()
			t.Fatal(err)
		}
	}
	f.Close()

	// Count events until the source stays silent for one second.
READLOOP:
	for {
		select {
		case <-out:
			count++
		case <-time.After(1 * time.Second):
			break READLOOP
		}
	}

	if count != 5 {
		t.Fatalf("expected %d line read, got %d", 5, count)
	}

	if tb.Err() != tomb.ErrStillAlive {
		t.Fatalf("unexpected tomb error %s", tb.Err())
	}

	/*reset the file*/
	// O_TRUNC is required: without it the write only overwrites the first
	// bytes and the appended "ratata" lines stay in the fixture, which breaks
	// later tests that expect a single line.
	f, err = os.OpenFile(cfg.Filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		t.Fatal(err)
	}
	_, err = f.WriteString("one log line\n")
	if err != nil {
		f.Close()
		t.Fatal(err)
	}
	f.Close()
}
// TestAcquisTail configures a FileSource in TAIL_MODE for each table entry,
// starts reading, counts the events received until one second passes with no
// event, and checks the line count and the tomb's state.
func TestAcquisTail(t *testing.T) {
	tests := []struct {
		cfg DataSourceCfg
		//tombState
		config_error string
		read_error   string
		tomb_error   string
		lines        int
	}{
		{ //missing filename(s)
			cfg: DataSourceCfg{
				Mode: TAIL_MODE,
			},
			config_error: "no filename or filenames",
		},
		{ //forbidden file
			cfg: DataSourceCfg{
				Mode:     TAIL_MODE,
				Filename: "/etc/shadow",
			},
			config_error: "unable to open /etc/shadow : permission denied",
		},
		{ //bad glob pattern
			cfg: DataSourceCfg{
				Filename: "[a-",
				Mode:     TAIL_MODE,
			},
			config_error: "while globbing [a-: syntax error in pattern",
		},
		{ //nonexistent file
			cfg: DataSourceCfg{
				Filename: "/does/not/exists",
				Mode:     TAIL_MODE,
			},
			config_error: "no files to read for [/does/not/exists]",
		},
		{ //ok file
			cfg: DataSourceCfg{
				Filename: "./tests/test.log",
				Mode:     TAIL_MODE,
			},
			lines:      0,
			tomb_error: "still alive",
		},
		{ //invalid gz
			cfg: DataSourceCfg{
				Filename: "./tests/badlog.gz",
				Mode:     TAIL_MODE,
			},
			lines:      0,
			tomb_error: "still alive",
		},
		{ //good gz
			cfg: DataSourceCfg{
				Filename: "./tests/test.log.gz",
				Mode:     TAIL_MODE,
			},
			lines:      0,
			tomb_error: "still alive",
		},
	}

	for tidx, test := range tests {
		fileSrc := new(FileSource)
		err := fileSrc.Configure(test.cfg)
		if test.config_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", err), test.config_error)
			log.Infof("expected config error ok : %s", test.config_error)
			continue
		}
		if err != nil {
			t.Fatalf("%d/%d unexpected config error %s", tidx, len(tests), err)
		}

		out := make(chan types.Event)
		// named tb so the local does not shadow the tomb package
		tb := tomb.Tomb{}
		count := 0

		err = fileSrc.StartReading(out, &tb)
		if test.read_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", err), test.read_error)
			log.Infof("expected read error ok : %s", test.read_error)
			continue
		}
		if err != nil {
			t.Fatalf("%d/%d unexpected read error %s", tidx, len(tests), err)
		}

		// Count events until the source stays silent for one second.
	READLOOP:
		for {
			select {
			case <-out:
				count++
			case <-time.After(1 * time.Second):
				break READLOOP
			}
		}

		if count != test.lines {
			t.Fatalf("%d/%d expected %d line read, got %d", tidx, len(tests), test.lines, count)
		}

		if test.tomb_error != "" {
			assert.Contains(t, fmt.Sprintf("%s", tb.Err()), test.tomb_error)
			// log the tomb expectation that was actually checked
			log.Infof("expected tomb error ok : %s", test.tomb_error)
			continue
		}
		if tb.Err() != nil {
			t.Fatalf("%d/%d unexpected tomb error %s", tidx, len(tests), tb.Err())
		}
	}
}

View file

@ -1,174 +1 @@
package journalctl_acquisition
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"
"time"
log "github.com/sirupsen/logrus"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
tomb "gopkg.in/tomb.v2"
)
/*
journald/systemd support :
systemd has its own logging system, which stores files in non-text mode.
To be able to read those, we're going to read the output of journalctl, see https://github.com/crowdsecurity/crowdsec/issues/423
TBD :
- handle journalctl errors
*/
// JournaldSource is a datasource that reads log lines from the output of a
// journalctl process.
type JournaldSource struct {
// Config is the datasource configuration passed to Configure.
Config DataSourceCfg
// Cmd is the journalctl command built by Configure and started by readOutput.
Cmd *exec.Cmd
// Stdout is the command's standard output pipe; each line becomes an event.
Stdout io.ReadCloser
// Stderr is the command's standard error pipe; each line is reported as an error.
Stderr io.ReadCloser
// Decoder is not referenced by the methods visible in this file.
Decoder *json.Decoder
// SrcName is "journalctl-" followed by the filters joined with "."; it is
// used as the source name on emitted lines and in logs and metrics.
SrcName string
}
// JOURNALD_CMD is the executable run to read the journal.
var JOURNALD_CMD = "journalctl"
// JOURNALD_DEFAULT_TAIL_ARGS are the arguments placed before the user filters in TAIL_MODE.
var JOURNALD_DEFAULT_TAIL_ARGS = []string{"--follow"}
// JOURNALD_DEFAULT_CAT_ARGS are the arguments placed before the user filters in CAT_MODE (none).
var JOURNALD_DEFAULT_CAT_ARGS = []string{}
// Configure stores config on the source and prepares, without starting, the
// journalctl command: the mode's default arguments followed by the
// configured filters.
//
// It returns an error when no journalctl filter is given, when the mode is
// neither TAIL_MODE nor CAT_MODE, or when the command's stderr/stdout pipes
// cannot be created.
func (j *JournaldSource) Configure(config DataSourceCfg) error {
	j.Config = config

	// An empty, non-nil filter list is as unusable as a nil one.
	if len(config.JournalctlFilters) == 0 {
		return fmt.Errorf("journalctl_filter shouldn't be empty")
	}

	var defaultArgs []string
	switch j.Config.Mode {
	case TAIL_MODE:
		defaultArgs = JOURNALD_DEFAULT_TAIL_ARGS
	case CAT_MODE:
		defaultArgs = JOURNALD_DEFAULT_CAT_ARGS
	default:
		return fmt.Errorf("unknown mode '%s' for journald source", j.Config.Mode)
	}

	// Build the arguments in a fresh slice so that appending the filters can
	// never write into the backing array of the package-level defaults.
	journalArgs := make([]string, 0, len(defaultArgs)+len(config.JournalctlFilters))
	journalArgs = append(journalArgs, defaultArgs...)
	journalArgs = append(journalArgs, config.JournalctlFilters...)

	j.Cmd = exec.Command(JOURNALD_CMD, journalArgs...)

	var err error
	if j.Stderr, err = j.Cmd.StderrPipe(); err != nil {
		return fmt.Errorf("creating journalctl stderr pipe: %w", err)
	}
	if j.Stdout, err = j.Cmd.StdoutPipe(); err != nil {
		return fmt.Errorf("creating journalctl stdout pipe: %w", err)
	}

	j.SrcName = fmt.Sprintf("journalctl-%s", strings.Join(config.JournalctlFilters, "."))
	log.Infof("[journald datasource] Configured with filters : %+v", journalArgs)
	log.Debugf("cmd path : %s", j.Cmd.Path)
	log.Debugf("cmd args : %+v", j.Cmd.Args)
	return nil
}
// Mode returns the mode stored in the source's configuration.
func (j *JournaldSource) Mode() string {
return j.Config.Mode
}
// readOutput starts the configured journalctl command and forwards every
// line of its standard output to out as a LOG event in LIVE mode.
//
// It blocks until one of the following happens:
//   - the tomb t is dying: it returns nil;
//   - journalctl writes a line to stderr: the tomb is killed with that line
//     as the error, which is also returned;
//   - stdout is exhausted: it returns the scanner error after killing the
//     tomb with it, or nil on a clean end of stream.
//
// On return the journalctl process is killed and reaped, and the reader
// goroutines are released, so nothing is left blocked on a channel nobody
// reads anymore.
func (j *JournaldSource) readOutput(out chan types.Event, t *tomb.Tomb) error {
	clog := log.WithFields(log.Fields{
		"acquisition file": j.SrcName,
	})

	if err := j.Cmd.Start(); err != nil {
		clog.Errorf("failed to start journalctl: %s", err)
		return errors.Wrapf(err, "starting journalctl (%s)", j.SrcName)
	}

	readErr := make(chan error)
	// done is closed when readOutput returns; readers select on it so they
	// never block forever on readErr or out.
	done := make(chan struct{})
	defer func() {
		close(done)
		// Best effort: the process may already have exited on its own.
		if err := j.Cmd.Process.Kill(); err != nil {
			clog.Debugf("killing journalctl: %s", err)
		}
		if err := j.Cmd.Wait(); err != nil {
			clog.Debugf("waiting for journalctl: %s", err)
		}
	}()

	// report delivers a result to the select below unless readOutput has
	// already returned.
	report := func(err error) {
		select {
		case readErr <- err:
		case <-done:
		}
	}

	/*read stderr*/
	go func() {
		scanner := bufio.NewScanner(j.Stderr)
		for scanner.Scan() {
			txt := scanner.Text()
			clog.Warningf("got stderr message : %s", txt)
			// txt is data, not a format string
			report(fmt.Errorf("%s", txt))
		}
	}()

	/*read stdout*/
	go func() {
		scanner := bufio.NewScanner(j.Stdout)
		for scanner.Scan() {
			l := types.Line{}
			ReaderHits.With(prometheus.Labels{"source": j.SrcName}).Inc()
			l.Raw = scanner.Text()
			clog.Debugf("getting one line : %s", l.Raw)
			l.Labels = j.Config.Labels
			l.Time = time.Now()
			l.Src = j.SrcName
			l.Process = true
			evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
			select {
			case out <- evt:
			case <-done:
				return
			}
		}
		clog.Debugf("finished reading from journalctl")
		if err := scanner.Err(); err != nil {
			clog.Debugf("got an error while reading %s : %s", j.SrcName, err)
			report(err)
			return
		}
		report(nil)
	}()

	select {
	case <-t.Dying():
		clog.Debugf("journalctl datasource %s stopping", j.SrcName)
		return nil
	case err := <-readErr:
		clog.Debugf("the subroutine returned, leave as well")
		if err != nil {
			clog.Warningf("journalctl reader error : %s", err)
			t.Kill(err)
		}
		return err
	}
}
// StartReading starts the acquisition matching the configured mode:
// StartCat for CAT_MODE and StartTail for TAIL_MODE. Any other mode is
// rejected with an error.
func (j *JournaldSource) StartReading(out chan types.Event, t *tomb.Tomb) error {
	switch j.Config.Mode {
	case CAT_MODE:
		return j.StartCat(out, t)
	case TAIL_MODE:
		return j.StartTail(out, t)
	default:
		// Same wording as Configure: this is the journald source, not the
		// file one.
		return fmt.Errorf("unknown mode '%s' for journald source", j.Config.Mode)
	}
}
// StartCat runs readOutput in a goroutine tracked by the tomb t and returns
// nil immediately.
func (j *JournaldSource) StartCat(out chan types.Event, t *tomb.Tomb) error {
	t.Go(func() error {
		// label this goroutine as the "cat" reader in panic reports
		defer types.CatchPanic("crowdsec/acquis/catjournalctl")
		return j.readOutput(out, t)
	})
	return nil
}
// StartTail runs readOutput in a goroutine tracked by the tomb t and returns
// nil immediately.
func (j *JournaldSource) StartTail(out chan types.Event, t *tomb.Tomb) error {
	t.Go(func() error {
		// label this goroutine as the "tail" reader in panic reports
		defer types.CatchPanic("crowdsec/acquis/tailjournalctl")
		return j.readOutput(out, t)
	})
	return nil
}