file.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. package fileacquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strings"
  13. "time"
  14. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  15. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  16. "github.com/crowdsecurity/crowdsec/pkg/types"
  17. "github.com/fsnotify/fsnotify"
  18. "github.com/nxadm/tail"
  19. "github.com/pkg/errors"
  20. "github.com/prometheus/client_golang/prometheus"
  21. log "github.com/sirupsen/logrus"
  22. "gopkg.in/tomb.v2"
  23. "gopkg.in/yaml.v2"
  24. )
// linesRead counts the number of log lines read per source file; it is
// exposed as the cs_filesource_hits_total prometheus metric.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration is the yaml configuration accepted by the file datasource.
type FileConfiguration struct {
	Filenames      []string // glob patterns / paths to acquire from
	ExcludeRegexps []string `yaml:"exclude_regexps"` // files matching any of these regexps are skipped
	Filename       string   // single-file shortcut; merged into Filenames by Configure
	ForceInotify   bool     `yaml:"force_inotify"` // watch the pattern's parent directory even if nothing matches yet
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads log lines from a set of files, either at once (cat mode)
// or continuously (tail mode, with fsnotify-based discovery of new files).
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher
	watchedDirectories map[string]bool // directories already watched, to avoid duplicate watches
	tails              map[string]bool // files already tailed, to avoid duplicate tailers
	logger             *log.Entry
	files              []string // resolved list of files to read
	exclude_regexps    []*regexp.Regexp // compiled form of config.ExcludeRegexps
}
// Configure parses the yaml configuration, compiles the exclusion regexps,
// creates the fsnotify watcher and resolves the configured patterns into the
// initial list of files to acquire from.
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
	fileConfig := FileConfiguration{}
	f.logger = logger
	f.watchedDirectories = make(map[string]bool)
	f.tails = make(map[string]bool)
	err := yaml.UnmarshalStrict(Config, &fileConfig)
	if err != nil {
		return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
	}
	f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
	// merge the singular `filename` key into `filenames` so the rest of the
	// code only deals with one list
	if len(fileConfig.Filename) != 0 {
		fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
	}
	if len(fileConfig.Filenames) == 0 {
		return fmt.Errorf("no filename or filenames configuration provided")
	}
	f.config = fileConfig
	if f.config.Mode == "" {
		f.config.Mode = configuration.TAIL_MODE
	}
	if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
		return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
	}
	f.watcher, err = fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrapf(err, "Could not create fsnotify watcher")
	}
	for _, exclude := range f.config.ExcludeRegexps {
		re, err := regexp.Compile(exclude)
		if err != nil {
			return errors.Wrapf(err, "Could not compile regexp %s", exclude)
		}
		f.exclude_regexps = append(f.exclude_regexps, re)
	}
	f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
	for _, pattern := range f.config.Filenames {
		// force_inotify: watch the pattern's parent directory even when the
		// pattern currently matches nothing, so files created later are seen
		if f.config.ForceInotify {
			directory := filepath.Dir(pattern)
			f.logger.Infof("Force add watch on %s", directory)
			if !f.watchedDirectories[directory] {
				err = f.watcher.Add(directory)
				if err != nil {
					f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
					continue
				}
				f.watchedDirectories[directory] = true
			}
		}
		files, err := filepath.Glob(pattern)
		if err != nil {
			return errors.Wrap(err, "Glob failure")
		}
		if len(files) == 0 {
			f.logger.Warnf("No matching files for pattern %s", pattern)
			continue
		}
		for _, file := range files {
			//check if file is excluded
			excluded := false
			for _, pattern := range f.exclude_regexps {
				if pattern.MatchString(file) {
					excluded = true
					f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
					break
				}
			}
			if excluded {
				continue
			}
			// NOTE(review): files[0] (not file) is compared with the pattern:
			// when the glob expansion differs from the literal pattern, the
			// pattern is a real glob, so watch each matched file's directory
			if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
				directory := filepath.Dir(file)
				f.logger.Debugf("Will add watch to directory: %s", directory)
				if !f.watchedDirectories[directory] {
					err = f.watcher.Add(directory)
					if err != nil {
						f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
						continue
					}
					f.watchedDirectories[directory] = true
				} else {
					f.logger.Debugf("Watch for directory %s already exists", directory)
				}
			}
			f.logger.Infof("Adding file %s to datasources", file)
			f.files = append(f.files, file)
		}
	}
	return nil
}
  136. func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
  137. if !strings.HasPrefix(dsn, "file://") {
  138. return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
  139. }
  140. f.logger = logger
  141. dsn = strings.TrimPrefix(dsn, "file://")
  142. args := strings.Split(dsn, "?")
  143. if len(args[0]) == 0 {
  144. return fmt.Errorf("empty file:// DSN")
  145. }
  146. if len(args) == 2 && len(args[1]) != 0 {
  147. params, err := url.ParseQuery(args[1])
  148. if err != nil {
  149. return errors.Wrap(err, "could not parse file args")
  150. }
  151. for key, value := range params {
  152. if key != "log_level" {
  153. return fmt.Errorf("unsupported key %s in file DSN", key)
  154. }
  155. if len(value) != 1 {
  156. return errors.New("expected zero or one value for 'log_level'")
  157. }
  158. lvl, err := log.ParseLevel(value[0])
  159. if err != nil {
  160. return errors.Wrapf(err, "unknown level %s", value[0])
  161. }
  162. f.logger.Logger.SetLevel(lvl)
  163. }
  164. }
  165. f.config = FileConfiguration{}
  166. f.config.Labels = labels
  167. f.config.Mode = configuration.CAT_MODE
  168. f.logger.Debugf("Will try pattern %s", args[0])
  169. files, err := filepath.Glob(args[0])
  170. if err != nil {
  171. return errors.Wrap(err, "Glob failure")
  172. }
  173. if len(files) == 0 {
  174. return fmt.Errorf("no matching files for pattern %s", args[0])
  175. }
  176. if len(files) > 1 {
  177. f.logger.Infof("Will read %d files", len(files))
  178. }
  179. for _, file := range files {
  180. f.logger.Infof("Adding file %s to filelist", file)
  181. f.files = append(f.files, file)
  182. }
  183. return nil
  184. }
  185. func (f *FileSource) GetMode() string {
  186. return f.config.Mode
  187. }
  188. //SupportedModes returns the supported modes by the acquisition module
  189. func (f *FileSource) SupportedModes() []string {
  190. return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
  191. }
  192. //OneShotAcquisition reads a set of file and returns when done
  193. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  194. f.logger.Debug("In oneshot")
  195. for _, file := range f.files {
  196. fi, err := os.Stat(file)
  197. if err != nil {
  198. return fmt.Errorf("could not stat file %s : %w", file, err)
  199. }
  200. if fi.IsDir() {
  201. f.logger.Warnf("%s is a directory, ignoring it.", file)
  202. continue
  203. }
  204. f.logger.Infof("reading %s at once", file)
  205. err = f.readFile(file, out, t)
  206. if err != nil {
  207. return err
  208. }
  209. }
  210. return nil
  211. }
  212. func (f *FileSource) GetMetrics() []prometheus.Collector {
  213. return []prometheus.Collector{linesRead}
  214. }
  215. func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
  216. return []prometheus.Collector{linesRead}
  217. }
  218. func (f *FileSource) GetName() string {
  219. return "file"
  220. }
  221. func (f *FileSource) CanRun() error {
  222. return nil
  223. }
// StreamingAcquisition starts live (tail) mode: it launches the inotify
// monitoring goroutine, then starts one tailer goroutine per already-known
// file. Files created afterwards are picked up by monitorNewFiles.
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	f.logger.Debug("Starting live acquisition")
	t.Go(func() error {
		return f.monitorNewFiles(out, t)
	})
	for _, file := range f.files {
		//before opening the file, check if we need to specifically avoid it. (XXX)
		skip := false
		for _, pattern := range f.exclude_regexps {
			if pattern.MatchString(file) {
				f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
		//do not rely on stat, reclose file immediately as it's opened by Tail
		fd, err := os.Open(file)
		if err != nil {
			f.logger.Errorf("unable to read %s : %s", file, err)
			continue
		}
		if err := fd.Close(); err != nil {
			f.logger.Errorf("unable to close %s : %s", file, err)
			continue
		}
		fi, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("could not stat file %s : %w", file, err)
		}
		if fi.IsDir() {
			f.logger.Warnf("%s is a directory, ignoring it.", file)
			continue
		}
		// seek to the end of the file: in live mode only lines written from
		// now on are of interest
		tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
		if err != nil {
			f.logger.Errorf("Could not start tailing file %s : %s", file, err)
			continue
		}
		f.tails[file] = true
		t.Go(func() error {
			defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
			// `tail` is declared inside the loop body, so each goroutine
			// captures its own tailer
			return f.tailFile(out, t, tail)
		})
	}
	return nil
}
// Dump exposes the whole datasource state, for debugging purposes.
func (f *FileSource) Dump() interface{} {
	return f
}
// monitorNewFiles consumes fsnotify events for the watched directories and
// starts a tailer for every newly created file that matches one of the
// configured patterns and none of the exclusion regexps. It runs until the
// tomb dies or the watcher channels are closed.
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok { // events channel closed: the watcher is gone
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				//before opening the file, check if we need to specifically avoid it. (XXX)
				skip := false
				for _, pattern := range f.exclude_regexps {
					if pattern.MatchString(event.Name) {
						f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
						skip = true
						break
					}
				}
				if skip {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					// NOTE: this break exits the select, not the for loop
					break
				}
				//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
				//do not rely on stat, reclose file immediately as it's opened by Tail
				fd, err := os.Open(event.Name)
				if err != nil {
					f.logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				if err := fd.Close(); err != nil {
					f.logger.Errorf("unable to close %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					// NOTE: this break exits the select, not the for loop
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok { // errors channel closed: the watcher is gone
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile forwards every line produced by the given tailer downstream as a
// types.Event, until either the datasource tomb or the tailer's own tomb dies.
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		select {
		case <-t.Dying():
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Tomb.Dying(): //our tailer is dying
			// propagate the failure by killing the datasource tomb rather
			// than silently losing one file
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			// receiving from a closed Lines channel yields nil
			if line == nil {
				logger.Warningf("tail for %s is empty", tail.Filename)
				continue
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l := types.Line{
				Raw:     trimLine(line.Text),
				Labels:  f.config.Labels,
				Time:    line.Time,
				Src:     tail.Filename,
				Process: true,
				Module:  f.GetName(),
			}
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			expectMode := leaky.LIVE
			if f.config.UseTimeMachine {
				expectMode = leaky.TIMEMACHINE
			}
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
		}
	}
}
  411. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  412. var scanner *bufio.Scanner
  413. logger := f.logger.WithField("oneshot", filename)
  414. fd, err := os.Open(filename)
  415. if err != nil {
  416. return errors.Wrapf(err, "failed opening %s", filename)
  417. }
  418. defer fd.Close()
  419. if strings.HasSuffix(filename, ".gz") {
  420. gz, err := gzip.NewReader(fd)
  421. if err != nil {
  422. logger.Errorf("Failed to read gz file: %s", err)
  423. return errors.Wrapf(err, "failed to read gz %s", filename)
  424. }
  425. defer gz.Close()
  426. scanner = bufio.NewScanner(gz)
  427. } else {
  428. scanner = bufio.NewScanner(fd)
  429. }
  430. scanner.Split(bufio.ScanLines)
  431. for scanner.Scan() {
  432. if scanner.Text() == "" {
  433. continue
  434. }
  435. l := types.Line{
  436. Raw: scanner.Text(),
  437. Time: time.Now().UTC(),
  438. Src: filename,
  439. Labels: f.config.Labels,
  440. Process: true,
  441. Module: f.GetName(),
  442. }
  443. logger.Debugf("line %s", l.Raw)
  444. linesRead.With(prometheus.Labels{"source": filename}).Inc()
  445. //we're reading logs at once, it must be time-machine buckets
  446. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
  447. }
  448. t.Kill(nil)
  449. return nil
  450. }