docker.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. package dockeracquisition
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net/url"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/ahmetb/dlog"
  12. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  13. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. dockerTypes "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/client"
  17. "github.com/pkg/errors"
  18. "github.com/prometheus/client_golang/prometheus"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/tomb.v2"
  21. "gopkg.in/yaml.v2"
  22. )
  23. var linesRead = prometheus.NewCounterVec(
  24. prometheus.CounterOpts{
  25. Name: "cs_dockersource_hits_total",
  26. Help: "Total lines that were read.",
  27. },
  28. []string{"source"})
  29. type DockerConfiguration struct {
  30. CheckInterval string `yaml:"check_interval"`
  31. FollowStdout bool `yaml:"follow_stdout"`
  32. FollowStdErr bool `yaml:"follow_stderr"`
  33. Until string `yaml:"until"`
  34. Since string `yaml:"since"`
  35. DockerHost string `yaml:"docker_host"`
  36. ContainerName []string `yaml:"container_name"`
  37. ContainerID []string `yaml:"container_id"`
  38. ContainerNameRegexp []string `yaml:"container_name_regexp"`
  39. ContainerIDRegexp []string `yaml:"container_id_regexp"`
  40. ForceInotify bool `yaml:"force_inotify"`
  41. configuration.DataSourceCommonCfg `yaml:",inline"`
  42. }
  43. type DockerSource struct {
  44. Config DockerConfiguration
  45. runningContainerState map[string]*ContainerConfig
  46. compiledContainerName []*regexp.Regexp
  47. compiledContainerID []*regexp.Regexp
  48. CheckIntervalDuration time.Duration
  49. logger *log.Entry
  50. Client client.CommonAPIClient
  51. t *tomb.Tomb
  52. containerLogsOptions *dockerTypes.ContainerLogsOptions
  53. }
  54. type ContainerConfig struct {
  55. Name string
  56. ID string
  57. t *tomb.Tomb
  58. logger *log.Entry
  59. Labels map[string]string
  60. }
  61. func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
  62. var err error
  63. d.Config = DockerConfiguration{
  64. FollowStdout: true, // default
  65. FollowStdErr: true, // default
  66. CheckInterval: "1s", // default
  67. }
  68. d.logger = logger
  69. d.runningContainerState = make(map[string]*ContainerConfig)
  70. err = yaml.UnmarshalStrict(Config, &d.Config)
  71. if err != nil {
  72. return errors.Wrap(err, "Cannot parse DockerAcquisition configuration")
  73. }
  74. d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
  75. if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 {
  76. return fmt.Errorf("no containers names or containers ID configuration provided")
  77. }
  78. d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
  79. if err != nil {
  80. return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
  81. }
  82. if d.Config.Mode == "" {
  83. d.Config.Mode = configuration.TAIL_MODE
  84. }
  85. if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
  86. return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
  87. }
  88. d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
  89. for _, cont := range d.Config.ContainerNameRegexp {
  90. d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont))
  91. }
  92. for _, cont := range d.Config.ContainerIDRegexp {
  93. d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont))
  94. }
  95. dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
  96. if err != nil {
  97. return err
  98. }
  99. if d.Config.Since == "" {
  100. d.Config.Since = time.Now().UTC().Format(time.RFC3339)
  101. }
  102. d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
  103. ShowStdout: d.Config.FollowStdout,
  104. ShowStderr: d.Config.FollowStdErr,
  105. Follow: true,
  106. Since: d.Config.Since,
  107. }
  108. if d.Config.Until != "" {
  109. d.containerLogsOptions.Until = d.Config.Until
  110. }
  111. if d.Config.DockerHost != "" {
  112. if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil {
  113. return err
  114. }
  115. }
  116. d.Client = dockerClient
  117. _, err = d.Client.Info(context.Background())
  118. if err != nil {
  119. return errors.Wrapf(err, "failed to configure docker datasource %s", d.Config.DockerHost)
  120. }
  121. return nil
  122. }
  123. func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  124. var err error
  125. if !strings.HasPrefix(dsn, d.GetName()+"://") {
  126. return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName())
  127. }
  128. d.Config = DockerConfiguration{
  129. FollowStdout: true,
  130. FollowStdErr: true,
  131. CheckInterval: "1s",
  132. }
  133. d.Config.ContainerName = make([]string, 0)
  134. d.Config.ContainerID = make([]string, 0)
  135. d.runningContainerState = make(map[string]*ContainerConfig)
  136. d.Config.Mode = configuration.CAT_MODE
  137. d.logger = logger
  138. d.Config.Labels = labels
  139. dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
  140. if err != nil {
  141. return err
  142. }
  143. d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
  144. ShowStdout: d.Config.FollowStdout,
  145. ShowStderr: d.Config.FollowStdErr,
  146. Follow: false,
  147. }
  148. dsn = strings.TrimPrefix(dsn, d.GetName()+"://")
  149. args := strings.Split(dsn, "?")
  150. if len(args) == 0 {
  151. return fmt.Errorf("invalid dsn: %s", dsn)
  152. }
  153. if len(args) == 1 && args[0] == "" {
  154. return fmt.Errorf("empty %s DSN", d.GetName()+"://")
  155. }
  156. d.Config.ContainerName = append(d.Config.ContainerName, args[0])
  157. // we add it as an ID also so user can provide docker name or docker ID
  158. d.Config.ContainerID = append(d.Config.ContainerID, args[0])
  159. // no parameters
  160. if len(args) == 1 {
  161. d.Client = dockerClient
  162. return nil
  163. }
  164. parameters, err := url.ParseQuery(args[1])
  165. if err != nil {
  166. return errors.Wrapf(err, "while parsing parameters %s: %s", dsn, err)
  167. }
  168. for k, v := range parameters {
  169. switch k {
  170. case "log_level":
  171. if len(v) != 1 {
  172. return fmt.Errorf("only one 'log_level' parameters is required, not many")
  173. }
  174. lvl, err := log.ParseLevel(v[0])
  175. if err != nil {
  176. return errors.Wrapf(err, "unknown level %s", v[0])
  177. }
  178. d.logger.Logger.SetLevel(lvl)
  179. case "until":
  180. if len(v) != 1 {
  181. return fmt.Errorf("only one 'until' parameters is required, not many")
  182. }
  183. d.containerLogsOptions.Until = v[0]
  184. case "since":
  185. if len(v) != 1 {
  186. return fmt.Errorf("only one 'since' parameters is required, not many")
  187. }
  188. d.containerLogsOptions.Since = v[0]
  189. case "follow_stdout":
  190. if len(v) != 1 {
  191. return fmt.Errorf("only one 'follow_stdout' parameters is required, not many")
  192. }
  193. followStdout, err := strconv.ParseBool(v[0])
  194. if err != nil {
  195. return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err)
  196. }
  197. d.Config.FollowStdout = followStdout
  198. d.containerLogsOptions.ShowStdout = followStdout
  199. case "follow_stderr":
  200. if len(v) != 1 {
  201. return fmt.Errorf("only one 'follow_stderr' parameters is required, not many")
  202. }
  203. followStdErr, err := strconv.ParseBool(v[0])
  204. if err != nil {
  205. return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err)
  206. }
  207. d.Config.FollowStdErr = followStdErr
  208. d.containerLogsOptions.ShowStderr = followStdErr
  209. case "docker_host":
  210. if len(v) != 1 {
  211. return fmt.Errorf("only one 'docker_host' parameters is required, not many")
  212. }
  213. if err := client.WithHost(v[0])(dockerClient); err != nil {
  214. return err
  215. }
  216. }
  217. }
  218. d.Client = dockerClient
  219. return nil
  220. }
  221. func (d *DockerSource) GetMode() string {
  222. return d.Config.Mode
  223. }
  224. //SupportedModes returns the supported modes by the acquisition module
  225. func (d *DockerSource) SupportedModes() []string {
  226. return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
  227. }
  228. //OneShotAcquisition reads a set of file and returns when done
  229. func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  230. d.logger.Debug("In oneshot")
  231. runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
  232. if err != nil {
  233. return err
  234. }
  235. foundOne := false
  236. for _, container := range runningContainer {
  237. if _, ok := d.runningContainerState[container.ID]; ok {
  238. d.logger.Debugf("container with id %s is already being read from", container.ID)
  239. continue
  240. }
  241. if containerConfig, ok := d.EvalContainer(container); ok {
  242. d.logger.Infof("reading logs from container %s", containerConfig.Name)
  243. d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
  244. dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
  245. if err != nil {
  246. d.logger.Errorf("unable to read logs from container: %+v", err)
  247. return err
  248. }
  249. // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
  250. reader := dlog.NewReader(dockerReader)
  251. foundOne = true
  252. scanner := bufio.NewScanner(reader)
  253. for scanner.Scan() {
  254. line := scanner.Text()
  255. if line == "" {
  256. continue
  257. }
  258. l := types.Line{}
  259. l.Raw = line
  260. l.Labels = d.Config.Labels
  261. l.Time = time.Now().UTC()
  262. l.Src = containerConfig.Name
  263. l.Process = true
  264. l.Module = d.GetName()
  265. linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
  266. evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  267. out <- evt
  268. d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
  269. }
  270. d.runningContainerState[container.ID] = containerConfig
  271. }
  272. }
  273. t.Kill(nil)
  274. if !foundOne {
  275. return fmt.Errorf("no docker found, can't run one shot acquisition")
  276. }
  277. return nil
  278. }
  279. func (d *DockerSource) GetMetrics() []prometheus.Collector {
  280. return []prometheus.Collector{linesRead}
  281. }
  282. func (d *DockerSource) GetAggregMetrics() []prometheus.Collector {
  283. return []prometheus.Collector{linesRead}
  284. }
  285. func (d *DockerSource) GetName() string {
  286. return "docker"
  287. }
  288. func (d *DockerSource) CanRun() error {
  289. return nil
  290. }
  291. func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) {
  292. for _, containerID := range d.Config.ContainerID {
  293. if containerID == container.ID {
  294. return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
  295. }
  296. }
  297. for _, containerName := range d.Config.ContainerName {
  298. for _, name := range container.Names {
  299. if strings.HasPrefix(name, "/") && len(name) > 0 {
  300. name = name[1:]
  301. }
  302. if name == containerName {
  303. return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
  304. }
  305. }
  306. }
  307. for _, cont := range d.compiledContainerID {
  308. if matched := cont.Match([]byte(container.ID)); matched {
  309. return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true
  310. }
  311. }
  312. for _, cont := range d.compiledContainerName {
  313. for _, name := range container.Names {
  314. if matched := cont.Match([]byte(name)); matched {
  315. return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true
  316. }
  317. }
  318. }
  319. return &ContainerConfig{}, false
  320. }
  321. func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
  322. ticker := time.NewTicker(d.CheckIntervalDuration)
  323. d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
  324. for {
  325. select {
  326. case <-d.t.Dying():
  327. d.logger.Infof("stopping container watcher")
  328. return nil
  329. case <-ticker.C:
  330. // to track for garbage collection
  331. runningContainersID := make(map[string]bool)
  332. runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
  333. if err != nil {
  334. if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
  335. for idx, container := range d.runningContainerState {
  336. if d.runningContainerState[idx].t.Alive() {
  337. d.logger.Infof("killing tail for container %s", container.Name)
  338. d.runningContainerState[idx].t.Kill(nil)
  339. if err := d.runningContainerState[idx].t.Wait(); err != nil {
  340. d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
  341. }
  342. }
  343. delete(d.runningContainerState, idx)
  344. }
  345. } else {
  346. log.Errorf("container list err: %s", err.Error())
  347. }
  348. continue
  349. }
  350. for _, container := range runningContainer {
  351. runningContainersID[container.ID] = true
  352. // don't need to re eval an already monitored container
  353. if _, ok := d.runningContainerState[container.ID]; ok {
  354. continue
  355. }
  356. if containerConfig, ok := d.EvalContainer(container); ok {
  357. monitChan <- containerConfig
  358. }
  359. }
  360. for containerStateID, containerConfig := range d.runningContainerState {
  361. if _, ok := runningContainersID[containerStateID]; !ok {
  362. deleteChan <- containerConfig
  363. }
  364. }
  365. d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
  366. ticker.Reset(d.CheckIntervalDuration)
  367. }
  368. }
  369. }
  370. func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  371. d.t = t
  372. monitChan := make(chan *ContainerConfig)
  373. deleteChan := make(chan *ContainerConfig)
  374. d.logger.Infof("Starting docker acquisition")
  375. t.Go(func() error {
  376. return d.DockerManager(monitChan, deleteChan, out)
  377. })
  378. return d.WatchContainer(monitChan, deleteChan)
  379. }
  380. func (d *DockerSource) Dump() interface{} {
  381. return d
  382. }
  383. func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error {
  384. for scanner.Scan() {
  385. out <- scanner.Text()
  386. }
  387. return nil
  388. }
  389. func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error {
  390. container.logger.Infof("start tail for container %s", container.Name)
  391. dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
  392. if err != nil {
  393. container.logger.Errorf("unable to read logs from container: %+v", err)
  394. return err
  395. }
  396. // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
  397. reader := dlog.NewReader(dockerReader)
  398. scanner := bufio.NewScanner(reader)
  399. readerChan := make(chan string)
  400. readerTomb := &tomb.Tomb{}
  401. readerTomb.Go(func() error {
  402. return ReadTailScanner(scanner, readerChan, readerTomb)
  403. })
  404. for {
  405. select {
  406. case <-container.t.Dying():
  407. readerTomb.Kill(nil)
  408. container.logger.Infof("tail stopped for container %s", container.Name)
  409. return nil
  410. case line := <-readerChan:
  411. if line == "" {
  412. continue
  413. }
  414. l := types.Line{}
  415. l.Raw = line
  416. l.Labels = d.Config.Labels
  417. l.Time = time.Now().UTC()
  418. l.Src = container.Name
  419. l.Process = true
  420. l.Module = d.GetName()
  421. evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  422. linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
  423. outChan <- evt
  424. d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
  425. }
  426. }
  427. }
  428. func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
  429. d.logger.Info("DockerSource Manager started")
  430. for {
  431. select {
  432. case newContainer := <-in:
  433. if _, ok := d.runningContainerState[newContainer.ID]; !ok {
  434. newContainer.t = &tomb.Tomb{}
  435. newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name})
  436. newContainer.t.Go(func() error {
  437. return d.TailDocker(newContainer, outChan)
  438. })
  439. d.runningContainerState[newContainer.ID] = newContainer
  440. }
  441. case containerToDelete := <-deleteChan:
  442. if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok {
  443. log.Infof("container acquisition stopped for container '%s'", containerConfig.Name)
  444. containerConfig.t.Kill(nil)
  445. delete(d.runningContainerState, containerToDelete.ID)
  446. }
  447. case <-d.t.Dying():
  448. for idx, container := range d.runningContainerState {
  449. if d.runningContainerState[idx].t.Alive() {
  450. d.logger.Infof("killing tail for container %s", container.Name)
  451. d.runningContainerState[idx].t.Kill(nil)
  452. if err := d.runningContainerState[idx].t.Wait(); err != nil {
  453. d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
  454. }
  455. }
  456. }
  457. d.runningContainerState = nil
  458. d.logger.Debugf("routine cleanup done, return")
  459. return nil
  460. }
  461. }
  462. }