123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- package dockeracquisition
- import (
- "bufio"
- "context"
- "fmt"
- "net/url"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/ahmetb/dlog"
- "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
- leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- dockerTypes "github.com/docker/docker/api/types"
- "github.com/docker/docker/client"
- "github.com/pkg/errors"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
- "gopkg.in/yaml.v2"
- )
- var linesRead = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "cs_dockersource_hits_total",
- Help: "Total lines that were read.",
- },
- []string{"source"})
- type DockerConfiguration struct {
- CheckInterval string `yaml:"check_interval"`
- FollowStdout bool `yaml:"follow_stdout"`
- FollowStdErr bool `yaml:"follow_stderr"`
- Until string `yaml:"until"`
- Since string `yaml:"since"`
- DockerHost string `yaml:"docker_host"`
- ContainerName []string `yaml:"container_name"`
- ContainerID []string `yaml:"container_id"`
- ContainerNameRegexp []string `yaml:"container_name_regexp"`
- ContainerIDRegexp []string `yaml:"container_id_regexp"`
- ForceInotify bool `yaml:"force_inotify"`
- configuration.DataSourceCommonCfg `yaml:",inline"`
- }
- type DockerSource struct {
- Config DockerConfiguration
- runningContainerState map[string]*ContainerConfig
- compiledContainerName []*regexp.Regexp
- compiledContainerID []*regexp.Regexp
- CheckIntervalDuration time.Duration
- logger *log.Entry
- Client client.CommonAPIClient
- t *tomb.Tomb
- containerLogsOptions *dockerTypes.ContainerLogsOptions
- }
- type ContainerConfig struct {
- Name string
- ID string
- t *tomb.Tomb
- logger *log.Entry
- Labels map[string]string
- Tty bool
- }
- func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
- var err error
- d.Config = DockerConfiguration{
- FollowStdout: true, // default
- FollowStdErr: true, // default
- CheckInterval: "1s", // default
- }
- d.logger = logger
- d.runningContainerState = make(map[string]*ContainerConfig)
- err = yaml.UnmarshalStrict(Config, &d.Config)
- if err != nil {
- return errors.Wrap(err, "Cannot parse DockerAcquisition configuration")
- }
- d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
- if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 {
- return fmt.Errorf("no containers names or containers ID configuration provided")
- }
- d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
- if err != nil {
- return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
- }
- if d.Config.Mode == "" {
- d.Config.Mode = configuration.TAIL_MODE
- }
- if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
- return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
- }
- d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
- for _, cont := range d.Config.ContainerNameRegexp {
- d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont))
- }
- for _, cont := range d.Config.ContainerIDRegexp {
- d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont))
- }
- dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
- if err != nil {
- return err
- }
- if d.Config.Since == "" {
- d.Config.Since = time.Now().UTC().Format(time.RFC3339)
- }
- d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
- ShowStdout: d.Config.FollowStdout,
- ShowStderr: d.Config.FollowStdErr,
- Follow: true,
- Since: d.Config.Since,
- }
- if d.Config.Until != "" {
- d.containerLogsOptions.Until = d.Config.Until
- }
- if d.Config.DockerHost != "" {
- if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil {
- return err
- }
- }
- d.Client = dockerClient
- _, err = d.Client.Info(context.Background())
- if err != nil {
- return errors.Wrapf(err, "failed to configure docker datasource %s", d.Config.DockerHost)
- }
- return nil
- }
- func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
- var err error
- if !strings.HasPrefix(dsn, d.GetName()+"://") {
- return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName())
- }
- d.Config = DockerConfiguration{
- FollowStdout: true,
- FollowStdErr: true,
- CheckInterval: "1s",
- }
- d.Config.ContainerName = make([]string, 0)
- d.Config.ContainerID = make([]string, 0)
- d.runningContainerState = make(map[string]*ContainerConfig)
- d.Config.Mode = configuration.CAT_MODE
- d.logger = logger
- d.Config.Labels = labels
- dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
- if err != nil {
- return err
- }
- d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
- ShowStdout: d.Config.FollowStdout,
- ShowStderr: d.Config.FollowStdErr,
- Follow: false,
- }
- dsn = strings.TrimPrefix(dsn, d.GetName()+"://")
- args := strings.Split(dsn, "?")
- if len(args) == 0 {
- return fmt.Errorf("invalid dsn: %s", dsn)
- }
- if len(args) == 1 && args[0] == "" {
- return fmt.Errorf("empty %s DSN", d.GetName()+"://")
- }
- d.Config.ContainerName = append(d.Config.ContainerName, args[0])
- // we add it as an ID also so user can provide docker name or docker ID
- d.Config.ContainerID = append(d.Config.ContainerID, args[0])
- // no parameters
- if len(args) == 1 {
- d.Client = dockerClient
- return nil
- }
- parameters, err := url.ParseQuery(args[1])
- if err != nil {
- return errors.Wrapf(err, "while parsing parameters %s: %s", dsn, err)
- }
- for k, v := range parameters {
- switch k {
- case "log_level":
- if len(v) != 1 {
- return fmt.Errorf("only one 'log_level' parameters is required, not many")
- }
- lvl, err := log.ParseLevel(v[0])
- if err != nil {
- return errors.Wrapf(err, "unknown level %s", v[0])
- }
- d.logger.Logger.SetLevel(lvl)
- case "until":
- if len(v) != 1 {
- return fmt.Errorf("only one 'until' parameters is required, not many")
- }
- d.containerLogsOptions.Until = v[0]
- case "since":
- if len(v) != 1 {
- return fmt.Errorf("only one 'since' parameters is required, not many")
- }
- d.containerLogsOptions.Since = v[0]
- case "follow_stdout":
- if len(v) != 1 {
- return fmt.Errorf("only one 'follow_stdout' parameters is required, not many")
- }
- followStdout, err := strconv.ParseBool(v[0])
- if err != nil {
- return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err)
- }
- d.Config.FollowStdout = followStdout
- d.containerLogsOptions.ShowStdout = followStdout
- case "follow_stderr":
- if len(v) != 1 {
- return fmt.Errorf("only one 'follow_stderr' parameters is required, not many")
- }
- followStdErr, err := strconv.ParseBool(v[0])
- if err != nil {
- return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err)
- }
- d.Config.FollowStdErr = followStdErr
- d.containerLogsOptions.ShowStderr = followStdErr
- case "docker_host":
- if len(v) != 1 {
- return fmt.Errorf("only one 'docker_host' parameters is required, not many")
- }
- if err := client.WithHost(v[0])(dockerClient); err != nil {
- return err
- }
- }
- }
- d.Client = dockerClient
- return nil
- }
- func (d *DockerSource) GetMode() string {
- return d.Config.Mode
- }
- //SupportedModes returns the supported modes by the acquisition module
- func (d *DockerSource) SupportedModes() []string {
- return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
- }
- //OneShotAcquisition reads a set of file and returns when done
- func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
- d.logger.Debug("In oneshot")
- runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
- if err != nil {
- return err
- }
- foundOne := false
- for _, container := range runningContainer {
- if _, ok := d.runningContainerState[container.ID]; ok {
- d.logger.Debugf("container with id %s is already being read from", container.ID)
- continue
- }
- if containerConfig, ok := d.EvalContainer(container); ok {
- d.logger.Infof("reading logs from container %s", containerConfig.Name)
- d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
- dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
- if err != nil {
- d.logger.Errorf("unable to read logs from container: %+v", err)
- return err
- }
- // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
- foundOne = true
- var scanner *bufio.Scanner
- if containerConfig.Tty {
- scanner = bufio.NewScanner(dockerReader)
- } else {
- reader := dlog.NewReader(dockerReader)
- scanner = bufio.NewScanner(reader)
- }
- for scanner.Scan() {
- line := scanner.Text()
- if line == "" {
- continue
- }
- l := types.Line{}
- l.Raw = line
- l.Labels = d.Config.Labels
- l.Time = time.Now().UTC()
- l.Src = containerConfig.Name
- l.Process = true
- l.Module = d.GetName()
- linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
- evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
- out <- evt
- d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
- }
- err = scanner.Err()
- if err != nil {
- d.logger.Errorf("Got error from docker read: %s", err)
- }
- d.runningContainerState[container.ID] = containerConfig
- }
- }
- t.Kill(nil)
- if !foundOne {
- return fmt.Errorf("no docker found, can't run one shot acquisition")
- }
- return nil
- }
- func (d *DockerSource) GetMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (d *DockerSource) GetAggregMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (d *DockerSource) GetName() string {
- return "docker"
- }
- func (d *DockerSource) CanRun() error {
- return nil
- }
- func (d *DockerSource) getContainerTTY(containerId string) bool {
- containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
- if err != nil {
- return false
- }
- return containerDetails.Config.Tty
- }
- func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) {
- for _, containerID := range d.Config.ContainerID {
- if containerID == container.ID {
- return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
- }
- }
- for _, containerName := range d.Config.ContainerName {
- for _, name := range container.Names {
- if strings.HasPrefix(name, "/") && len(name) > 0 {
- name = name[1:]
- }
- if name == containerName {
- return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
- }
- }
- }
- for _, cont := range d.compiledContainerID {
- if matched := cont.Match([]byte(container.ID)); matched {
- return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
- }
- }
- for _, cont := range d.compiledContainerName {
- for _, name := range container.Names {
- if matched := cont.Match([]byte(name)); matched {
- return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
- }
- }
- }
- return &ContainerConfig{}, false
- }
- func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
- ticker := time.NewTicker(d.CheckIntervalDuration)
- d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
- for {
- select {
- case <-d.t.Dying():
- d.logger.Infof("stopping container watcher")
- return nil
- case <-ticker.C:
- // to track for garbage collection
- runningContainersID := make(map[string]bool)
- runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
- if err != nil {
- if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
- for idx, container := range d.runningContainerState {
- if d.runningContainerState[idx].t.Alive() {
- d.logger.Infof("killing tail for container %s", container.Name)
- d.runningContainerState[idx].t.Kill(nil)
- if err := d.runningContainerState[idx].t.Wait(); err != nil {
- d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
- }
- }
- delete(d.runningContainerState, idx)
- }
- } else {
- log.Errorf("container list err: %s", err.Error())
- }
- continue
- }
- for _, container := range runningContainer {
- runningContainersID[container.ID] = true
- // don't need to re eval an already monitored container
- if _, ok := d.runningContainerState[container.ID]; ok {
- continue
- }
- if containerConfig, ok := d.EvalContainer(container); ok {
- monitChan <- containerConfig
- }
- }
- for containerStateID, containerConfig := range d.runningContainerState {
- if _, ok := runningContainersID[containerStateID]; !ok {
- deleteChan <- containerConfig
- }
- }
- d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
- ticker.Reset(d.CheckIntervalDuration)
- }
- }
- }
- func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
- d.t = t
- monitChan := make(chan *ContainerConfig)
- deleteChan := make(chan *ContainerConfig)
- d.logger.Infof("Starting docker acquisition")
- t.Go(func() error {
- return d.DockerManager(monitChan, deleteChan, out)
- })
- return d.WatchContainer(monitChan, deleteChan)
- }
- func (d *DockerSource) Dump() interface{} {
- return d
- }
- func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error {
- for scanner.Scan() {
- out <- scanner.Text()
- }
- return nil
- }
- func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error {
- container.logger.Infof("start tail for container %s", container.Name)
- dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
- if err != nil {
- container.logger.Errorf("unable to read logs from container: %+v", err)
- return err
- }
- var scanner *bufio.Scanner
- // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
- if container.Tty {
- scanner = bufio.NewScanner(dockerReader)
- } else {
- reader := dlog.NewReader(dockerReader)
- scanner = bufio.NewScanner(reader)
- }
- readerChan := make(chan string)
- readerTomb := &tomb.Tomb{}
- readerTomb.Go(func() error {
- return ReadTailScanner(scanner, readerChan, readerTomb)
- })
- for {
- select {
- case <-container.t.Dying():
- readerTomb.Kill(nil)
- container.logger.Infof("tail stopped for container %s", container.Name)
- return nil
- case line := <-readerChan:
- if line == "" {
- continue
- }
- l := types.Line{}
- l.Raw = line
- l.Labels = d.Config.Labels
- l.Time = time.Now().UTC()
- l.Src = container.Name
- l.Process = true
- l.Module = d.GetName()
- evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
- linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
- outChan <- evt
- d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
- case <-readerTomb.Dying():
- //This case is to handle temporarily losing the connection to the docker socket
- //The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart)
- d.logger.Debugf("readerTomb dying for container %s, removing it from runningContainerState", container.Name)
- delete(d.runningContainerState, container.ID)
- //Also reset the Since to avoid re-reading logs
- d.Config.Since = time.Now().UTC().Format(time.RFC3339)
- d.containerLogsOptions.Since = d.Config.Since
- return nil
- }
- }
- }
- func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
- d.logger.Info("DockerSource Manager started")
- for {
- select {
- case newContainer := <-in:
- if _, ok := d.runningContainerState[newContainer.ID]; !ok {
- newContainer.t = &tomb.Tomb{}
- newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name})
- newContainer.t.Go(func() error {
- return d.TailDocker(newContainer, outChan)
- })
- d.runningContainerState[newContainer.ID] = newContainer
- }
- case containerToDelete := <-deleteChan:
- if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok {
- log.Infof("container acquisition stopped for container '%s'", containerConfig.Name)
- containerConfig.t.Kill(nil)
- delete(d.runningContainerState, containerToDelete.ID)
- }
- case <-d.t.Dying():
- for idx, container := range d.runningContainerState {
- if d.runningContainerState[idx].t.Alive() {
- d.logger.Infof("killing tail for container %s", container.Name)
- d.runningContainerState[idx].t.Kill(nil)
- if err := d.runningContainerState[idx].t.Wait(); err != nil {
- d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
- }
- }
- }
- d.runningContainerState = nil
- d.logger.Debugf("routine cleanup done, return")
- return nil
- }
- }
- }
|