up
This commit is contained in:
parent
e72bb274ad
commit
982c3388c8
4 changed files with 233 additions and 90 deletions
|
@ -6,7 +6,6 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||
file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -42,6 +41,8 @@ var ReaderHits = prometheus.NewCounterVec(
|
|||
filenames:
|
||||
- "/var/log/nginx/*.log"
|
||||
```
|
||||
|
||||
!!! how to handle expect mode that is not directly linked to tail/cat mode
|
||||
*/
|
||||
|
||||
/* Approach
|
||||
|
@ -68,30 +69,30 @@ type DataSource interface {
|
|||
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
|
||||
LiveAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
|
||||
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
|
||||
Dump() interface{}
|
||||
New() DataSource
|
||||
}
|
||||
|
||||
var AcquisitionSources = []struct {
|
||||
name string
|
||||
iface DataSource
|
||||
iface func() DataSource
|
||||
}{
|
||||
{
|
||||
name: "file",
|
||||
iface: &file_acquisition.FileSource{},
|
||||
},
|
||||
// {
|
||||
// name: "file",
|
||||
// iface: &file_acquisition.FileSource{},
|
||||
// },
|
||||
}
|
||||
|
||||
func GetDataSourceIface(dataSourceType string) DataSource {
|
||||
for _, source := range AcquisitionSources {
|
||||
if source.name == dataSourceType {
|
||||
newsrc := source.iface
|
||||
return newsrc
|
||||
return source.iface.New()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DataSourceConfigure(yamlConfig []byte, commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
|
||||
|
||||
if dataSrc := GetDataSourceIface(commonConfig.Type); dataSrc != nil {
|
||||
/* this logger will then be used by the datasource at runtime */
|
||||
clog := log.New()
|
||||
|
@ -104,7 +105,6 @@ func DataSourceConfigure(yamlConfig []byte, commonConfig configuration.DataSourc
|
|||
subLogger := clog.WithFields(log.Fields{
|
||||
"type": commonConfig.Type,
|
||||
})
|
||||
|
||||
/* check eventual dependencies are satisfied (ie. journald will check journalctl availability) */
|
||||
if err := dataSrc.CanRun(); err != nil {
|
||||
return nil, errors.Wrapf(err, "datasource %s cannot be run", commonConfig.Type)
|
||||
|
@ -135,16 +135,15 @@ func DataSourceConfigure(yamlConfig []byte, commonConfig configuration.DataSourc
|
|||
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
|
||||
|
||||
var sources []DataSource
|
||||
var acquisSources = config.AcquisitionFiles
|
||||
|
||||
for _, acquisFile := range acquisSources {
|
||||
for _, acquisFile := range config.AcquisitionFiles {
|
||||
log.Infof("loading acquisition file : %s", acquisFile)
|
||||
yamlFile, err := os.Open(acquisFile)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "can't open %s", acquisFile)
|
||||
}
|
||||
dec := yaml.NewDecoder(yamlFile)
|
||||
dec.SetStrict(false)
|
||||
dec.SetStrict(true)
|
||||
for {
|
||||
var sub configuration.DataSourceCommonCfg
|
||||
var holder interface{}
|
||||
|
@ -156,6 +155,20 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|||
}
|
||||
return nil, errors.Wrapf(err, "failed to yaml decode %s", sub.ConfigFile)
|
||||
}
|
||||
log.Printf("%s -> %T", acquisFile, holder)
|
||||
|
||||
switch holder.(type) {
|
||||
case map[interface{}]interface{}:
|
||||
//leftover empty item
|
||||
if len(holder.(map[interface{}]interface{})) == 0 {
|
||||
log.Printf("leftover empty item")
|
||||
break
|
||||
}
|
||||
}
|
||||
//the user let a trailing `---` at the end of the file, and the last item is empty
|
||||
// if len(holder.(map[interface{}]interface{})) == 0 {
|
||||
// continue
|
||||
// }
|
||||
//we dump it back to []byte, because we want to decode the yaml blob twice :
|
||||
//once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
|
||||
inBytes, err := yaml.Marshal(holder)
|
||||
|
@ -176,7 +189,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|||
sub.Mode = configuration.TAIL_MODE
|
||||
}
|
||||
if GetDataSourceIface(sub.Type) == nil {
|
||||
log.Errorf("unknown data source %s in %s", sub.Type, sub.ConfigFile)
|
||||
return nil, fmt.Errorf("unknown data source %s in %s", sub.Type, sub.ConfigFile)
|
||||
}
|
||||
|
||||
src, err := DataSourceConfigure(inBytes, sub)
|
||||
|
|
|
@ -2,33 +2,29 @@ package acquisition
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
tomb "gopkg.in/tomb.v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
type MockSource struct {
|
||||
configuration.DataSourceCommonCfg
|
||||
logger *log.Entry `yaml:"-"`
|
||||
Toto string `yaml:"toto"`
|
||||
configuration.DataSourceCommonCfg `yaml:",inline"`
|
||||
Toto string `yaml:"toto"`
|
||||
logger *log.Entry
|
||||
}
|
||||
|
||||
func (f *MockSource) GetMetrics() []prometheus.Collector {
|
||||
return nil
|
||||
}
|
||||
func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
|
||||
f.logger = logger
|
||||
f.logger.Infof("config called, bytes: %s", string(cfg))
|
||||
f.logger.Tracef("this is trace!")
|
||||
|
||||
if err := yaml.Unmarshal(cfg, f); err != nil {
|
||||
if err := yaml.UnmarshalStrict(cfg, &f); err != nil {
|
||||
return errors.Wrap(err, "while unmarshaling to reader specific config")
|
||||
}
|
||||
if f.Toto == "" {
|
||||
|
@ -36,24 +32,22 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
func (f *MockSource) GetMode() string {
|
||||
f.logger.Debugf("called")
|
||||
return f.Mode
|
||||
}
|
||||
func (f *MockSource) SupportedModes() []string {
|
||||
return []string{"tail", "cat"}
|
||||
}
|
||||
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error {
|
||||
return nil
|
||||
func (f *MockSource) GetMode() string { return f.Mode }
|
||||
func (f *MockSource) SupportedModes() []string { return []string{"tail", "cat"} }
|
||||
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
||||
func (f *MockSource) LiveAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
|
||||
func (f *MockSource) CanRun() error { return nil }
|
||||
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
|
||||
func (f *MockSource) Dump() interface{} { return f }
|
||||
func (f *MockSource) New() DataSource { return &MockSource{} }
|
||||
|
||||
//copy the mocksource, but this one can't run
|
||||
type MockSourceCantRun struct {
|
||||
MockSource
|
||||
}
|
||||
|
||||
func (f *MockSource) LiveAcquisition(chan types.Event, *tomb.Tomb) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *MockSource) CanRun() error {
|
||||
return nil
|
||||
}
|
||||
func (f *MockSourceCantRun) CanRun() error { return fmt.Errorf("can't run bro") }
|
||||
func (f *MockSourceCantRun) New() DataSource { return &MockSourceCantRun{} }
|
||||
|
||||
//appendMockSource is only used to add mock source for tests
|
||||
func appendMockSource() {
|
||||
|
@ -64,45 +58,197 @@ func appendMockSource() {
|
|||
}{name: "mock", iface: &MockSource{}}
|
||||
AcquisitionSources = append(AcquisitionSources, mock)
|
||||
}
|
||||
if GetDataSourceIface("mock_cant_run") == nil {
|
||||
mock := struct {
|
||||
name string
|
||||
iface DataSource
|
||||
}{name: "mock_cant_run", iface: &MockSourceCantRun{}}
|
||||
AcquisitionSources = append(AcquisitionSources, mock)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAcquisition(t *testing.T) {
|
||||
|
||||
func TestDataSourceConfigure(t *testing.T) {
|
||||
appendMockSource()
|
||||
|
||||
config := []byte(
|
||||
`
|
||||
tests := []struct {
|
||||
TestName string
|
||||
RawBytes []byte
|
||||
ExpectedError string
|
||||
}{
|
||||
{
|
||||
TestName: "basic_valid_config",
|
||||
RawBytes: []byte(`
|
||||
mode: cat
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: info
|
||||
type: mock
|
||||
toto: test_value1
|
||||
`),
|
||||
},
|
||||
{
|
||||
TestName: "basic_debug_config",
|
||||
RawBytes: []byte(`
|
||||
mode: cat
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: mock
|
||||
toto: test_value1
|
||||
`),
|
||||
},
|
||||
{
|
||||
TestName: "basic_tailmode_config",
|
||||
RawBytes: []byte(`
|
||||
mode: tail
|
||||
labels:
|
||||
toto: tutu
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: mock
|
||||
toto: foobar
|
||||
log_level: trace
|
||||
`)
|
||||
var generic interface{}
|
||||
var common configuration.DataSourceCommonCfg
|
||||
|
||||
if err := yaml.Unmarshal(config, &generic); err != nil {
|
||||
t.Fatalf("failed to unmarshal %s : %s", config, err)
|
||||
toto: test_value1
|
||||
`),
|
||||
},
|
||||
{
|
||||
TestName: "bad_mode_config",
|
||||
RawBytes: []byte(`
|
||||
mode: ratata
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: mock
|
||||
toto: test_value1
|
||||
`),
|
||||
ExpectedError: "ratata mode is not supported by mock",
|
||||
},
|
||||
{
|
||||
TestName: "bad_type_config",
|
||||
RawBytes: []byte(`
|
||||
mode: ratata
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: tutu
|
||||
`),
|
||||
ExpectedError: "cannot find source tutu",
|
||||
},
|
||||
{
|
||||
TestName: "mismatch_config",
|
||||
RawBytes: []byte(`
|
||||
mode: cat
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: mock
|
||||
wowo: ajsajasjas
|
||||
`),
|
||||
ExpectedError: "field wowo not found in type acquisition.MockSource",
|
||||
},
|
||||
{
|
||||
TestName: "cant_run_error",
|
||||
RawBytes: []byte(`
|
||||
mode: cat
|
||||
labels:
|
||||
test: foobar
|
||||
log_level: debug
|
||||
type: mock_cant_run
|
||||
wowo: ajsajasjas
|
||||
`),
|
||||
ExpectedError: "datasource mock_cant_run cannot be run: can't run bro",
|
||||
},
|
||||
}
|
||||
|
||||
outBytes, err := yaml.Marshal(generic)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
for _, test := range tests {
|
||||
common := configuration.DataSourceCommonCfg{}
|
||||
yaml.Unmarshal(test.RawBytes, &common)
|
||||
ds, err := DataSourceConfigure(test.RawBytes, common)
|
||||
if test.ExpectedError != "" {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error %s, got none", test.ExpectedError)
|
||||
}
|
||||
if !strings.Contains(err.Error(), test.ExpectedError) {
|
||||
t.Fatalf("%s : expected error '%s' in '%s'", test.TestName, test.ExpectedError, err.Error())
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
t.Fatalf("%s : unexpected error '%s'", test.TestName, err)
|
||||
}
|
||||
}
|
||||
|
||||
switch test.TestName {
|
||||
case "basic_valid_config":
|
||||
mock := (*ds).Dump().(*MockSource)
|
||||
assert.Equal(t, mock.Toto, "test_value1")
|
||||
assert.Equal(t, mock.Mode, "cat")
|
||||
assert.Equal(t, mock.logger.Logger.Level, log.InfoLevel)
|
||||
assert.DeepEqual(t, mock.Labels, map[string]string{"test": "foobar"})
|
||||
case "basic_debug_config":
|
||||
mock := (*ds).Dump().(*MockSource)
|
||||
assert.Equal(t, mock.Toto, "test_value1")
|
||||
assert.Equal(t, mock.Mode, "cat")
|
||||
assert.Equal(t, mock.logger.Logger.Level, log.DebugLevel)
|
||||
assert.DeepEqual(t, mock.Labels, map[string]string{"test": "foobar"})
|
||||
case "basic_tailmode_config":
|
||||
mock := (*ds).Dump().(*MockSource)
|
||||
assert.Equal(t, mock.Toto, "test_value1")
|
||||
assert.Equal(t, mock.Mode, "tail")
|
||||
assert.Equal(t, mock.logger.Logger.Level, log.DebugLevel)
|
||||
assert.DeepEqual(t, mock.Labels, map[string]string{"test": "foobar"})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAcquisitionFromFile(t *testing.T) {
|
||||
appendMockSource()
|
||||
tests := []struct {
|
||||
TestName string
|
||||
Config csconfig.CrowdsecServiceCfg
|
||||
ExpectedError string
|
||||
ExpectedLen int
|
||||
}{
|
||||
{
|
||||
TestName: "non_existent_file",
|
||||
Config: csconfig.CrowdsecServiceCfg{
|
||||
AcquisitionFiles: []string{"does_not_exist"},
|
||||
},
|
||||
ExpectedError: "can't open does_not_exist",
|
||||
ExpectedLen: 0,
|
||||
},
|
||||
{
|
||||
TestName: "invalid_yaml_file",
|
||||
Config: csconfig.CrowdsecServiceCfg{
|
||||
AcquisitionFiles: []string{"test_files/badyaml.yaml"},
|
||||
},
|
||||
ExpectedError: "configuration isn't valid config in test_files/badyaml.yaml",
|
||||
ExpectedLen: 0,
|
||||
},
|
||||
{
|
||||
TestName: "invalid_empty_yaml",
|
||||
Config: csconfig.CrowdsecServiceCfg{
|
||||
AcquisitionFiles: []string{"test_files/emptyitem.yaml"},
|
||||
},
|
||||
ExpectedLen: 0,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
dss, err := LoadAcquisitionFromFile(&test.Config)
|
||||
if test.ExpectedError != "" {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error %s, got none", test.ExpectedError)
|
||||
}
|
||||
if !strings.Contains(err.Error(), test.ExpectedError) {
|
||||
t.Fatalf("%s : expected error '%s' in '%s'", test.TestName, test.ExpectedError, err.Error())
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
t.Fatalf("%s : unexpected error '%s'", test.TestName, err)
|
||||
}
|
||||
}
|
||||
if len(dss) != test.ExpectedLen {
|
||||
t.Fatalf("%s : expected %d datasources got %d", test.TestName, test.ExpectedLen, len(dss))
|
||||
}
|
||||
|
||||
}
|
||||
log.Printf("-> %s", outBytes)
|
||||
|
||||
if err := yaml.Unmarshal(outBytes, &common); err != nil {
|
||||
t.Fatalf("cannot unmarshal to generic : %s", err)
|
||||
}
|
||||
|
||||
// if generic == nil {
|
||||
// t.Fatalf("result of unmarshal is empty :(")
|
||||
// }
|
||||
// log.Printf("raw iface : %s", spew.Sdump(generic))
|
||||
// common = generic.(configuration.DataSourceCommonCfg)
|
||||
|
||||
ds, err := DataSourceConfigure(outBytes, common)
|
||||
log.Printf("-> ds : %s", spew.Sdump(ds))
|
||||
log.Printf("-> err : %s", err)
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ type DataSourceCommonCfg struct {
|
|||
LogLevel *log.Level `yaml:"log_level,omitempty"`
|
||||
Type string `yaml:"type,omitempty"`
|
||||
ConfigFile string `yaml:"-"` //filled at run time : the filepath from which the config was unmarshaled
|
||||
//logger *log.Entry `yaml:"-"`
|
||||
}
|
||||
|
||||
var TAIL_MODE = "tail"
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
package acquisition
|
||||
|
||||
// import file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
|
||||
|
||||
// type DataSourceMap struct {
|
||||
// Datasources map[string]DataSource
|
||||
// }
|
||||
|
||||
// func (d *DataSourceMap) GetDataSource(dstype string) *DataSource {
|
||||
// datasource := d.Datasources[dstype]
|
||||
// return &datasource
|
||||
// }
|
||||
|
||||
// func (d *DataSourceMap) New() {
|
||||
// m := make(map[string]DataSource)
|
||||
// m["file"] = &file_acquisition.FileSource{}
|
||||
// }
|
Loading…
Add table
Reference in a new issue