file.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package fileacquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strings"
  13. "time"
  14. "github.com/fsnotify/fsnotify"
  15. "github.com/nxadm/tail"
  16. "github.com/pkg/errors"
  17. "github.com/prometheus/client_golang/prometheus"
  18. log "github.com/sirupsen/logrus"
  19. "gopkg.in/tomb.v2"
  20. "gopkg.in/yaml.v2"
  21. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  22. "github.com/crowdsecurity/crowdsec/pkg/types"
  23. )
// linesRead counts, per source file, the total number of log lines read by
// this datasource (both tail and cat modes). Exposed through GetMetrics and
// GetAggregMetrics.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration is the YAML configuration accepted by the file datasource.
type FileConfiguration struct {
	Filenames      []string // files or glob patterns to acquire from
	ExcludeRegexps []string `yaml:"exclude_regexps"` // regexps; matching file paths are skipped
	Filename       string   // single-file shorthand, folded into Filenames at unmarshal time
	ForceInotify   bool     `yaml:"force_inotify"` // watch each pattern's directory even when no glob/match exists yet
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads log lines from a set of files, either following them
// live (tail mode) or reading them once (cat mode).
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher // watches directories to detect newly created files
	watchedDirectories map[string]bool   // directories already added to the watcher
	tails              map[string]bool   // files that already have an active tail
	logger             *log.Entry
	files              []string          // resolved list of files to acquire from
	exclude_regexps    []*regexp.Regexp  // compiled form of config.ExcludeRegexps
}
  46. func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
  47. f.config = FileConfiguration{}
  48. err := yaml.UnmarshalStrict(yamlConfig, &f.config)
  49. if err != nil {
  50. return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err)
  51. }
  52. if f.logger != nil {
  53. f.logger.Tracef("FileAcquisition configuration: %+v", f.config)
  54. }
  55. if len(f.config.Filename) != 0 {
  56. f.config.Filenames = append(f.config.Filenames, f.config.Filename)
  57. }
  58. if len(f.config.Filenames) == 0 {
  59. return fmt.Errorf("no filename or filenames configuration provided")
  60. }
  61. if f.config.Mode == "" {
  62. f.config.Mode = configuration.TAIL_MODE
  63. }
  64. if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
  65. return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
  66. }
  67. for _, exclude := range f.config.ExcludeRegexps {
  68. re, err := regexp.Compile(exclude)
  69. if err != nil {
  70. return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
  71. }
  72. f.exclude_regexps = append(f.exclude_regexps, re)
  73. }
  74. return nil
  75. }
// Configure parses the YAML configuration and resolves the configured
// patterns: each pattern is globbed, excluded files are filtered out, and
// fsnotify watches are added on directories (when a glob is used in tail
// mode, or unconditionally with force_inotify) so that files created later
// are picked up by monitorNewFiles.
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
	f.logger = logger
	err := f.UnmarshalConfig(yamlConfig)
	if err != nil {
		return err
	}
	f.watchedDirectories = make(map[string]bool)
	f.tails = make(map[string]bool)
	f.watcher, err = fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrapf(err, "Could not create fsnotify watcher")
	}
	f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
	for _, pattern := range f.config.Filenames {
		if f.config.ForceInotify {
			// Watch the pattern's directory even if nothing matches yet;
			// a failed watch only logs and skips this pattern.
			directory := filepath.Dir(pattern)
			f.logger.Infof("Force add watch on %s", directory)
			if !f.watchedDirectories[directory] {
				err = f.watcher.Add(directory)
				if err != nil {
					f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
					continue
				}
				f.watchedDirectories[directory] = true
			}
		}
		files, err := filepath.Glob(pattern)
		if err != nil {
			return errors.Wrap(err, "Glob failure")
		}
		if len(files) == 0 {
			f.logger.Warnf("No matching files for pattern %s", pattern)
			continue
		}
		for _, file := range files {
			//check if file is excluded
			excluded := false
			// NOTE(review): this loop variable shadows the glob `pattern` above.
			for _, pattern := range f.exclude_regexps {
				if pattern.MatchString(file) {
					excluded = true
					f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
					break
				}
			}
			if excluded {
				continue
			}
			// files[0] != pattern means the glob actually expanded to
			// something other than the literal pattern: watch the file's
			// directory so future matches get tailed too.
			if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
				directory := filepath.Dir(file)
				f.logger.Debugf("Will add watch to directory: %s", directory)
				if !f.watchedDirectories[directory] {
					err = f.watcher.Add(directory)
					if err != nil {
						f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
						continue
					}
					f.watchedDirectories[directory] = true
				} else {
					f.logger.Debugf("Watch for directory %s already exists", directory)
				}
			}
			f.logger.Infof("Adding file %s to datasources", file)
			f.files = append(f.files, file)
		}
	}
	return nil
}
  143. func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  144. if !strings.HasPrefix(dsn, "file://") {
  145. return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
  146. }
  147. f.logger = logger
  148. dsn = strings.TrimPrefix(dsn, "file://")
  149. args := strings.Split(dsn, "?")
  150. if len(args[0]) == 0 {
  151. return fmt.Errorf("empty file:// DSN")
  152. }
  153. if len(args) == 2 && len(args[1]) != 0 {
  154. params, err := url.ParseQuery(args[1])
  155. if err != nil {
  156. return errors.Wrap(err, "could not parse file args")
  157. }
  158. for key, value := range params {
  159. if key != "log_level" {
  160. return fmt.Errorf("unsupported key %s in file DSN", key)
  161. }
  162. if len(value) != 1 {
  163. return errors.New("expected zero or one value for 'log_level'")
  164. }
  165. lvl, err := log.ParseLevel(value[0])
  166. if err != nil {
  167. return errors.Wrapf(err, "unknown level %s", value[0])
  168. }
  169. f.logger.Logger.SetLevel(lvl)
  170. }
  171. }
  172. f.config = FileConfiguration{}
  173. f.config.Labels = labels
  174. f.config.Mode = configuration.CAT_MODE
  175. f.logger.Debugf("Will try pattern %s", args[0])
  176. files, err := filepath.Glob(args[0])
  177. if err != nil {
  178. return errors.Wrap(err, "Glob failure")
  179. }
  180. if len(files) == 0 {
  181. return fmt.Errorf("no matching files for pattern %s", args[0])
  182. }
  183. if len(files) > 1 {
  184. f.logger.Infof("Will read %d files", len(files))
  185. }
  186. for _, file := range files {
  187. f.logger.Infof("Adding file %s to filelist", file)
  188. f.files = append(f.files, file)
  189. }
  190. return nil
  191. }
// GetMode returns the configured acquisition mode (tail or cat).
func (f *FileSource) GetMode() string {
	return f.config.Mode
}
// SupportedModes returns the modes supported by the acquisition module:
// tail (live) and cat (one-shot).
func (f *FileSource) SupportedModes() []string {
	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
  199. // OneShotAcquisition reads a set of file and returns when done
  200. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  201. f.logger.Debug("In oneshot")
  202. for _, file := range f.files {
  203. fi, err := os.Stat(file)
  204. if err != nil {
  205. return fmt.Errorf("could not stat file %s : %w", file, err)
  206. }
  207. if fi.IsDir() {
  208. f.logger.Warnf("%s is a directory, ignoring it.", file)
  209. continue
  210. }
  211. f.logger.Infof("reading %s at once", file)
  212. err = f.readFile(file, out, t)
  213. if err != nil {
  214. return err
  215. }
  216. }
  217. return nil
  218. }
// GetMetrics returns the prometheus collectors exposed by this datasource.
func (f *FileSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetAggregMetrics returns the aggregated prometheus collectors (same
// collector as GetMetrics for this datasource).
func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetName returns the name of this datasource module ("file").
func (f *FileSource) GetName() string {
	return "file"
}
// CanRun always returns nil: this datasource declares no run-time
// restriction.
func (f *FileSource) CanRun() error {
	return nil
}
// StreamingAcquisition starts live (tail) acquisition: it launches the
// inotify monitoring goroutine, then starts one tail goroutine per already
// existing, non-excluded file, seeking to the end of each so only lines
// written after startup are pushed to out.
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	f.logger.Debug("Starting live acquisition")
	t.Go(func() error {
		return f.monitorNewFiles(out, t)
	})
	for _, file := range f.files {
		//before opening the file, check if we need to specifically avoid it. (XXX)
		skip := false
		for _, pattern := range f.exclude_regexps {
			if pattern.MatchString(file) {
				f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
		//do not rely on stat, reclose file immediately as it's opened by Tail
		fd, err := os.Open(file)
		if err != nil {
			f.logger.Errorf("unable to read %s : %s", file, err)
			continue
		}
		if err := fd.Close(); err != nil {
			f.logger.Errorf("unable to close %s : %s", file, err)
			continue
		}
		fi, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("could not stat file %s : %w", file, err)
		}
		if fi.IsDir() {
			f.logger.Warnf("%s is a directory, ignoring it.", file)
			continue
		}
		// Seek to the end of the file: in tail mode we only want new lines.
		tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, Logger: log.NewEntry(log.StandardLogger())})
		if err != nil {
			f.logger.Errorf("Could not start tailing file %s : %s", file, err)
			continue
		}
		f.tails[file] = true
		// `tail` is redeclared each iteration, so the closure below captures
		// this iteration's tailer.
		t.Go(func() error {
			defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
			return f.tailFile(out, t, tail)
		})
	}
	return nil
}
// Dump returns the datasource itself, for introspection/debugging purposes.
func (f *FileSource) Dump() interface{} {
	return f
}
// monitorNewFiles consumes fsnotify events from the watched directories and
// starts a tail — reading from the beginning of the file — for every newly
// created file that matches one of the configured patterns and is not
// excluded. It returns when the watcher channels close or the tomb dies
// (in which case the watcher is closed).
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok { // events channel closed: watcher was shut down
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				// Only tail files that match one of the configured patterns.
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				//before opening the file, check if we need to specifically avoid it. (XXX)
				skip := false
				for _, pattern := range f.exclude_regexps {
					if pattern.MatchString(event.Name) {
						f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
						skip = true
						break
					}
				}
				if skip {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					// NOTE(review): `break` here only exits the select, so the
					// loop keeps running — same net effect as `continue`.
					break
				}
				//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
				//do not rely on stat, reclose file immediately as it's opened by Tail
				fd, err := os.Open(event.Name)
				if err != nil {
					f.logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				if err := fd.Close(); err != nil {
					f.logger.Errorf("unable to close %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok { // errors channel closed: watcher was shut down
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			// Shutting down: release all inotify watches.
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile forwards the lines produced by a tail.Tail to out until either
// the tomb dies (clean stop) or the tailer itself dies (the tomb is then
// killed so the failure propagates). Empty lines are dropped; every pushed
// line increments the per-source linesRead counter.
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		select {
		case <-t.Dying():
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Dying(): //our tailer is dying
			// Kill the whole tomb so the failure is not silent.
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			if line == nil {
				logger.Warningf("tail for %s is empty", tail.Filename)
				continue
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l := types.Line{
				Raw:     trimLine(line.Text),
				Labels:  f.config.Labels,
				Time:    line.Time,
				Src:     tail.Filename,
				Process: true,
				Module:  f.GetName(),
			}
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			expectMode := types.LIVE
			if f.config.UseTimeMachine {
				// config forces time-machine mode even while tailing
				expectMode = types.TIMEMACHINE
			}
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
		}
	}
}
  418. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  419. var scanner *bufio.Scanner
  420. logger := f.logger.WithField("oneshot", filename)
  421. fd, err := os.Open(filename)
  422. if err != nil {
  423. return errors.Wrapf(err, "failed opening %s", filename)
  424. }
  425. defer fd.Close()
  426. if strings.HasSuffix(filename, ".gz") {
  427. gz, err := gzip.NewReader(fd)
  428. if err != nil {
  429. logger.Errorf("Failed to read gz file: %s", err)
  430. return errors.Wrapf(err, "failed to read gz %s", filename)
  431. }
  432. defer gz.Close()
  433. scanner = bufio.NewScanner(gz)
  434. } else {
  435. scanner = bufio.NewScanner(fd)
  436. }
  437. scanner.Split(bufio.ScanLines)
  438. for scanner.Scan() {
  439. if scanner.Text() == "" {
  440. continue
  441. }
  442. l := types.Line{
  443. Raw: scanner.Text(),
  444. Time: time.Now().UTC(),
  445. Src: filename,
  446. Labels: f.config.Labels,
  447. Process: true,
  448. Module: f.GetName(),
  449. }
  450. logger.Debugf("line %s", l.Raw)
  451. linesRead.With(prometheus.Labels{"source": filename}).Inc()
  452. //we're reading logs at once, it must be time-machine buckets
  453. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
  454. }
  455. t.Kill(nil)
  456. return nil
  457. }