file.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. package fileacquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/fsnotify/fsnotify"
  16. "github.com/nxadm/tail"
  17. "github.com/pkg/errors"
  18. "github.com/prometheus/client_golang/prometheus"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/tomb.v2"
  21. "gopkg.in/yaml.v2"
  22. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  23. "github.com/crowdsecurity/crowdsec/pkg/types"
  24. )
// linesRead counts log lines read by this datasource, labeled by the source
// file path; exposed as the cs_filesource_hits_total Prometheus counter.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration holds the user-supplied YAML settings for the file
// datasource.
type FileConfiguration struct {
	Filenames      []string // glob patterns / paths to acquire from
	ExcludeRegexps []string `yaml:"exclude_regexps"` // regexps; matching file paths are skipped
	Filename       string   // single-file shorthand, merged into Filenames at unmarshal time
	ForceInotify   bool     `yaml:"force_inotify"` // always watch the parent directory, even for non-glob patterns
	MaxBufferSize  int      `yaml:"max_buffer_size"` // max line-scanner buffer size in bytes for one-shot reads (0 = bufio default)
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource acquires log lines from plain or gzipped files, either in one
// shot (cat mode) or by tailing them (tail mode) with fsnotify-based
// discovery of newly created files.
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher // watches directories for newly created files
	watchedDirectories map[string]bool   // directories already added to the watcher
	tails              map[string]bool   // files that already have an active tail
	logger             *log.Entry
	files              []string         // resolved list of files to read
	exclude_regexps    []*regexp.Regexp // compiled from config.ExcludeRegexps
}
// GetUuid returns the unique identifier of this datasource instance.
func (f *FileSource) GetUuid() string {
	return f.config.UniqueId
}
  51. func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
  52. f.config = FileConfiguration{}
  53. err := yaml.UnmarshalStrict(yamlConfig, &f.config)
  54. if err != nil {
  55. return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err)
  56. }
  57. if f.logger != nil {
  58. f.logger.Tracef("FileAcquisition configuration: %+v", f.config)
  59. }
  60. if len(f.config.Filename) != 0 {
  61. f.config.Filenames = append(f.config.Filenames, f.config.Filename)
  62. }
  63. if len(f.config.Filenames) == 0 {
  64. return fmt.Errorf("no filename or filenames configuration provided")
  65. }
  66. if f.config.Mode == "" {
  67. f.config.Mode = configuration.TAIL_MODE
  68. }
  69. if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
  70. return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
  71. }
  72. for _, exclude := range f.config.ExcludeRegexps {
  73. re, err := regexp.Compile(exclude)
  74. if err != nil {
  75. return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
  76. }
  77. f.exclude_regexps = append(f.exclude_regexps, re)
  78. }
  79. return nil
  80. }
  81. func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
  82. f.logger = logger
  83. err := f.UnmarshalConfig(yamlConfig)
  84. if err != nil {
  85. return err
  86. }
  87. f.watchedDirectories = make(map[string]bool)
  88. f.tails = make(map[string]bool)
  89. f.watcher, err = fsnotify.NewWatcher()
  90. if err != nil {
  91. return errors.Wrapf(err, "Could not create fsnotify watcher")
  92. }
  93. f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
  94. for _, pattern := range f.config.Filenames {
  95. if f.config.ForceInotify {
  96. directory := filepath.Dir(pattern)
  97. f.logger.Infof("Force add watch on %s", directory)
  98. if !f.watchedDirectories[directory] {
  99. err = f.watcher.Add(directory)
  100. if err != nil {
  101. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  102. continue
  103. }
  104. f.watchedDirectories[directory] = true
  105. }
  106. }
  107. files, err := filepath.Glob(pattern)
  108. if err != nil {
  109. return errors.Wrap(err, "Glob failure")
  110. }
  111. if len(files) == 0 {
  112. f.logger.Warnf("No matching files for pattern %s", pattern)
  113. continue
  114. }
  115. for _, file := range files {
  116. //check if file is excluded
  117. excluded := false
  118. for _, pattern := range f.exclude_regexps {
  119. if pattern.MatchString(file) {
  120. excluded = true
  121. f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
  122. break
  123. }
  124. }
  125. if excluded {
  126. continue
  127. }
  128. if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
  129. directory := filepath.Dir(file)
  130. f.logger.Debugf("Will add watch to directory: %s", directory)
  131. if !f.watchedDirectories[directory] {
  132. err = f.watcher.Add(directory)
  133. if err != nil {
  134. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  135. continue
  136. }
  137. f.watchedDirectories[directory] = true
  138. } else {
  139. f.logger.Debugf("Watch for directory %s already exists", directory)
  140. }
  141. }
  142. f.logger.Infof("Adding file %s to datasources", file)
  143. f.files = append(f.files, file)
  144. }
  145. }
  146. return nil
  147. }
  148. func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
  149. if !strings.HasPrefix(dsn, "file://") {
  150. return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
  151. }
  152. f.logger = logger
  153. f.config = FileConfiguration{}
  154. dsn = strings.TrimPrefix(dsn, "file://")
  155. args := strings.Split(dsn, "?")
  156. if len(args[0]) == 0 {
  157. return fmt.Errorf("empty file:// DSN")
  158. }
  159. if len(args) == 2 && len(args[1]) != 0 {
  160. params, err := url.ParseQuery(args[1])
  161. if err != nil {
  162. return errors.Wrap(err, "could not parse file args")
  163. }
  164. for key, value := range params {
  165. switch key {
  166. case "log_level":
  167. if len(value) != 1 {
  168. return errors.New("expected zero or one value for 'log_level'")
  169. }
  170. lvl, err := log.ParseLevel(value[0])
  171. if err != nil {
  172. return errors.Wrapf(err, "unknown level %s", value[0])
  173. }
  174. f.logger.Logger.SetLevel(lvl)
  175. case "max_buffer_size":
  176. if len(value) != 1 {
  177. return errors.New("expected zero or one value for 'max_buffer_size'")
  178. }
  179. maxBufferSize, err := strconv.Atoi(value[0])
  180. if err != nil {
  181. return errors.Wrapf(err, "could not parse max_buffer_size %s", value[0])
  182. }
  183. f.config.MaxBufferSize = maxBufferSize
  184. default:
  185. return fmt.Errorf("unknown parameter %s", key)
  186. }
  187. }
  188. }
  189. f.config.Labels = labels
  190. f.config.Mode = configuration.CAT_MODE
  191. f.config.UniqueId = uuid
  192. f.logger.Debugf("Will try pattern %s", args[0])
  193. files, err := filepath.Glob(args[0])
  194. if err != nil {
  195. return errors.Wrap(err, "Glob failure")
  196. }
  197. if len(files) == 0 {
  198. return fmt.Errorf("no matching files for pattern %s", args[0])
  199. }
  200. if len(files) > 1 {
  201. f.logger.Infof("Will read %d files", len(files))
  202. }
  203. for _, file := range files {
  204. f.logger.Infof("Adding file %s to filelist", file)
  205. f.files = append(f.files, file)
  206. }
  207. return nil
  208. }
// GetMode returns the configured acquisition mode (tail or cat).
func (f *FileSource) GetMode() string {
	return f.config.Mode
}
// SupportedModes returns the acquisition modes supported by the file
// datasource: tail (live follow) and cat (one-shot read).
func (f *FileSource) SupportedModes() []string {
	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
  216. // OneShotAcquisition reads a set of file and returns when done
  217. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  218. f.logger.Debug("In oneshot")
  219. for _, file := range f.files {
  220. fi, err := os.Stat(file)
  221. if err != nil {
  222. return fmt.Errorf("could not stat file %s : %w", file, err)
  223. }
  224. if fi.IsDir() {
  225. f.logger.Warnf("%s is a directory, ignoring it.", file)
  226. continue
  227. }
  228. f.logger.Infof("reading %s at once", file)
  229. err = f.readFile(file, out, t)
  230. if err != nil {
  231. return err
  232. }
  233. }
  234. return nil
  235. }
// GetMetrics returns the per-source Prometheus collectors for this datasource.
func (f *FileSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetAggregMetrics returns the aggregated Prometheus collectors (same
// counter as GetMetrics for this datasource).
func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetName returns the datasource module name.
func (f *FileSource) GetName() string {
	return "file"
}
// CanRun reports whether the datasource can run in the current environment;
// the file source has no platform requirements, so it always returns nil.
func (f *FileSource) CanRun() error {
	return nil
}
  248. func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  249. f.logger.Debug("Starting live acquisition")
  250. t.Go(func() error {
  251. return f.monitorNewFiles(out, t)
  252. })
  253. for _, file := range f.files {
  254. //before opening the file, check if we need to specifically avoid it. (XXX)
  255. skip := false
  256. for _, pattern := range f.exclude_regexps {
  257. if pattern.MatchString(file) {
  258. f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
  259. skip = true
  260. break
  261. }
  262. }
  263. if skip {
  264. continue
  265. }
  266. //cf. https://github.com/crowdsecurity/crowdsec/issues/1168
  267. //do not rely on stat, reclose file immediately as it's opened by Tail
  268. fd, err := os.Open(file)
  269. if err != nil {
  270. f.logger.Errorf("unable to read %s : %s", file, err)
  271. continue
  272. }
  273. if err := fd.Close(); err != nil {
  274. f.logger.Errorf("unable to close %s : %s", file, err)
  275. continue
  276. }
  277. fi, err := os.Stat(file)
  278. if err != nil {
  279. return fmt.Errorf("could not stat file %s : %w", file, err)
  280. }
  281. if fi.IsDir() {
  282. f.logger.Warnf("%s is a directory, ignoring it.", file)
  283. continue
  284. }
  285. tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, Logger: log.NewEntry(log.StandardLogger())})
  286. if err != nil {
  287. f.logger.Errorf("Could not start tailing file %s : %s", file, err)
  288. continue
  289. }
  290. f.tails[file] = true
  291. t.Go(func() error {
  292. defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
  293. return f.tailFile(out, t, tail)
  294. })
  295. }
  296. return nil
  297. }
// Dump returns the datasource itself for debugging/inspection purposes.
func (f *FileSource) Dump() interface{} {
	return f
}
// monitorNewFiles reacts to fsnotify Create events: when a newly created
// file matches one of the configured patterns (and no exclusion regexp), it
// starts a new tail goroutine reading the file from the beginning. It runs
// until the watcher channels close or the tomb is dying, at which point the
// watcher is closed.
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok { // watcher events channel closed, stop monitoring
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				// only tail files that match one of the configured patterns
				// NOTE(review): path.Match is slash-separator based; on
				// Windows filepath.Match may be intended — verify.
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				//before opening the file, check if we need to specifically avoid it. (XXX)
				skip := false
				for _, pattern := range f.exclude_regexps {
					if pattern.MatchString(event.Name) {
						f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
						skip = true
						break
					}
				}
				if skip {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					break
				}
				//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
				//do not rely on stat, reclose file immediately as it's opened by Tail
				fd, err := os.Open(event.Name)
				if err != nil {
					f.logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				if err := fd.Close(); err != nil {
					f.logger.Errorf("unable to close %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok { // watcher errors channel closed, stop monitoring
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			// shutdown requested: tear down all inotify watches
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile forwards lines from one tail.Tail to the output channel as
// events until the tomb dies (clean stop) or the tailer itself dies (which
// kills the tomb and propagates the error). Empty lines are dropped.
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		select {
		case <-t.Dying():
			// orderly shutdown: stop the tailer before returning
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Dying(): //our tailer is dying
			// kill the whole tomb so the datasource is restarted rather than
			// silently losing one file
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			if line == nil {
				logger.Warningf("tail for %s is empty", tail.Filename)
				continue
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l := types.Line{
				Raw:     trimLine(line.Text),
				Labels:  f.config.Labels,
				Time:    line.Time,
				Src:     tail.Filename,
				Process: true,
				Module:  f.GetName(),
			}
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			expectMode := types.LIVE
			if f.config.UseTimeMachine {
				expectMode = types.TIMEMACHINE
			}
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
		}
	}
}
  435. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  436. var scanner *bufio.Scanner
  437. logger := f.logger.WithField("oneshot", filename)
  438. fd, err := os.Open(filename)
  439. if err != nil {
  440. return errors.Wrapf(err, "failed opening %s", filename)
  441. }
  442. defer fd.Close()
  443. if strings.HasSuffix(filename, ".gz") {
  444. gz, err := gzip.NewReader(fd)
  445. if err != nil {
  446. logger.Errorf("Failed to read gz file: %s", err)
  447. return errors.Wrapf(err, "failed to read gz %s", filename)
  448. }
  449. defer gz.Close()
  450. scanner = bufio.NewScanner(gz)
  451. } else {
  452. scanner = bufio.NewScanner(fd)
  453. }
  454. scanner.Split(bufio.ScanLines)
  455. if f.config.MaxBufferSize > 0 {
  456. buf := make([]byte, 0, 64*1024)
  457. scanner.Buffer(buf, f.config.MaxBufferSize)
  458. }
  459. for scanner.Scan() {
  460. if scanner.Text() == "" {
  461. continue
  462. }
  463. l := types.Line{
  464. Raw: scanner.Text(),
  465. Time: time.Now().UTC(),
  466. Src: filename,
  467. Labels: f.config.Labels,
  468. Process: true,
  469. Module: f.GetName(),
  470. }
  471. logger.Debugf("line %s", l.Raw)
  472. linesRead.With(prometheus.Labels{"source": filename}).Inc()
  473. //we're reading logs at once, it must be time-machine buckets
  474. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
  475. }
  476. if err := scanner.Err(); err != nil {
  477. logger.Errorf("Error while reading file: %s", err)
  478. t.Kill(err)
  479. return err
  480. }
  481. t.Kill(nil)
  482. return nil
  483. }