docker.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. package dockeracquisition
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net/url"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/ahmetb/dlog"
  12. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  13. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. dockerTypes "github.com/docker/docker/api/types"
  16. "github.com/docker/docker/client"
  17. "github.com/pkg/errors"
  18. "github.com/prometheus/client_golang/prometheus"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/tomb.v2"
  21. "gopkg.in/yaml.v2"
  22. )
  23. var linesRead = prometheus.NewCounterVec(
  24. prometheus.CounterOpts{
  25. Name: "cs_dockersource_hits_total",
  26. Help: "Total lines that were read.",
  27. },
  28. []string{"source"})
  29. type DockerConfiguration struct {
  30. CheckInterval string `yaml:"check_interval"`
  31. FollowStdout bool `yaml:"follow_stdout"`
  32. FollowStdErr bool `yaml:"follow_stderr"`
  33. Until string `yaml:"until"`
  34. Since string `yaml:"since"`
  35. DockerHost string `yaml:"docker_host"`
  36. ContainerName []string `yaml:"container_name"`
  37. ContainerID []string `yaml:"container_id"`
  38. ContainerNameRegexp []string `yaml:"container_name_regexp"`
  39. ContainerIDRegexp []string `yaml:"container_id_regexp"`
  40. ForceInotify bool `yaml:"force_inotify"`
  41. configuration.DataSourceCommonCfg `yaml:",inline"`
  42. }
  43. type DockerSource struct {
  44. Config DockerConfiguration
  45. runningContainerState map[string]*ContainerConfig
  46. compiledContainerName []*regexp.Regexp
  47. compiledContainerID []*regexp.Regexp
  48. CheckIntervalDuration time.Duration
  49. logger *log.Entry
  50. Client client.CommonAPIClient
  51. t *tomb.Tomb
  52. containerLogsOptions *dockerTypes.ContainerLogsOptions
  53. }
  54. type ContainerConfig struct {
  55. Name string
  56. ID string
  57. t *tomb.Tomb
  58. logger *log.Entry
  59. Labels map[string]string
  60. Tty bool
  61. }
  62. func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
  63. var err error
  64. d.Config = DockerConfiguration{
  65. FollowStdout: true, // default
  66. FollowStdErr: true, // default
  67. CheckInterval: "1s", // default
  68. }
  69. d.logger = logger
  70. d.runningContainerState = make(map[string]*ContainerConfig)
  71. err = yaml.UnmarshalStrict(Config, &d.Config)
  72. if err != nil {
  73. return errors.Wrap(err, "Cannot parse DockerAcquisition configuration")
  74. }
  75. d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
  76. if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 {
  77. return fmt.Errorf("no containers names or containers ID configuration provided")
  78. }
  79. d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
  80. if err != nil {
  81. return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
  82. }
  83. if d.Config.Mode == "" {
  84. d.Config.Mode = configuration.TAIL_MODE
  85. }
  86. if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
  87. return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
  88. }
  89. d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
  90. for _, cont := range d.Config.ContainerNameRegexp {
  91. d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont))
  92. }
  93. for _, cont := range d.Config.ContainerIDRegexp {
  94. d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont))
  95. }
  96. dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
  97. if err != nil {
  98. return err
  99. }
  100. if d.Config.Since == "" {
  101. d.Config.Since = time.Now().UTC().Format(time.RFC3339)
  102. }
  103. d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
  104. ShowStdout: d.Config.FollowStdout,
  105. ShowStderr: d.Config.FollowStdErr,
  106. Follow: true,
  107. Since: d.Config.Since,
  108. }
  109. if d.Config.Until != "" {
  110. d.containerLogsOptions.Until = d.Config.Until
  111. }
  112. if d.Config.DockerHost != "" {
  113. if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil {
  114. return err
  115. }
  116. }
  117. d.Client = dockerClient
  118. _, err = d.Client.Info(context.Background())
  119. if err != nil {
  120. return errors.Wrapf(err, "failed to configure docker datasource %s", d.Config.DockerHost)
  121. }
  122. return nil
  123. }
  124. func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  125. var err error
  126. if !strings.HasPrefix(dsn, d.GetName()+"://") {
  127. return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName())
  128. }
  129. d.Config = DockerConfiguration{
  130. FollowStdout: true,
  131. FollowStdErr: true,
  132. CheckInterval: "1s",
  133. }
  134. d.Config.ContainerName = make([]string, 0)
  135. d.Config.ContainerID = make([]string, 0)
  136. d.runningContainerState = make(map[string]*ContainerConfig)
  137. d.Config.Mode = configuration.CAT_MODE
  138. d.logger = logger
  139. d.Config.Labels = labels
  140. dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
  141. if err != nil {
  142. return err
  143. }
  144. d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{
  145. ShowStdout: d.Config.FollowStdout,
  146. ShowStderr: d.Config.FollowStdErr,
  147. Follow: false,
  148. }
  149. dsn = strings.TrimPrefix(dsn, d.GetName()+"://")
  150. args := strings.Split(dsn, "?")
  151. if len(args) == 0 {
  152. return fmt.Errorf("invalid dsn: %s", dsn)
  153. }
  154. if len(args) == 1 && args[0] == "" {
  155. return fmt.Errorf("empty %s DSN", d.GetName()+"://")
  156. }
  157. d.Config.ContainerName = append(d.Config.ContainerName, args[0])
  158. // we add it as an ID also so user can provide docker name or docker ID
  159. d.Config.ContainerID = append(d.Config.ContainerID, args[0])
  160. // no parameters
  161. if len(args) == 1 {
  162. d.Client = dockerClient
  163. return nil
  164. }
  165. parameters, err := url.ParseQuery(args[1])
  166. if err != nil {
  167. return errors.Wrapf(err, "while parsing parameters %s: %s", dsn, err)
  168. }
  169. for k, v := range parameters {
  170. switch k {
  171. case "log_level":
  172. if len(v) != 1 {
  173. return fmt.Errorf("only one 'log_level' parameters is required, not many")
  174. }
  175. lvl, err := log.ParseLevel(v[0])
  176. if err != nil {
  177. return errors.Wrapf(err, "unknown level %s", v[0])
  178. }
  179. d.logger.Logger.SetLevel(lvl)
  180. case "until":
  181. if len(v) != 1 {
  182. return fmt.Errorf("only one 'until' parameters is required, not many")
  183. }
  184. d.containerLogsOptions.Until = v[0]
  185. case "since":
  186. if len(v) != 1 {
  187. return fmt.Errorf("only one 'since' parameters is required, not many")
  188. }
  189. d.containerLogsOptions.Since = v[0]
  190. case "follow_stdout":
  191. if len(v) != 1 {
  192. return fmt.Errorf("only one 'follow_stdout' parameters is required, not many")
  193. }
  194. followStdout, err := strconv.ParseBool(v[0])
  195. if err != nil {
  196. return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err)
  197. }
  198. d.Config.FollowStdout = followStdout
  199. d.containerLogsOptions.ShowStdout = followStdout
  200. case "follow_stderr":
  201. if len(v) != 1 {
  202. return fmt.Errorf("only one 'follow_stderr' parameters is required, not many")
  203. }
  204. followStdErr, err := strconv.ParseBool(v[0])
  205. if err != nil {
  206. return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err)
  207. }
  208. d.Config.FollowStdErr = followStdErr
  209. d.containerLogsOptions.ShowStderr = followStdErr
  210. case "docker_host":
  211. if len(v) != 1 {
  212. return fmt.Errorf("only one 'docker_host' parameters is required, not many")
  213. }
  214. if err := client.WithHost(v[0])(dockerClient); err != nil {
  215. return err
  216. }
  217. }
  218. }
  219. d.Client = dockerClient
  220. return nil
  221. }
  222. func (d *DockerSource) GetMode() string {
  223. return d.Config.Mode
  224. }
  225. //SupportedModes returns the supported modes by the acquisition module
  226. func (d *DockerSource) SupportedModes() []string {
  227. return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
  228. }
  229. //OneShotAcquisition reads a set of file and returns when done
  230. func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  231. d.logger.Debug("In oneshot")
  232. runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
  233. if err != nil {
  234. return err
  235. }
  236. foundOne := false
  237. for _, container := range runningContainer {
  238. if _, ok := d.runningContainerState[container.ID]; ok {
  239. d.logger.Debugf("container with id %s is already being read from", container.ID)
  240. continue
  241. }
  242. if containerConfig, ok := d.EvalContainer(container); ok {
  243. d.logger.Infof("reading logs from container %s", containerConfig.Name)
  244. d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
  245. dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
  246. if err != nil {
  247. d.logger.Errorf("unable to read logs from container: %+v", err)
  248. return err
  249. }
  250. // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
  251. foundOne = true
  252. var scanner *bufio.Scanner
  253. if containerConfig.Tty {
  254. scanner = bufio.NewScanner(dockerReader)
  255. } else {
  256. reader := dlog.NewReader(dockerReader)
  257. scanner = bufio.NewScanner(reader)
  258. }
  259. for scanner.Scan() {
  260. line := scanner.Text()
  261. if line == "" {
  262. continue
  263. }
  264. l := types.Line{}
  265. l.Raw = line
  266. l.Labels = d.Config.Labels
  267. l.Time = time.Now().UTC()
  268. l.Src = containerConfig.Name
  269. l.Process = true
  270. l.Module = d.GetName()
  271. linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
  272. evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  273. out <- evt
  274. d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
  275. }
  276. err = scanner.Err()
  277. if err != nil {
  278. d.logger.Errorf("Got error from docker read: %s", err)
  279. }
  280. d.runningContainerState[container.ID] = containerConfig
  281. }
  282. }
  283. t.Kill(nil)
  284. if !foundOne {
  285. return fmt.Errorf("no docker found, can't run one shot acquisition")
  286. }
  287. return nil
  288. }
  289. func (d *DockerSource) GetMetrics() []prometheus.Collector {
  290. return []prometheus.Collector{linesRead}
  291. }
  292. func (d *DockerSource) GetAggregMetrics() []prometheus.Collector {
  293. return []prometheus.Collector{linesRead}
  294. }
  295. func (d *DockerSource) GetName() string {
  296. return "docker"
  297. }
  298. func (d *DockerSource) CanRun() error {
  299. return nil
  300. }
  301. func (d *DockerSource) getContainerTTY(containerId string) bool {
  302. containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
  303. if err != nil {
  304. return false
  305. }
  306. return containerDetails.Config.Tty
  307. }
  308. func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) {
  309. for _, containerID := range d.Config.ContainerID {
  310. if containerID == container.ID {
  311. return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
  312. }
  313. }
  314. for _, containerName := range d.Config.ContainerName {
  315. for _, name := range container.Names {
  316. if strings.HasPrefix(name, "/") && len(name) > 0 {
  317. name = name[1:]
  318. }
  319. if name == containerName {
  320. return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
  321. }
  322. }
  323. }
  324. for _, cont := range d.compiledContainerID {
  325. if matched := cont.Match([]byte(container.ID)); matched {
  326. return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
  327. }
  328. }
  329. for _, cont := range d.compiledContainerName {
  330. for _, name := range container.Names {
  331. if matched := cont.Match([]byte(name)); matched {
  332. return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}, true
  333. }
  334. }
  335. }
  336. return &ContainerConfig{}, false
  337. }
  338. func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
  339. ticker := time.NewTicker(d.CheckIntervalDuration)
  340. d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
  341. for {
  342. select {
  343. case <-d.t.Dying():
  344. d.logger.Infof("stopping container watcher")
  345. return nil
  346. case <-ticker.C:
  347. // to track for garbage collection
  348. runningContainersID := make(map[string]bool)
  349. runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
  350. if err != nil {
  351. if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
  352. for idx, container := range d.runningContainerState {
  353. if d.runningContainerState[idx].t.Alive() {
  354. d.logger.Infof("killing tail for container %s", container.Name)
  355. d.runningContainerState[idx].t.Kill(nil)
  356. if err := d.runningContainerState[idx].t.Wait(); err != nil {
  357. d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
  358. }
  359. }
  360. delete(d.runningContainerState, idx)
  361. }
  362. } else {
  363. log.Errorf("container list err: %s", err.Error())
  364. }
  365. continue
  366. }
  367. for _, container := range runningContainer {
  368. runningContainersID[container.ID] = true
  369. // don't need to re eval an already monitored container
  370. if _, ok := d.runningContainerState[container.ID]; ok {
  371. continue
  372. }
  373. if containerConfig, ok := d.EvalContainer(container); ok {
  374. monitChan <- containerConfig
  375. }
  376. }
  377. for containerStateID, containerConfig := range d.runningContainerState {
  378. if _, ok := runningContainersID[containerStateID]; !ok {
  379. deleteChan <- containerConfig
  380. }
  381. }
  382. d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
  383. ticker.Reset(d.CheckIntervalDuration)
  384. }
  385. }
  386. }
  387. func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  388. d.t = t
  389. monitChan := make(chan *ContainerConfig)
  390. deleteChan := make(chan *ContainerConfig)
  391. d.logger.Infof("Starting docker acquisition")
  392. t.Go(func() error {
  393. return d.DockerManager(monitChan, deleteChan, out)
  394. })
  395. return d.WatchContainer(monitChan, deleteChan)
  396. }
  397. func (d *DockerSource) Dump() interface{} {
  398. return d
  399. }
  400. func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error {
  401. for scanner.Scan() {
  402. out <- scanner.Text()
  403. }
  404. return nil
  405. }
  406. func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error {
  407. container.logger.Infof("start tail for container %s", container.Name)
  408. dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
  409. if err != nil {
  410. container.logger.Errorf("unable to read logs from container: %+v", err)
  411. return err
  412. }
  413. var scanner *bufio.Scanner
  414. // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
  415. if container.Tty {
  416. scanner = bufio.NewScanner(dockerReader)
  417. } else {
  418. reader := dlog.NewReader(dockerReader)
  419. scanner = bufio.NewScanner(reader)
  420. }
  421. readerChan := make(chan string)
  422. readerTomb := &tomb.Tomb{}
  423. readerTomb.Go(func() error {
  424. return ReadTailScanner(scanner, readerChan, readerTomb)
  425. })
  426. for {
  427. select {
  428. case <-container.t.Dying():
  429. readerTomb.Kill(nil)
  430. container.logger.Infof("tail stopped for container %s", container.Name)
  431. return nil
  432. case line := <-readerChan:
  433. if line == "" {
  434. continue
  435. }
  436. l := types.Line{}
  437. l.Raw = line
  438. l.Labels = d.Config.Labels
  439. l.Time = time.Now().UTC()
  440. l.Src = container.Name
  441. l.Process = true
  442. l.Module = d.GetName()
  443. evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  444. linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
  445. outChan <- evt
  446. d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
  447. case <-readerTomb.Dying():
  448. //This case is to handle temporarily losing the connection to the docker socket
  449. //The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart)
  450. d.logger.Debugf("readerTomb dying for container %s, removing it from runningContainerState", container.Name)
  451. delete(d.runningContainerState, container.ID)
  452. //Also reset the Since to avoid re-reading logs
  453. d.Config.Since = time.Now().UTC().Format(time.RFC3339)
  454. d.containerLogsOptions.Since = d.Config.Since
  455. return nil
  456. }
  457. }
  458. }
  459. func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
  460. d.logger.Info("DockerSource Manager started")
  461. for {
  462. select {
  463. case newContainer := <-in:
  464. if _, ok := d.runningContainerState[newContainer.ID]; !ok {
  465. newContainer.t = &tomb.Tomb{}
  466. newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name})
  467. newContainer.t.Go(func() error {
  468. return d.TailDocker(newContainer, outChan)
  469. })
  470. d.runningContainerState[newContainer.ID] = newContainer
  471. }
  472. case containerToDelete := <-deleteChan:
  473. if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok {
  474. log.Infof("container acquisition stopped for container '%s'", containerConfig.Name)
  475. containerConfig.t.Kill(nil)
  476. delete(d.runningContainerState, containerToDelete.ID)
  477. }
  478. case <-d.t.Dying():
  479. for idx, container := range d.runningContainerState {
  480. if d.runningContainerState[idx].t.Alive() {
  481. d.logger.Infof("killing tail for container %s", container.Name)
  482. d.runningContainerState[idx].t.Kill(nil)
  483. if err := d.runningContainerState[idx].t.Wait(); err != nil {
  484. d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
  485. }
  486. }
  487. }
  488. d.runningContainerState = nil
  489. d.logger.Debugf("routine cleanup done, return")
  490. return nil
  491. }
  492. }
  493. }