This commit is contained in:
bui 2021-04-30 16:44:24 +02:00
parent 29e08a3323
commit a6a5024355
4 changed files with 134 additions and 99 deletions

View file

@ -11,7 +11,6 @@ import (
"sort"
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
@ -46,18 +45,16 @@ var (
)
type Flags struct {
ConfigFile string
TraceLevel bool
DebugLevel bool
InfoLevel bool
PrintVersion bool
SingleFilePath string
SingleJournalctlFilter string
SingleFileType string
SingleFileJsonOutput string
TestMode bool
DisableAgent bool
DisableAPI bool
ConfigFile string
TraceLevel bool
DebugLevel bool
InfoLevel bool
PrintVersion bool
SingleFileType string
OneShotDSN string
TestMode bool
DisableAgent bool
DisableAPI bool
}
type parsers struct {
@ -140,41 +137,46 @@ func LoadBuckets(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) error {
var err error
if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" {
if flags.SingleFileType != "" || flags.OneShotDSN == "" {
tmpCfg := configuration.DataSourceCommonCfg{}
tmpCfg.Mode = configuration.CAT_MODE
tmpCfg.Labels = map[string]string{"type": flags.SingleFileType}
// tmpCfg := configuration.DataSourceCommonCfg{}
// tmpCfg.Labels = map[string]string{"type": flags.SingleFileType}
var craftConf []byte
var acquisType string
if flags.SingleFilePath != "" {
acquisType = "file"
craftConf = []byte(fmt.Sprintf(`
filename: %s
labels:
type: %s
type: file
mode: cat`, flags.SingleFilePath, flags.SingleFileType))
} else if flags.SingleJournalctlFilter != "" {
acquisType = "journald"
craftConf = []byte(fmt.Sprintf(`
journalctl_filter: %s
labels:
type: %s
type: journald
mode: cat`, flags.SingleJournalctlFilter, flags.SingleFileType))
if flags.OneShotDSN == "" || flags.SingleFileType == "" {
return fmt.Errorf("-type requires a -dsn argument")
}
tmpCfg.Type = acquisType
datasrc, err := acquisition.DataSourceConfigure(craftConf, tmpCfg)
if err != nil {
return fmt.Errorf("while configuring specified file datasource : %s", err)
}
if dataSources == nil {
dataSources = make([]acquisition.DataSource, 0)
}
dataSources = append(dataSources, *datasrc)
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.SingleFileType)
log.Fatalf("set type ?")
// var craftConf []byte
// var acquisType string
// if flags.SingleFilePath != "" {
// acquisType = "file"
// craftConf = []byte(fmt.Sprintf(`
// filename: %s
// labels:
// type: %s
// type: file
// mode: cat`, flags.SingleFilePath, flags.SingleFileType))
// } else if flags.SingleJournalctlFilter != "" {
// acquisType = "journald"
// craftConf = []byte(fmt.Sprintf(`
// journalctl_filter: %s
// labels:
// type: %s
// type: journald
// mode: cat`, flags.SingleJournalctlFilter, flags.SingleFileType))
// }
// tmpCfg.Source = acquisType
// datasrc, err := acquisition.DataSourceConfigure(craftConf, tmpCfg)
// if err != nil {
// return fmt.Errorf("while configuring specified file datasource : %s", err)
// }
// if dataSources == nil {
// dataSources = make([]acquisition.DataSource, 0)
// }
// dataSources = append(dataSources, *datasrc)
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
@ -192,8 +194,7 @@ func (f *Flags) Parse() {
flag.BoolVar(&f.DebugLevel, "debug", false, "print debug-level on stdout")
flag.BoolVar(&f.InfoLevel, "info", false, "print info-level on stdout")
flag.BoolVar(&f.PrintVersion, "version", false, "display version")
flag.StringVar(&f.SingleFilePath, "file", "", "Process a single file in time-machine")
flag.StringVar(&f.SingleJournalctlFilter, "jfilter", "", "Process a single journalctl output in time-machine")
flag.StringVar(&f.OneShotDSN, "dsn", "", "Process a single data source in time-machine")
flag.StringVar(&f.SingleFileType, "type", "", "Labels.type for file in time-machine")
flag.BoolVar(&f.TestMode, "t", false, "only test configs")
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
@ -225,17 +226,17 @@ func LoadConfig(cConfig *csconfig.Config) error {
log.Fatalf("You must run at least the API Server or crowdsec")
}
if flags.SingleFilePath != "" {
if flags.SingleFileType == "" {
return fmt.Errorf("-file requires -type")
}
}
// if flags.SingleFilePath != "" {
// if flags.SingleFileType == "" {
// return fmt.Errorf("-file requires -type")
// }
// }
if flags.SingleJournalctlFilter != "" {
if flags.SingleFileType == "" {
return fmt.Errorf("-jfilter requires -type")
}
}
// if flags.SingleJournalctlFilter != "" {
// if flags.SingleFileType == "" {
// return fmt.Errorf("-jfilter requires -type")
// }
// }
if flags.DebugLevel {
logLevel := log.DebugLevel
@ -254,7 +255,7 @@ func LoadConfig(cConfig *csconfig.Config) error {
cConfig.Crowdsec.LintOnly = true
}
if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" {
if flags.SingleFileType != "" && flags.OneShotDSN != "" {
cConfig.API.Server.OnlineClient = nil
/*if the api is disabled as well, just read file and exit, don't daemonize*/
if flags.DisableAPI {

View file

@ -4,6 +4,7 @@ import (
"fmt"
"io"
"os"
"strings"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
file_acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
@ -41,6 +42,14 @@ var ReaderHits = prometheus.NewCounterVec(
source: files
filenames:
- "/var/log/nginx/*.log"
---
type: nginx
source: file
file:
filenames:
- /var/log/xxx
```
!!! how to handle expect mode that is not directly linked to tail/cat mode
@ -65,7 +74,7 @@ cat mode will return once source has been exhausted.
type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
Configure([]byte, *log.Entry) error // Configure the datasource
ConfigureByDSN(string, *log.Entry) error // Configure the datasource
ConfigureByDSN(string, string, *log.Entry) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
SupportedModes() []string // TO REMOVE : Returns the mode supported by the datasource
SupportedDSN() []string // Returns the list of supported URI schemes (file:// s3:// ...)
@ -94,7 +103,14 @@ func GetDataSourceIface(dataSourceType string) DataSource {
return nil
}
func DataSourceConfigure(yamlConfig []byte, commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
//we dump it back to []byte, because we want to decode the yaml blob twice :
//once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
yamlConfig, err := yaml.Marshal(commonConfig)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal back interface")
}
if dataSrc := GetDataSourceIface(commonConfig.Source); dataSrc != nil {
/* this logger will then be used by the datasource at runtime */
clog := log.New()
@ -122,23 +138,47 @@ func DataSourceConfigure(yamlConfig []byte, commonConfig configuration.DataSourc
}
//detectBackwardCompatAcquis : try to magically detect the type for backward compat (type was not mandatory then)
func detectBackwardCompatAcquis(raw []byte) string {
var out map[string]interface{}
if err := yaml.Unmarshal(raw, &out); err != nil {
return ""
}
if _, ok := out["filename"]; ok {
func detectBackwardCompatAcquis(sub configuration.DataSourceCommonCfg) string {
if _, ok := sub.Config["filename"]; ok {
return "file"
}
if _, ok := out["filenames"]; ok {
if _, ok := sub.Config["filenames"]; ok {
return "file"
}
if _, ok := out["journalctl_filter"]; ok {
if _, ok := sub.Config["journalctl_filter"]; ok {
return "journalctl"
}
return ""
}
func LoadAcquisitionFromDSN(dsn string, label string) ([]DataSource, error) {
var sources []DataSource
frags := strings.Split(dsn, ":")
if len(frags) == 1 {
return nil, fmt.Errorf("%s isn't valid dsn (no protocol)", dsn)
}
dataSrc := GetDataSourceIface(frags[0])
if dataSrc == nil {
return nil, fmt.Errorf("no acquisition for protocol %s://", frags[0])
}
/* this logger will then be used by the datasource at runtime */
clog := log.New()
if err := types.ConfigureLogger(clog); err != nil {
return nil, errors.Wrap(err, "while configuring datasource logger")
}
subLogger := clog.WithFields(log.Fields{
"type": dsn,
})
err := dataSrc.ConfigureByDSN(dsn, label, subLogger)
if err != nil {
return nil, errors.Wrapf(err, "while configuration datasource for %s", dsn)
}
sources = append(sources, dataSrc)
return sources, nil
}
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
@ -151,11 +191,10 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
return nil, errors.Wrapf(err, "can't open %s", acquisFile)
}
dec := yaml.NewDecoder(yamlFile)
dec.SetStrict(false)
dec.SetStrict(true)
for {
var sub configuration.DataSourceCommonCfg
var holder interface{}
err = dec.Decode(&holder)
err = dec.Decode(&sub)
if err != nil {
if err == io.EOF {
log.Tracef("End of yaml file")
@ -164,18 +203,8 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
return nil, errors.Wrapf(err, "failed to yaml decode %s", acquisFile)
}
//we dump it back to []byte, because we want to decode the yaml blob twice :
//once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
inBytes, err := yaml.Marshal(holder)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal back interface")
}
if err := yaml.Unmarshal(inBytes, &sub); err != nil {
return nil, errors.Wrapf(err, "configuration isn't valid config in %s : %s", acquisFile, string(inBytes))
}
//for backward compat ('type' was not mandatory, detect it)
if guessType := detectBackwardCompatAcquis(inBytes); guessType != "" {
if guessType := detectBackwardCompatAcquis(sub); guessType != "" {
sub.Source = guessType
}
//it's an empty item, skip it
@ -190,7 +219,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
if GetDataSourceIface(sub.Source) == nil {
return nil, fmt.Errorf("unknown data source %s in %s", sub.Source, acquisFile)
}
src, err := DataSourceConfigure(inBytes, sub)
src, err := DataSourceConfigure(sub)
if err != nil {
return nil, errors.Wrapf(err, "while configuring datasource of type %s from %s", sub.Source, acquisFile)
}

View file

@ -47,7 +47,9 @@ func (f *MockSource) CanRun() error { re
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockSource) ConfigureByDSN(string, *log.Entry) error { return fmt.Errorf("not supported") }
func (f *MockSource) ConfigureByDSN(string, string, *log.Entry) error {
return fmt.Errorf("not supported")
}
//func (f *MockSource) New() DataSource { return &MockSource{} }
@ -175,7 +177,7 @@ wowo: ajsajasjas
for _, test := range tests {
common := configuration.DataSourceCommonCfg{}
yaml.Unmarshal(test.RawBytes, &common)
ds, err := DataSourceConfigure(test.RawBytes, common)
ds, err := DataSourceConfigure(common)
if test.ExpectedError != "" {
if err == nil {
t.Fatalf("expected error %s, got none", test.ExpectedError)
@ -235,7 +237,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/badyaml.yaml"},
},
ExpectedError: "configuration isn't valid config in test_files/badyaml.yaml",
ExpectedError: "failed to yaml decode test_files/badyaml.yaml: yaml: unmarshal errors",
ExpectedLen: 0,
},
{
@ -338,11 +340,11 @@ func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) erro
func (f *MockCat) LiveAcquisition(chan types.Event, *tomb.Tomb) error {
return fmt.Errorf("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
func (f *MockCat) GetMetrics() []prometheus.Collector { return nil }
func (f *MockCat) Dump() interface{} { return f }
func (f *MockCat) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockCat) ConfigureByDSN(string, *log.Entry) error { return fmt.Errorf("not supported") }
func (f *MockCat) CanRun() error { return nil }
func (f *MockCat) GetMetrics() []prometheus.Collector { return nil }
func (f *MockCat) Dump() interface{} { return f }
func (f *MockCat) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockCat) ConfigureByDSN(string, string, *log.Entry) error { return fmt.Errorf("not supported") }
//----
@ -377,11 +379,13 @@ func (f *MockTail) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
return nil
}
}
func (f *MockTail) CanRun() error { return nil }
func (f *MockTail) GetMetrics() []prometheus.Collector { return nil }
func (f *MockTail) Dump() interface{} { return f }
func (f *MockTail) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockTail) ConfigureByDSN(string, *log.Entry) error { return fmt.Errorf("not supported") }
func (f *MockTail) CanRun() error { return nil }
func (f *MockTail) GetMetrics() []prometheus.Collector { return nil }
func (f *MockTail) Dump() interface{} { return f }
func (f *MockTail) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockTail) ConfigureByDSN(string, string, *log.Entry) error {
return fmt.Errorf("not supported")
}
//func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {

View file

@ -5,10 +5,11 @@ import (
)
type DataSourceCommonCfg struct {
Mode string `yaml:"mode,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
LogLevel *log.Level `yaml:"log_level,omitempty"`
Source string `yaml:"source,omitempty"`
Mode string `yaml:"mode,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
LogLevel *log.Level `yaml:"log_level,omitempty"`
Source string `yaml:"source,omitempty"`
Config map[string]interface{} `yaml:",inline"` //to keep the datasource-specific configuration directives
}
var TAIL_MODE = "tail"