|
@@ -34,6 +34,20 @@ import (
|
|
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
|
|
)
|
|
|
|
|
|
+type DataSourceUnavailableError struct {
|
|
|
+ Name string
|
|
|
+ Err error
|
|
|
+}
|
|
|
+
|
|
|
+func (e *DataSourceUnavailableError) Error() string {
|
|
|
+ return fmt.Sprintf("datasource '%s' is not available: %v", e.Name, e.Err)
|
|
|
+}
|
|
|
+
|
|
|
+func (e *DataSourceUnavailableError) Unwrap() error {
|
|
|
+ return e.Err
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
// The interface each datasource must implement
|
|
|
type DataSource interface {
|
|
|
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
|
|
@@ -73,6 +87,10 @@ func GetDataSourceIface(dataSourceType string) DataSource {
|
|
|
return source()
|
|
|
}
|
|
|
|
|
|
+// DataSourceConfigure creates and returns a DataSource object from a configuration,
|
|
|
+// if the configuration is not valid it returns an error.
|
|
|
+// If the datasource can't be run (eg. journalctl not available), it still returns an error which
|
|
|
+// can be checked for the appropriate action.
|
|
|
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
|
|
|
// we dump it back to []byte, because we want to decode the yaml blob twice:
|
|
|
// once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
|
|
@@ -98,7 +116,7 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS
|
|
|
subLogger := clog.WithFields(customLog)
|
|
|
/* check eventual dependencies are satisfied (ie. journald will check journalctl availability) */
|
|
|
if err := dataSrc.CanRun(); err != nil {
|
|
|
- return nil, fmt.Errorf("datasource %s cannot be run: %w", commonConfig.Source, err)
|
|
|
+ return nil, &DataSourceUnavailableError{Name: commonConfig.Source, Err: err}
|
|
|
}
|
|
|
/* configure the actual datasource */
|
|
|
if err := dataSrc.Configure(yamlConfig, subLogger); err != nil {
|
|
@@ -171,10 +189,11 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|
|
}
|
|
|
dec := yaml.NewDecoder(yamlFile)
|
|
|
dec.SetStrict(true)
|
|
|
+ idx := -1
|
|
|
for {
|
|
|
var sub configuration.DataSourceCommonCfg
|
|
|
- var idx int
|
|
|
err = dec.Decode(&sub)
|
|
|
+ idx += 1
|
|
|
if err != nil {
|
|
|
if !errors.Is(err, io.EOF) {
|
|
|
return nil, fmt.Errorf("failed to yaml decode %s: %w", acquisFile, err)
|
|
@@ -191,7 +210,6 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|
|
if len(sub.Labels) == 0 {
|
|
|
if sub.Source == "" {
|
|
|
log.Debugf("skipping empty item in %s", acquisFile)
|
|
|
- idx += 1
|
|
|
continue
|
|
|
}
|
|
|
return nil, fmt.Errorf("missing labels in %s (position: %d)", acquisFile, idx)
|
|
@@ -206,6 +224,11 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|
|
sub.UniqueId = uniqueId
|
|
|
src, err := DataSourceConfigure(sub)
|
|
|
if err != nil {
|
|
|
+ var dserr *DataSourceUnavailableError
|
|
|
+ if errors.As(err, &dserr) {
|
|
|
+ log.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err)
|
|
|
}
|
|
|
if sub.TransformExpr != "" {
|
|
@@ -216,7 +239,6 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
|
|
|
transformRuntimes[uniqueId] = vm
|
|
|
}
|
|
|
sources = append(sources, *src)
|
|
|
- idx += 1
|
|
|
}
|
|
|
}
|
|
|
return sources, nil
|
|
@@ -293,6 +315,11 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
|
|
|
}
|
|
|
|
|
|
func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
|
|
|
+ // Don't wait if we have no sources, as it will hang forever
|
|
|
+ if len(sources) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
for i := 0; i < len(sources); i++ {
|
|
|
subsrc := sources[i] //ensure its a copy
|
|
|
log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc)
|
|
@@ -328,11 +355,8 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb
|
|
|
return nil
|
|
|
})
|
|
|
}
|
|
|
- // Don't wait if we have no sources, as it will hang forever
|
|
|
- if len(sources) > 0 {
|
|
|
- /*return only when acquisition is over (cat) or never (tail)*/
|
|
|
- err := AcquisTomb.Wait()
|
|
|
- return err
|
|
|
- }
|
|
|
- return nil
|
|
|
+
|
|
|
+ /*return only when acquisition is over (cat) or never (tail)*/
|
|
|
+ err := AcquisTomb.Wait()
|
|
|
+ return err
|
|
|
}
|