Browse Source

Docker datasource (#1064)

* add docker datasource
AlteredCoder 3 years ago
parent
commit
4917aa23c9

+ 1 - 1
cmd/crowdsec-cli/explain.go

@@ -28,7 +28,7 @@ Explain log pipeline
 		Example: `
 		Example: `
 cscli explain --file ./myfile.log --type nginx 
 cscli explain --file ./myfile.log --type nginx 
 cscli explain --log "Sep 19 18:33:22 scw-d95986 sshd[24347]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=1.2.3.4" --type syslog
 cscli explain --log "Sep 19 18:33:22 scw-d95986 sshd[24347]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=1.2.3.4" --type syslog
-cscli explain -dsn "file://myfile.log" --type nginx
+cscli explain --dsn "file://myfile.log" --type nginx
 		`,
 		`,
 		Args:              cobra.ExactArgs(0),
 		Args:              cobra.ExactArgs(0),
 		DisableAutoGenTag: true,
 		DisableAutoGenTag: true,

+ 3 - 2
go.mod

@@ -9,6 +9,7 @@ require (
 	github.com/Masterminds/semver v1.5.0 // indirect
 	github.com/Masterminds/semver v1.5.0 // indirect
 	github.com/Masterminds/sprig v2.22.0+incompatible
 	github.com/Masterminds/sprig v2.22.0+incompatible
 	github.com/Microsoft/go-winio v0.4.16 // indirect
 	github.com/Microsoft/go-winio v0.4.16 // indirect
+	github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26
 	github.com/alexliesenfeld/health v0.5.1
 	github.com/alexliesenfeld/health v0.5.1
 	github.com/antonmedv/expr v1.8.9
 	github.com/antonmedv/expr v1.8.9
 	github.com/appleboy/gin-jwt/v2 v2.6.4
 	github.com/appleboy/gin-jwt/v2 v2.6.4
@@ -24,7 +25,7 @@ require (
 	github.com/docker/docker v20.10.2+incompatible
 	github.com/docker/docker v20.10.2+incompatible
 	github.com/docker/go-connections v0.4.0
 	github.com/docker/go-connections v0.4.0
 	github.com/enescakir/emoji v1.0.0
 	github.com/enescakir/emoji v1.0.0
-	github.com/fatih/color v1.13.0 // indirect
+	github.com/fatih/color v1.13.0
 	github.com/fsnotify/fsnotify v1.4.9
 	github.com/fsnotify/fsnotify v1.4.9
 	github.com/gin-gonic/gin v1.6.3
 	github.com/gin-gonic/gin v1.6.3
 	github.com/go-co-op/gocron v1.9.0
 	github.com/go-co-op/gocron v1.9.0
@@ -62,7 +63,7 @@ require (
 	github.com/prometheus/client_golang v1.10.0
 	github.com/prometheus/client_golang v1.10.0
 	github.com/prometheus/client_model v0.2.0
 	github.com/prometheus/client_model v0.2.0
 	github.com/prometheus/prom2json v1.3.0
 	github.com/prometheus/prom2json v1.3.0
-	github.com/r3labs/diff/v2 v2.14.1 // indirect
+	github.com/r3labs/diff/v2 v2.14.1
 	github.com/rivo/uniseg v0.2.0 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/sirupsen/logrus v1.8.1
 	github.com/sirupsen/logrus v1.8.1

+ 2 - 0
go.sum

@@ -45,6 +45,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx
 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
 github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
 github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
+github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26 h1:3YVZUqkoev4mL+aCwVOSWV4M7pN+NURHL38Z2zq5JKA=
+github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26/go.mod h1:ymXt5bw5uSNu4jveerFxE0vNYxF8ncqbptntMaFMg3k=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=

+ 14 - 10
pkg/acquisition/acquisition.go

@@ -8,10 +8,10 @@ import (
 
 
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
 	cloudwatchacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/cloudwatch"
 	cloudwatchacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/cloudwatch"
+	dockeracquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker"
 	fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
 	fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
 	journalctlacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/journalctl"
 	journalctlacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/journalctl"
 	syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog"
 	syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog"
-
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
@@ -24,15 +24,15 @@ import (
 
 
 // The interface each datasource must implement
 // The interface each datasource must implement
 type DataSource interface {
 type DataSource interface {
-	GetMetrics() []prometheus.Collector                      // Returns pointers to metrics that are managed by the module
-	GetAggregMetrics() []prometheus.Collector                // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
-	Configure([]byte, *log.Entry) error                      // Configure the datasource
-	ConfigureByDSN(string, map[string]string, *log.Entry) error	// Configure the datasource
-	GetMode() string                                         // Get the mode (TAIL, CAT or SERVER)
-	GetName() string                                         // Get the name of the module
-	OneShotAcquisition(chan types.Event, *tomb.Tomb) error   // Start one shot acquisition(eg, cat a file)
-	StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
-	CanRun() error                                           // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
+	GetMetrics() []prometheus.Collector                         // Returns pointers to metrics that are managed by the module
+	GetAggregMetrics() []prometheus.Collector                   // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
+	Configure([]byte, *log.Entry) error                         // Configure the datasource
+	ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource
+	GetMode() string                                            // Get the mode (TAIL, CAT or SERVER)
+	GetName() string                                            // Get the name of the module
+	OneShotAcquisition(chan types.Event, *tomb.Tomb) error      // Start one shot acquisition(eg, cat a file)
+	StreamingAcquisition(chan types.Event, *tomb.Tomb) error    // Start live acquisition (eg, tail a file)
+	CanRun() error                                              // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
 	Dump() interface{}
 	Dump() interface{}
 }
 }
 
 
@@ -56,6 +56,10 @@ var AcquisitionSources = []struct {
 		name:  "syslog",
 		name:  "syslog",
 		iface: func() DataSource { return &syslogacquisition.SyslogSource{} },
 		iface: func() DataSource { return &syslogacquisition.SyslogSource{} },
 	},
 	},
+	{
+		name:  "docker",
+		iface: func() DataSource { return &dockeracquisition.DockerSource{} },
+	},
 }
 }
 
 
 func GetDataSourceIface(dataSourceType string) DataSource {
 func GetDataSourceIface(dataSourceType string) DataSource {

+ 518 - 0
pkg/acquisition/modules/docker/docker.go

@@ -0,0 +1,518 @@
+package dockeracquisition
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"net/url"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/ahmetb/dlog"
+	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
+	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+	dockerTypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/client"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
+	"gopkg.in/yaml.v2"
+)
+
+// linesRead counts log lines emitted by this datasource, labeled by the
+// source container name (Prometheus metric cs_dockersource_hits_total).
+var linesRead = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Name: "cs_dockersource_hits_total",
+		Help: "Total lines that were read.",
+	},
+	[]string{"source"})
+
+// DockerConfiguration is the YAML configuration of the docker datasource.
+// At least one of container_name / container_id / container_name_regexp /
+// container_id_regexp must be set (enforced in Configure).
+type DockerConfiguration struct {
+	CheckInterval                     string   `yaml:"check_interval"` // container-discovery polling interval, parsed with time.ParseDuration
+	FollowStdout                      bool     `yaml:"follow_stdout"`  // defaults to true (set in Configure)
+	FollowStdErr                      bool     `yaml:"follow_stderr"`  // defaults to true (set in Configure)
+	Until                             string   `yaml:"until"`          // passed through to the Docker logs API "until" option
+	Since                             string   `yaml:"since"`          // passed through to the Docker logs API "since" option; defaults to now in Configure
+	DockerHost                        string   `yaml:"docker_host"`    // optional override of the Docker daemon endpoint
+	ContainerName                     []string `yaml:"container_name"`
+	ContainerID                       []string `yaml:"container_id"`
+	ContainerNameRegexp               []string `yaml:"container_name_regexp"`
+	ContainerIDRegexp                 []string `yaml:"container_id_regexp"`
+	ForceInotify                      bool     `yaml:"force_inotify"` // NOTE(review): never read in this file — possibly copy-pasted from the file datasource
+	configuration.DataSourceCommonCfg `yaml:",inline"`
+}
+
+// DockerSource implements the DataSource interface for Docker container
+// logs: it discovers containers matching the configuration and reads their
+// stdout/stderr through the Docker API.
+type DockerSource struct {
+	Config                DockerConfiguration
+	runningContainerState map[string]*ContainerConfig // containers currently being read, keyed by container ID
+	compiledContainerName []*regexp.Regexp            // compiled from Config.ContainerNameRegexp
+	compiledContainerID   []*regexp.Regexp            // compiled from Config.ContainerIDRegexp
+	CheckIntervalDuration time.Duration
+	logger                *log.Entry
+	Client                client.CommonAPIClient // interface type so tests can inject a mock client
+	t                     *tomb.Tomb             // lifecycle of the streaming acquisition (set in StreamingAcquisition)
+	containerLogsOptions  *dockerTypes.ContainerLogsOptions
+}
+
+// ContainerConfig is the per-container tail state: the container identity,
+// the tomb controlling its tail goroutine, and the labels attached to every
+// line emitted from it.
+type ContainerConfig struct {
+	Name   string
+	ID     string
+	t      *tomb.Tomb
+	logger *log.Entry
+	Labels map[string]string
+}
+
+// Configure parses the YAML configuration, validates it (at least one
+// container selector, a parseable check_interval, a supported mode) and
+// builds the Docker client plus the ContainerLogs options used by both
+// acquisition modes. Returns an error on invalid configuration.
+func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
+	var err error
+
+	d.Config = DockerConfiguration{
+		FollowStdout:  true, // default
+		FollowStdErr:  true, // default
+		CheckInterval: "1s", // default
+	}
+	d.logger = logger
+
+	d.runningContainerState = make(map[string]*ContainerConfig)
+
+	err = yaml.UnmarshalStrict(Config, &d.Config)
+	if err != nil {
+		return errors.Wrap(err, "Cannot parse DockerAcquisition configuration")
+	}
+
+	d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
+	if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 {
+		return fmt.Errorf("no containers names or containers ID configuration provided")
+	}
+
+	d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
+	if err != nil {
+		// NOTE(review): this formats d.CheckIntervalDuration (zero on failure)
+		// instead of err — the actual parse error is lost. Should be `err`.
+		return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
+	}
+
+	// default to streaming (tail) mode; only TAIL and CAT are supported
+	if d.Config.Mode == "" {
+		d.Config.Mode = configuration.TAIL_MODE
+	}
+	if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
+		return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
+	}
+	d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
+
+	// NOTE(review): MustCompile panics on an invalid user-supplied regexp;
+	// regexp.Compile with an error return would be safer here.
+	for _, cont := range d.Config.ContainerNameRegexp {
+		d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont))
+	}
+
+	for _, cont := range d.Config.ContainerIDRegexp {
+		d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont))
+	}
+
+	dockerClient, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		return err
+	}
+
+	// with no explicit "since", only read logs produced from now on
+	if d.Config.Since == "" {
+		d.Config.Since = time.Now().Format(time.RFC3339)
+	}
+
+	d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
+		ShowStdout: d.Config.FollowStdout,
+		ShowStderr: d.Config.FollowStdErr,
+		Follow:     true,
+		Since:      d.Config.Since,
+	}
+
+	if d.Config.Until != "" {
+		d.containerLogsOptions.Until = d.Config.Until
+	}
+
+	// apply the docker_host override onto the already-built client
+	if d.Config.DockerHost != "" {
+		if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil {
+			return err
+		}
+	}
+	d.Client = dockerClient
+
+	return nil
+}
+
+// ConfigureByDSN configures the datasource from a one-shot (`cscli explain`)
+// DSN of the form docker://<name-or-id>[?param=value&...]. Supported query
+// parameters: log_level, until, since, follow_stdout, follow_stderr,
+// docker_host. The given labels are attached to every emitted line.
+func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
+	var err error
+
+	if !strings.HasPrefix(dsn, d.GetName()+"://") {
+		return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName())
+	}
+
+	d.Config = DockerConfiguration{
+		FollowStdout:  true,
+		FollowStdErr:  true,
+		CheckInterval: "1s",
+	}
+	d.Config.ContainerName = make([]string, 0)
+	d.Config.ContainerID = make([]string, 0)
+	d.runningContainerState = make(map[string]*ContainerConfig)
+	d.Config.Mode = configuration.CAT_MODE
+	d.logger = logger
+	d.Config.Labels = labels
+
+	dockerClient, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		return err
+	}
+
+	// one-shot mode: read the existing logs, do not follow
+	d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
+		ShowStdout: d.Config.FollowStdout,
+		ShowStderr: d.Config.FollowStdErr,
+		Follow:     false,
+	}
+	dsn = strings.TrimPrefix(dsn, d.GetName()+"://")
+	args := strings.Split(dsn, "?")
+
+	// NOTE(review): strings.Split with a non-empty separator always returns
+	// at least one element, so this branch looks unreachable.
+	if len(args) == 0 {
+		return fmt.Errorf("invalid dsn: %s", dsn)
+	}
+
+	if len(args) == 1 && args[0] == "" {
+		return fmt.Errorf("empty %s DSN", d.GetName()+"://")
+	}
+	d.Config.ContainerName = append(d.Config.ContainerName, args[0])
+	// we add it as an ID also so user can provide docker name or docker ID
+	d.Config.ContainerID = append(d.Config.ContainerID, args[0])
+
+	// no parameters
+	if len(args) == 1 {
+		d.Client = dockerClient
+		return nil
+	}
+
+	parameters, err := url.ParseQuery(args[1])
+	if err != nil {
+		return errors.Wrapf(err, "while parsing parameters %s: %s", dsn, err)
+	}
+
+	for k, v := range parameters {
+		switch k {
+		case "log_level":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'log_level' parameters is required, not many")
+			}
+			lvl, err := log.ParseLevel(v[0])
+			if err != nil {
+				return errors.Wrapf(err, "unknown level %s", v[0])
+			}
+			d.logger.Logger.SetLevel(lvl)
+		case "until":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'until' parameters is required, not many")
+			}
+			d.containerLogsOptions.Until = v[0]
+		case "since":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'since' parameters is required, not many")
+			}
+			d.containerLogsOptions.Since = v[0]
+		case "follow_stdout":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'follow_stdout' parameters is required, not many")
+			}
+			followStdout, err := strconv.ParseBool(v[0])
+			if err != nil {
+				return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err)
+			}
+			d.Config.FollowStdout = followStdout
+			d.containerLogsOptions.ShowStdout = followStdout
+		case "follow_stderr":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'follow_stderr' parameters is required, not many")
+			}
+			followStdErr, err := strconv.ParseBool(v[0])
+			if err != nil {
+				return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err)
+			}
+			d.Config.FollowStdErr = followStdErr
+			d.containerLogsOptions.ShowStderr = followStdErr
+		case "docker_host":
+			if len(v) != 1 {
+				return fmt.Errorf("only one 'docker_host' parameters is required, not many")
+			}
+			if err := client.WithHost(v[0])(dockerClient); err != nil {
+				return err
+			}
+		}
+		// NOTE(review): unknown parameters are silently ignored — consider
+		// a default case returning an error for typo detection.
+	}
+	d.Client = dockerClient
+	return nil
+}
+
+// GetMode returns the configured acquisition mode (TAIL_MODE or CAT_MODE).
+func (d *DockerSource) GetMode() string {
+	return d.Config.Mode
+}
+
+// SupportedModes returns the acquisition modes supported by this module
+// (both streaming tail and one-shot cat).
+func (d *DockerSource) SupportedModes() []string {
+	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
+}
+
+// OneShotAcquisition reads the existing logs of every matching running
+// container once, pushes each line to out, then kills the tomb. It errors
+// if no container matched the configuration.
+func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
+	d.logger.Debug("In oneshot")
+	runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
+	if err != nil {
+		return err
+	}
+	foundOne := false
+	for _, container := range runningContainer {
+		if _, ok := d.runningContainerState[container.ID]; ok {
+			d.logger.Debugf("container with id %s is already being read from", container.ID)
+			continue
+		}
+		if containerConfig, ok := d.EvalContainer(container); ok {
+			d.logger.Infof("reading logs from container %s", containerConfig.Name)
+			d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
+			dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
+			if err != nil {
+				d.logger.Errorf("unable to read logs from container: %+v", err)
+				return err
+			}
+			// we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
+			reader := dlog.NewReader(dockerReader)
+			foundOne = true
+			scanner := bufio.NewScanner(reader)
+			for scanner.Scan() {
+				line := scanner.Text()
+				if line == "" {
+					continue
+				}
+				l := types.Line{}
+				l.Raw = line
+				l.Labels = d.Config.Labels
+				l.Time = time.Now()
+				l.Src = containerConfig.Name
+				l.Process = true
+				l.Module = d.GetName()
+				linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
+				evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
+				out <- evt
+				d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
+			}
+			// NOTE(review): scanner.Err() is not checked and dockerReader
+			// (an io.ReadCloser) is never closed — read errors are silent
+			// and the connection leaks.
+			d.runningContainerState[container.ID] = containerConfig
+		}
+	}
+
+	// signal the caller that the one-shot run is complete
+	t.Kill(nil)
+
+	if !foundOne {
+		return fmt.Errorf("no docker found, can't run one shot acquisition")
+	}
+
+	return nil
+}
+
+// GetMetrics returns the Prometheus collectors owned by this module.
+func (d *DockerSource) GetMetrics() []prometheus.Collector {
+	return []prometheus.Collector{linesRead}
+}
+
+// GetAggregMetrics returns the collectors for aggregated mode; identical to
+// GetMetrics here since the only label is the container name.
+func (d *DockerSource) GetAggregMetrics() []prometheus.Collector {
+	return []prometheus.Collector{linesRead}
+}
+
+// GetName returns the module name, also used as the DSN scheme ("docker://").
+func (d *DockerSource) GetName() string {
+	return "docker"
+}
+
+// CanRun always succeeds: the docker datasource has no platform restriction.
+func (d *DockerSource) CanRun() error {
+	return nil
+}
+
+// EvalContainer checks a container against the configured exact IDs, exact
+// names, ID regexps and name regexps (in that order) and returns its tail
+// configuration plus true on the first match, or an empty config and false.
+func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) {
+	for _, containerID := range d.Config.ContainerID {
+		if containerID == container.ID {
+			return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
+		}
+	}
+
+	for _, containerName := range d.Config.ContainerName {
+		for _, name := range container.Names {
+			// strip the leading "/" the Docker API puts on container names
+			// NOTE(review): the len(name) > 0 check is dead — HasPrefix("/")
+			// already implies a non-empty string, and the order is inverted.
+			if strings.HasPrefix(name, "/") && len(name) > 0 {
+				name = name[1:]
+			}
+			if name == containerName {
+				return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
+			}
+		}
+
+	}
+
+	for _, cont := range d.compiledContainerID {
+		if matched := cont.Match([]byte(container.ID)); matched {
+			return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
+		}
+	}
+
+	for _, cont := range d.compiledContainerName {
+		// NOTE(review): unlike the exact-name loop above, the leading "/" is
+		// not stripped before matching here — regexps see "/name".
+		for _, name := range container.Names {
+			if matched := cont.Match([]byte(name)); matched {
+				return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
+			}
+		}
+
+	}
+
+	return &ContainerConfig{}, false
+}
+
+// WatchContainer polls the Docker API every CheckIntervalDuration: new
+// matching containers are sent to monitChan, vanished ones to deleteChan.
+// If the daemon becomes unreachable, all running tails are killed. Runs
+// until d.t dies.
+func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
+	ticker := time.NewTicker(d.CheckIntervalDuration)
+	d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
+	for {
+		select {
+		case <-d.t.Dying():
+			// NOTE(review): ticker.Stop() is never called on exit.
+			d.logger.Infof("stopping container watcher")
+			return nil
+		case <-ticker.C:
+			// to track for garbage collection
+			runningContainersID := make(map[string]bool)
+			runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
+			if err != nil {
+				// daemon unreachable: tear down every tail so they can be
+				// re-established once the daemon comes back
+				if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
+					for idx, container := range d.runningContainerState {
+						if d.runningContainerState[idx].t.Alive() {
+							d.logger.Infof("killing tail for container %s", container.Name)
+							d.runningContainerState[idx].t.Kill(nil)
+							if err := d.runningContainerState[idx].t.Wait(); err != nil {
+								d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
+							}
+						}
+						delete(d.runningContainerState, idx)
+					}
+				} else {
+					// NOTE(review): uses the package-level logger instead of
+					// d.logger — inconsistent with the rest of this file.
+					log.Debugf("container list err: %s", err.Error())
+				}
+				continue
+			}
+
+			for _, container := range runningContainer {
+				runningContainersID[container.ID] = true
+
+				// don't need to re eval an already monitored container
+				if _, ok := d.runningContainerState[container.ID]; ok {
+					continue
+				}
+				if containerConfig, ok := d.EvalContainer(container); ok {
+					monitChan <- containerConfig
+				}
+			}
+
+			// garbage-collect tails whose container is gone
+			for containerStateID, containerConfig := range d.runningContainerState {
+				if _, ok := runningContainersID[containerStateID]; !ok {
+					deleteChan <- containerConfig
+				}
+			}
+			d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
+
+			// NOTE(review): Reset to the same interval is a no-op unless the
+			// intent is to delay the next tick after a slow poll.
+			ticker.Reset(d.CheckIntervalDuration)
+		}
+	}
+}
+
+// StreamingAcquisition starts live acquisition: DockerManager runs in a
+// tomb goroutine consuming the watcher's channels, while WatchContainer
+// blocks here polling the Docker API until the tomb dies.
+func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
+	d.t = t
+	monitChan := make(chan *ContainerConfig)
+	deleteChan := make(chan *ContainerConfig)
+	d.logger.Infof("Starting docker acquisition")
+	t.Go(func() error {
+		return d.DockerManager(monitChan, deleteChan, out)
+	})
+
+	return d.WatchContainer(monitChan, deleteChan)
+}
+
+// Dump returns the datasource itself for debugging/introspection.
+func (d *DockerSource) Dump() interface{} {
+	return d
+}
+
+// ReadTailScanner pumps every scanned line into out and returns when the
+// scanner is exhausted.
+// NOTE(review): the tomb argument t is never consulted, so this goroutine
+// cannot be interrupted mid-read, and scanner.Err() is discarded.
+func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error {
+	for scanner.Scan() {
+		out <- scanner.Text()
+	}
+	return nil
+}
+
+// TailDocker follows one container's log stream, turning each line into a
+// types.Event on outChan, until the container's tomb dies. A dedicated
+// reader goroutine feeds lines through readerChan so the select can also
+// observe cancellation.
+func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error {
+	container.logger.Infof("start tail for container %s", container.Name)
+	dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
+	if err != nil {
+		container.logger.Errorf("unable to read logs from container: %+v", err)
+		return err
+	}
+	// we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
+	reader := dlog.NewReader(dockerReader)
+	scanner := bufio.NewScanner(reader)
+	readerChan := make(chan string)
+	readerTomb := &tomb.Tomb{}
+	readerTomb.Go(func() error {
+		return ReadTailScanner(scanner, readerChan, readerTomb)
+	})
+	for {
+		select {
+		case <-container.t.Dying():
+			// NOTE(review): dockerReader is never closed, so the reader
+			// goroutine may stay blocked in Scan after Kill — ReadTailScanner
+			// ignores its tomb (see above).
+			readerTomb.Kill(nil)
+			container.logger.Infof("tail stopped for container %s", container.Name)
+			return nil
+		case line := <-readerChan:
+			if line == "" {
+				continue
+			}
+			l := types.Line{}
+			l.Raw = line
+			l.Labels = d.Config.Labels
+			l.Time = time.Now()
+			l.Src = container.Name
+			l.Process = true
+			l.Module = d.GetName()
+			evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
+			linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
+			outChan <- evt
+			d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
+		}
+	}
+}
+
+// DockerManager owns runningContainerState: it starts a TailDocker goroutine
+// for each container arriving on in, kills the tail of each container
+// arriving on deleteChan, and tears everything down when d.t dies.
+func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
+	d.logger.Info("DockerSource Manager started")
+	for {
+		select {
+		case newContainer := <-in:
+			if _, ok := d.runningContainerState[newContainer.ID]; !ok {
+				newContainer.t = &tomb.Tomb{}
+				newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name})
+				newContainer.t.Go(func() error {
+					return d.TailDocker(newContainer, outChan)
+				})
+				d.runningContainerState[newContainer.ID] = newContainer
+			}
+		case containerToDelete := <-deleteChan:
+			if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok {
+				// NOTE(review): package-level log here, and unlike the
+				// shutdown path below, t.Wait() is not called after Kill.
+				log.Infof("container acquisition stopped for container '%s'", containerConfig.Name)
+				containerConfig.t.Kill(nil)
+				delete(d.runningContainerState, containerToDelete.ID)
+			}
+		case <-d.t.Dying():
+			// shutdown: kill every live tail and wait for each to finish
+			for idx, container := range d.runningContainerState {
+				if d.runningContainerState[idx].t.Alive() {
+					d.logger.Infof("killing tail for container %s", container.Name)
+					d.runningContainerState[idx].t.Kill(nil)
+					if err := d.runningContainerState[idx].t.Wait(); err != nil {
+						d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
+					}
+				}
+			}
+			d.runningContainerState = nil
+			d.logger.Debugf("routine cleanup done, return")
+			return nil
+		}
+	}
+}

+ 330 - 0
pkg/acquisition/modules/docker/docker_test.go

@@ -0,0 +1,330 @@
+package dockeracquisition
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+	dockerTypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/client"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// testContainerName is the container name the mock client reports.
+const testContainerName = "docker_test"
+
+// TestConfigure checks YAML validation: unknown fields are rejected
+// (UnmarshalStrict), a config without any container selector errors, and a
+// valid container_name config passes.
+func TestConfigure(t *testing.T) {
+	log.Infof("Test 'TestConfigure'")
+
+	tests := []struct {
+		config      string
+		expectedErr string
+	}{
+		{
+			config:      `foobar: asd`,
+			expectedErr: "line 1: field foobar not found in type dockeracquisition.DockerConfiguration",
+		},
+		{
+			config: `
+mode: tail
+source: docker`,
+			expectedErr: "no containers names or containers ID configuration provided",
+		},
+		{
+			config: `
+mode: cat
+source: docker
+container_name:
+ - toto`,
+			expectedErr: "",
+		},
+	}
+
+	subLogger := log.WithFields(log.Fields{
+		"type": "docker",
+	})
+	for _, test := range tests {
+		f := DockerSource{}
+		err := f.Configure([]byte(test.config), subLogger)
+		if test.expectedErr != "" && err == nil {
+			t.Fatalf("Expected err %s but got nil !", test.expectedErr)
+		}
+		if test.expectedErr != "" {
+			assert.Contains(t, err.Error(), test.expectedErr)
+		}
+	}
+}
+
+// TestConfigureDSN checks DSN parsing: scheme validation, empty DSN,
+// log_level handling (valid and invalid), and multi-parameter DSNs.
+// Note: these cases only exercise parsing; no Docker daemon is contacted.
+func TestConfigureDSN(t *testing.T) {
+	log.Infof("Test 'TestConfigureDSN'")
+
+	tests := []struct {
+		name        string
+		dsn         string
+		expectedErr string
+	}{
+		{
+			name:        "invalid DSN",
+			dsn:         "asd://",
+			expectedErr: "invalid DSN asd:// for docker source, must start with docker://",
+		},
+		{
+			name:        "empty DSN",
+			dsn:         "docker://",
+			expectedErr: "empty docker:// DSN",
+		},
+		{
+			name:        "DSN ok with log_level",
+			dsn:         "docker://test_docker?log_level=warn",
+			expectedErr: "",
+		},
+		{
+			name:        "DSN invalid log_level",
+			dsn:         "docker://test_docker?log_level=foobar",
+			expectedErr: "unknown level foobar: not a valid logrus Level:",
+		},
+		{
+			name:        "DSN ok with multiple parameters",
+			dsn:         "docker://test_docker?since=42min&docker_host=unix:///var/run/podman/podman.sock",
+			expectedErr: "",
+		},
+	}
+	subLogger := log.WithFields(log.Fields{
+		"type": "docker",
+	})
+	for _, test := range tests {
+		f := DockerSource{}
+		err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
+		if test.expectedErr != "" {
+			assert.Contains(t, err.Error(), test.expectedErr)
+		} else {
+			assert.Equal(t, err, nil)
+		}
+	}
+}
+
+// mockDockerCli embeds the real client so that only ContainerList and
+// ContainerLogs need overriding for the tests below.
+type mockDockerCli struct {
+	client.Client
+}
+
+// TestStreamingAcquisition runs the full streaming pipeline against the
+// mock client (one container, three log lines) for both exact-name and
+// name-regexp configs, and counts lines until the channel goes quiet.
+func TestStreamingAcquisition(t *testing.T) {
+	log.SetOutput(os.Stdout)
+	log.SetLevel(log.InfoLevel)
+	log.Info("Test 'TestStreamingAcquisition'")
+	tests := []struct {
+		config         string
+		expectedErr    string
+		expectedOutput string
+		expectedLines  int
+		logType        string
+		logLevel       log.Level
+	}{
+		{
+			config: `
+source: docker
+mode: cat
+container_name:
+ - docker_test`,
+			expectedErr:    "",
+			expectedOutput: "",
+			expectedLines:  3,
+			logType:        "test",
+			logLevel:       log.InfoLevel,
+		},
+		{
+			config: `
+source: docker
+mode: cat
+container_name_regexp:
+ - docker_*`,
+			expectedErr:    "",
+			expectedOutput: "",
+			expectedLines:  3,
+			logType:        "test",
+			logLevel:       log.InfoLevel,
+		},
+	}
+
+	for _, ts := range tests {
+		var logger *log.Logger
+		var subLogger *log.Entry
+		// NOTE(review): logger is nil here — if a future case sets
+		// expectedOutput, logger.SetLevel will panic with a nil deref.
+		if ts.expectedOutput != "" {
+			logger.SetLevel(ts.logLevel)
+			subLogger = logger.WithFields(log.Fields{
+				"type": "docker",
+			})
+		} else {
+			subLogger = log.WithFields(log.Fields{
+				"type": "docker",
+			})
+		}
+
+		dockerTomb := tomb.Tomb{}
+		out := make(chan types.Event)
+		dockerSource := DockerSource{}
+		err := dockerSource.Configure([]byte(ts.config), subLogger)
+		if err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		}
+		// swap in the mock after Configure built the real client
+		dockerSource.Client = new(mockDockerCli)
+		actualLines := 0
+		readerTomb := &tomb.Tomb{}
+		streamTomb := tomb.Tomb{}
+		streamTomb.Go(func() error {
+			return dockerSource.StreamingAcquisition(out, &dockerTomb)
+		})
+		readerTomb.Go(func() error {
+			time.Sleep(1 * time.Second)
+			ticker := time.NewTicker(1 * time.Second)
+			for {
+				select {
+				case <-out:
+					actualLines++
+					ticker.Reset(1 * time.Second)
+				case <-ticker.C:
+					// one quiet second means the stream is drained
+					log.Infof("no more line to read")
+					readerTomb.Kill(nil)
+					return nil
+				}
+			}
+		})
+		// NOTE(review): fixed sleeps make this test slow and timing-sensitive.
+		time.Sleep(10 * time.Second)
+		if ts.expectedErr == "" && err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		} else if ts.expectedErr != "" && err != nil {
+			assert.Contains(t, err.Error(), ts.expectedErr)
+			continue
+		} else if ts.expectedErr != "" && err == nil {
+			t.Fatalf("Expected error %s, but got nothing !", ts.expectedErr)
+		}
+		if err := readerTomb.Wait(); err != nil {
+			t.Fatal(err)
+		}
+		//time.Sleep(4 * time.Second)
+		if ts.expectedLines != 0 {
+			assert.Equal(t, ts.expectedLines, actualLines)
+		}
+		dockerSource.t.Kill(nil)
+		err = streamTomb.Wait()
+		if err != nil {
+			t.Fatalf("docker acquisition error: %s", err)
+		}
+	}
+
+}
+
+// ContainerList returns a single fake running container so the watcher and
+// one-shot paths always have something to match against.
+func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error) {
+	containers := make([]dockerTypes.Container, 0)
+	container := &dockerTypes.Container{
+		ID:    "12456",
+		Names: []string{testContainerName},
+	}
+	containers = append(containers, *container)
+
+	return containers, nil
+}
+
+// ContainerLogs returns three fake log lines, each prefixed with an 8-byte
+// header — presumably the stdout frame header of Docker's multiplexed log
+// stream, which dlog.NewReader strips on the consuming side.
+func (cli *mockDockerCli) ContainerLogs(ctx context.Context, container string, options dockerTypes.ContainerLogsOptions) (io.ReadCloser, error) {
+	startLineByte := "\x01\x00\x00\x00\x00\x00\x00\x1f"
+	data := []string{"docker", "test", "1234"}
+	ret := ""
+	for _, line := range data {
+		ret += fmt.Sprintf("%s%s\n", startLineByte, line)
+	}
+	r := io.NopCloser(strings.NewReader(ret)) // r type is io.ReadCloser
+	return r, nil
+}
+
+// TestOneShot exercises OneShotAcquisition via DSNs: a DSN naming a
+// non-existent container must error, and one naming the mock's container
+// must deliver exactly the mock's three lines.
+func TestOneShot(t *testing.T) {
+	log.Infof("Test 'TestOneShot'")
+
+	tests := []struct {
+		dsn            string
+		expectedErr    string
+		expectedOutput string
+		expectedLines  int
+		logType        string
+		logLevel       log.Level
+	}{
+		{
+			dsn:            "docker://non_exist_docker",
+			expectedErr:    "no docker found, can't run one shot acquisition",
+			expectedOutput: "",
+			expectedLines:  0,
+			logType:        "test",
+			logLevel:       log.InfoLevel,
+		},
+		{
+			dsn:            "docker://" + testContainerName,
+			expectedErr:    "",
+			expectedOutput: "",
+			expectedLines:  3,
+			logType:        "test",
+			logLevel:       log.InfoLevel,
+		},
+	}
+
+	for _, ts := range tests {
+		var subLogger *log.Entry
+		var logger *log.Logger
+		// NOTE(review): logger is nil — same latent nil-deref as in
+		// TestStreamingAcquisition if expectedOutput is ever set.
+		if ts.expectedOutput != "" {
+			logger.SetLevel(ts.logLevel)
+			subLogger = logger.WithFields(log.Fields{
+				"type": "docker",
+			})
+		} else {
+			log.SetLevel(ts.logLevel)
+			subLogger = log.WithFields(log.Fields{
+				"type": "docker",
+			})
+		}
+
+		dockerClient := &DockerSource{}
+		labels := make(map[string]string)
+		labels["type"] = ts.logType
+
+		if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger); err != nil {
+			t.Fatalf("unable to configure dsn '%s': %s", ts.dsn, err)
+		}
+		// swap in the mock after ConfigureByDSN built the real client
+		dockerClient.Client = new(mockDockerCli)
+		out := make(chan types.Event)
+		actualLines := 0
+		if ts.expectedLines != 0 {
+			go func() {
+			READLOOP:
+				for {
+					select {
+					case <-out:
+						actualLines++
+					case <-time.After(1 * time.Second):
+						break READLOOP
+					}
+				}
+			}()
+		}
+		tomb := tomb.Tomb{}
+		err := dockerClient.OneShotAcquisition(out, &tomb)
+
+		if ts.expectedErr == "" && err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		} else if ts.expectedErr != "" && err != nil {
+			assert.Contains(t, err.Error(), ts.expectedErr)
+			continue
+		} else if ts.expectedErr != "" && err == nil {
+			t.Fatalf("Expected error %s, but got nothing !", ts.expectedErr)
+		}
+		// else we do the check before actualLines is incremented ...
+		time.Sleep(1 * time.Second)
+		if ts.expectedLines != 0 {
+			assert.Equal(t, ts.expectedLines, actualLines)
+		}
+	}
+
+}

+ 6 - 0
pkg/csconfig/crowdsec_service.go

@@ -60,6 +60,12 @@ func (c *Config) LoadCrowdsec() error {
 			return errors.Wrap(err, "while globing acquis_dir")
 			return errors.Wrap(err, "while globing acquis_dir")
 		}
 		}
 		c.Crowdsec.AcquisitionFiles = append(c.Crowdsec.AcquisitionFiles, files...)
 		c.Crowdsec.AcquisitionFiles = append(c.Crowdsec.AcquisitionFiles, files...)
+
+		files, err = filepath.Glob(c.Crowdsec.AcquisitionDirPath + "/*.yml")
+		if err != nil {
+			return errors.Wrap(err, "while globing acquis_dir")
+		}
+		c.Crowdsec.AcquisitionFiles = append(c.Crowdsec.AcquisitionFiles, files...)
 	}
 	}
 	if c.Crowdsec.AcquisitionDirPath == "" && c.Crowdsec.AcquisitionFilePath == "" {
 	if c.Crowdsec.AcquisitionDirPath == "" && c.Crowdsec.AcquisitionFilePath == "" {
 		log.Warningf("no acquisition_path nor acquisition_dir")
 		log.Warningf("no acquisition_path nor acquisition_dir")