file.go

package fileacquisition

import (
    "bufio"
    "compress/gzip"
    "fmt"
    "io"
    "net/url"
    "os"
    "path/filepath"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/fsnotify/fsnotify"
    "github.com/nxadm/tail"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    log "github.com/sirupsen/logrus"
    "gopkg.in/tomb.v2"
    "gopkg.in/yaml.v2"

    "github.com/crowdsecurity/go-cs-lib/pkg/trace"

    "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
    "github.com/crowdsecurity/crowdsec/pkg/types"
)

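// linesRead counts the total number of lines read, labelled by source file,
// and is exposed as the cs_filesource_hits_total Prometheus metric.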
var linesRead = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "cs_filesource_hits_total",
        Help: "Total lines that were read.",
    },
    []string{"source"})

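// FileConfiguration is the user-facing YAML configuration of the file datasource.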
type FileConfiguration struct {
    Filenames          []string
    ExcludeRegexps     []string `yaml:"exclude_regexps"`
    Filename           string
    ForceInotify       bool `yaml:"force_inotify"`
    MaxBufferSize      int  `yaml:"max_buffer_size"`
    PollWithoutInotify bool `yaml:"poll_without_inotify"`
    configuration.DataSourceCommonCfg `yaml:",inline"`
}

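// FileSource holds the parsed configuration, the fsnotify watcher and the
// set of directories and files currently watched or tailed.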
type FileSource struct {
    config             FileConfiguration
    watcher            *fsnotify.Watcher
    watchedDirectories map[string]bool
    tails              map[string]bool
    logger             *log.Entry
    files              []string
    exclude_regexps    []*regexp.Regexp
}

func (f *FileSource) GetUuid() string {
    return f.config.UniqueId
}

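// UnmarshalConfig parses and validates the YAML configuration, defaulting to
// tail mode and pre-compiling the exclusion regexps.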
func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
    f.config = FileConfiguration{}
    err := yaml.UnmarshalStrict(yamlConfig, &f.config)
    if err != nil {
        return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err)
    }
    if f.logger != nil {
        f.logger.Tracef("FileAcquisition configuration: %+v", f.config)
    }
    if len(f.config.Filename) != 0 {
        f.config.Filenames = append(f.config.Filenames, f.config.Filename)
    }
    if len(f.config.Filenames) == 0 {
        return fmt.Errorf("no filename or filenames configuration provided")
    }
    if f.config.Mode == "" {
        f.config.Mode = configuration.TAIL_MODE
    }
    if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
        return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
    }
    for _, exclude := range f.config.ExcludeRegexps {
        re, err := regexp.Compile(exclude)
        if err != nil {
            return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
        }
        f.exclude_regexps = append(f.exclude_regexps, re)
    }
    return nil
}

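// Configure creates the fsnotify watcher, expands the configured glob
// patterns and registers every matching, non-excluded file for acquisition.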
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
    f.logger = logger

    err := f.UnmarshalConfig(yamlConfig)
    if err != nil {
        return err
    }

    f.watchedDirectories = make(map[string]bool)
    f.tails = make(map[string]bool)

    f.watcher, err = fsnotify.NewWatcher()
    if err != nil {
        return fmt.Errorf("could not create fsnotify watcher: %w", err)
    }

    f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)

    for _, pattern := range f.config.Filenames {
        if f.config.ForceInotify {
            directory := filepath.Dir(pattern)
            f.logger.Infof("Force add watch on %s", directory)
            if !f.watchedDirectories[directory] {
                err = f.watcher.Add(directory)
                if err != nil {
                    f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
                    continue
                }
                f.watchedDirectories[directory] = true
            }
        }
        files, err := filepath.Glob(pattern)
        if err != nil {
            return fmt.Errorf("glob failure: %w", err)
        }
        if len(files) == 0 {
            f.logger.Warnf("No matching files for pattern %s", pattern)
            continue
        }
        for _, file := range files {
            //check if file is excluded
            excluded := false
            for _, pattern := range f.exclude_regexps {
                if pattern.MatchString(file) {
                    excluded = true
                    f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
                    break
                }
            }
            if excluded {
                continue
            }
            if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
                directory := filepath.Dir(file)
                f.logger.Debugf("Will add watch to directory: %s", directory)
                if !f.watchedDirectories[directory] {
                    err = f.watcher.Add(directory)
                    if err != nil {
                        f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
                        continue
                    }
                    f.watchedDirectories[directory] = true
                } else {
                    f.logger.Debugf("Watch for directory %s already exists", directory)
                }
            }
            f.logger.Infof("Adding file %s to datasources", file)
            f.files = append(f.files, file)
        }
    }
    return nil
}

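// ConfigureByDSN configures the datasource from a file:// DSN and forces cat
// (one-shot) mode; log_level and max_buffer_size can be passed as query parameters.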
func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
    if !strings.HasPrefix(dsn, "file://") {
        return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
    }

    f.logger = logger
    f.config = FileConfiguration{}

    dsn = strings.TrimPrefix(dsn, "file://")

    args := strings.Split(dsn, "?")

    if len(args[0]) == 0 {
        return fmt.Errorf("empty file:// DSN")
    }

    if len(args) == 2 && len(args[1]) != 0 {
        params, err := url.ParseQuery(args[1])
        if err != nil {
            return fmt.Errorf("could not parse file args: %w", err)
        }
        for key, value := range params {
            switch key {
            case "log_level":
                if len(value) != 1 {
                    return errors.New("expected zero or one value for 'log_level'")
                }
                lvl, err := log.ParseLevel(value[0])
                if err != nil {
                    return fmt.Errorf("unknown level %s: %w", value[0], err)
                }
                f.logger.Logger.SetLevel(lvl)
            case "max_buffer_size":
                if len(value) != 1 {
                    return errors.New("expected zero or one value for 'max_buffer_size'")
                }
                maxBufferSize, err := strconv.Atoi(value[0])
                if err != nil {
                    return fmt.Errorf("could not parse max_buffer_size %s: %w", value[0], err)
                }
                f.config.MaxBufferSize = maxBufferSize
            default:
                return fmt.Errorf("unknown parameter %s", key)
            }
        }
    }

    f.config.Labels = labels
    f.config.Mode = configuration.CAT_MODE
    f.config.UniqueId = uuid

    f.logger.Debugf("Will try pattern %s", args[0])
    files, err := filepath.Glob(args[0])
    if err != nil {
        return fmt.Errorf("glob failure: %w", err)
    }

    if len(files) == 0 {
        return fmt.Errorf("no matching files for pattern %s", args[0])
    }

    if len(files) > 1 {
        f.logger.Infof("Will read %d files", len(files))
    }

    for _, file := range files {
        f.logger.Infof("Adding file %s to filelist", file)
        f.files = append(f.files, file)
    }
    return nil
}

func (f *FileSource) GetMode() string {
    return f.config.Mode
}

// SupportedModes returns the modes supported by the acquisition module
func (f *FileSource) SupportedModes() []string {
    return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}

// OneShotAcquisition reads a set of files and returns when done
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
    f.logger.Debug("In oneshot")
    for _, file := range f.files {
        fi, err := os.Stat(file)
        if err != nil {
            return fmt.Errorf("could not stat file %s : %w", file, err)
        }
        if fi.IsDir() {
            f.logger.Warnf("%s is a directory, ignoring it.", file)
            continue
        }
        f.logger.Infof("reading %s at once", file)
        err = f.readFile(file, out, t)
        if err != nil {
            return err
        }
    }
    return nil
}

func (f *FileSource) GetMetrics() []prometheus.Collector {
    return []prometheus.Collector{linesRead}
}

func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
    return []prometheus.Collector{linesRead}
}

func (f *FileSource) GetName() string {
    return "file"
}

func (f *FileSource) CanRun() error {
    return nil
}

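// StreamingAcquisition starts the inotify monitoring goroutine and one tail
// goroutine per configured file, streaming new lines to the output channel.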
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
    f.logger.Debug("Starting live acquisition")
    t.Go(func() error {
        return f.monitorNewFiles(out, t)
    })
    for _, file := range f.files {
        //before opening the file, check if we need to specifically avoid it. (XXX)
        skip := false
        for _, pattern := range f.exclude_regexps {
            if pattern.MatchString(file) {
                f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
                skip = true
                break
            }
        }
        if skip {
            continue
        }
        //cf. https://github.com/crowdsecurity/crowdsec/issues/1168
        //do not rely on stat, reclose file immediately as it's opened by Tail
        fd, err := os.Open(file)
        if err != nil {
            f.logger.Errorf("unable to read %s : %s", file, err)
            continue
        }
        if err := fd.Close(); err != nil {
            f.logger.Errorf("unable to close %s : %s", file, err)
            continue
        }
        fi, err := os.Stat(file)
        if err != nil {
            return fmt.Errorf("could not stat file %s : %w", file, err)
        }
        if fi.IsDir() {
            f.logger.Warnf("%s is a directory, ignoring it.", file)
            continue
        }
        tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: f.config.PollWithoutInotify, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, Logger: log.NewEntry(log.StandardLogger())})
        if err != nil {
            f.logger.Errorf("Could not start tailing file %s : %s", file, err)
            continue
        }
        f.tails[file] = true
        t.Go(func() error {
            defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify")
            return f.tailFile(out, t, tail)
        })
    }
    return nil
}

func (f *FileSource) Dump() interface{} {
    return f
}

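// monitorNewFiles reacts to fsnotify Create events: when a newly created file
// matches one of the configured patterns and no exclusion regexp, a tail is
// started on it from the beginning of the file.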
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
    logger := f.logger.WithField("goroutine", "inotify")
    for {
        select {
        case event, ok := <-f.watcher.Events:
            if !ok {
                return nil
            }
            if event.Op&fsnotify.Create == fsnotify.Create {
                fi, err := os.Stat(event.Name)
                if err != nil {
                    logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
                    continue
                }
                if fi.IsDir() {
                    continue
                }
                logger.Debugf("Detected new file %s", event.Name)
                matched := false
                for _, pattern := range f.config.Filenames {
                    logger.Debugf("Matching %s with %s", pattern, event.Name)
                    matched, err = filepath.Match(pattern, event.Name)
                    if err != nil {
                        logger.Errorf("Could not match pattern : %s", err)
                        continue
                    }
                    if matched {
                        logger.Debugf("Matched %s with %s", pattern, event.Name)
                        break
                    }
                }
                if !matched {
                    continue
                }
                //before opening the file, check if we need to specifically avoid it. (XXX)
                skip := false
                for _, pattern := range f.exclude_regexps {
                    if pattern.MatchString(event.Name) {
                        f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
                        skip = true
                        break
                    }
                }
                if skip {
                    continue
                }
                if f.tails[event.Name] {
                    //we already have a tail on it, do not start a new one
                    logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
                    break
                }
                //cf. https://github.com/crowdsecurity/crowdsec/issues/1168
                //do not rely on stat, reclose file immediately as it's opened by Tail
                fd, err := os.Open(event.Name)
                if err != nil {
                    f.logger.Errorf("unable to read %s : %s", event.Name, err)
                    continue
                }
                if err := fd.Close(); err != nil {
                    f.logger.Errorf("unable to close %s : %s", event.Name, err)
                    continue
                }
                //Slightly different parameters for Location, as we want to read the first lines of the newly created file
                tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: f.config.PollWithoutInotify, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
                if err != nil {
                    logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
                    break
                }
                f.tails[event.Name] = true
                t.Go(func() error {
                    defer trace.CatchPanic("crowdsec/acquis/tailfile")
                    return f.tailFile(out, t, tail)
                })
            }
        case err, ok := <-f.watcher.Errors:
            if !ok {
                return nil
            }
            logger.Errorf("Error while monitoring folder: %s", err)
        case <-t.Dying():
            err := f.watcher.Close()
            if err != nil {
                return fmt.Errorf("could not remove all inotify watches: %w", err)
            }
            return nil
        }
    }
}

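// tailFile consumes lines from a single tail: each non-empty line is counted
// in the Prometheus metric and pushed to the output channel, in live or
// time-machine mode depending on UseTimeMachine.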
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
    logger := f.logger.WithField("tail", tail.Filename)
    logger.Debugf("-> Starting tail of %s", tail.Filename)
    for {
        select {
        case <-t.Dying():
            logger.Infof("File datasource %s stopping", tail.Filename)
            if err := tail.Stop(); err != nil {
                f.logger.Errorf("error in stop : %s", err)
                return err
            }
            return nil
        case <-tail.Dying(): //our tailer is dying
            logger.Warningf("File reader of %s died", tail.Filename)
            t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
            return fmt.Errorf("reader for %s is dead", tail.Filename)
        case line := <-tail.Lines:
            if line == nil {
                logger.Warningf("tail for %s is empty", tail.Filename)
                continue
            }
            if line.Err != nil {
                logger.Warningf("fetch error : %v", line.Err)
                return line.Err
            }
            if line.Text == "" { //skip empty lines
                continue
            }
            linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
            l := types.Line{
                Raw:     trimLine(line.Text),
                Labels:  f.config.Labels,
                Time:    line.Time,
                Src:     tail.Filename,
                Process: true,
                Module:  f.GetName(),
            }
            //we're tailing, it must be real time logs
            logger.Debugf("pushing %+v", l)
            expectMode := types.LIVE
            if f.config.UseTimeMachine {
                expectMode = types.TIMEMACHINE
            }
            out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
        }
    }
}

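// readFile reads a whole file at once (transparently decompressing .gz files)
// and pushes each non-empty line as a time-machine event.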
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
    var scanner *bufio.Scanner
    logger := f.logger.WithField("oneshot", filename)
    fd, err := os.Open(filename)
    if err != nil {
        return fmt.Errorf("failed opening %s: %w", filename, err)
    }
    defer fd.Close()

    if strings.HasSuffix(filename, ".gz") {
        gz, err := gzip.NewReader(fd)
        if err != nil {
            logger.Errorf("Failed to read gz file: %s", err)
            return fmt.Errorf("failed to read gz %s: %w", filename, err)
        }
        defer gz.Close()
        scanner = bufio.NewScanner(gz)
    } else {
        scanner = bufio.NewScanner(fd)
    }
    scanner.Split(bufio.ScanLines)

    if f.config.MaxBufferSize > 0 {
        buf := make([]byte, 0, 64*1024)
        scanner.Buffer(buf, f.config.MaxBufferSize)
    }

    for scanner.Scan() {
        select {
        case <-t.Dying():
            logger.Infof("File datasource %s stopping", filename)
            return nil
        default:
            if scanner.Text() == "" {
                continue
            }
            l := types.Line{
                Raw:     scanner.Text(),
                Time:    time.Now().UTC(),
                Src:     filename,
                Labels:  f.config.Labels,
                Process: true,
                Module:  f.GetName(),
            }
            logger.Debugf("line %s", l.Raw)
            linesRead.With(prometheus.Labels{"source": filename}).Inc()

            //we're reading logs at once, it must be time-machine buckets
            out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
        }
    }
    if err := scanner.Err(); err != nil {
        logger.Errorf("Error while reading file: %s", err)
        t.Kill(err)
        return err
    }
    t.Kill(nil)
    return nil
}