// Package fileacquisition reads log lines from local files, either by
// tailing them live (tail mode) or by reading them once (cat mode).
  1. package fileacquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  14. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  15. "github.com/crowdsecurity/crowdsec/pkg/types"
  16. "github.com/fsnotify/fsnotify"
  17. "github.com/nxadm/tail"
  18. "github.com/pkg/errors"
  19. "github.com/prometheus/client_golang/prometheus"
  20. log "github.com/sirupsen/logrus"
  21. "gopkg.in/tomb.v2"
  22. "gopkg.in/yaml.v2"
  23. )
// linesRead counts the total number of lines read, labeled by source file,
// and is exported as the cs_filesource_hits_total prometheus counter.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration is the YAML configuration of the file datasource.
type FileConfiguration struct {
	// Filenames is a list of file paths or glob patterns to read.
	Filenames []string
	// Filename is a single path/pattern; merged into Filenames by Configure.
	Filename string
	// ForceInotify adds a directory watch for each pattern's directory even
	// when no file currently matches the pattern.
	ForceInotify bool `yaml:"force_inotify"`
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads events from files, live (tail) or at once (cat).
type FileSource struct {
	config FileConfiguration
	// watcher notifies about file creation in watched directories (tail mode).
	watcher *fsnotify.Watcher
	// watchedDirectories tracks directories already added to the watcher.
	watchedDirectories map[string]bool
	// tails tracks files that already have a running tailer, to avoid duplicates.
	tails map[string]bool
	logger *log.Entry
	// files is the resolved list of concrete file paths to read.
	files []string
}
// Configure parses the YAML acquisition configuration, validates the mode
// (defaulting to tail), creates the fsnotify watcher, and resolves the
// configured patterns into the concrete list of files to read.
// Per-directory watch failures are logged and skipped; glob failures abort.
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
	fileConfig := FileConfiguration{}
	f.logger = logger
	f.watchedDirectories = make(map[string]bool)
	f.tails = make(map[string]bool)
	err := yaml.UnmarshalStrict(Config, &fileConfig)
	if err != nil {
		return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
	}
	f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
	// Accept both the singular 'filename' and plural 'filenames' keys by
	// folding the former into the latter.
	if len(fileConfig.Filename) != 0 {
		fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
	}
	if len(fileConfig.Filenames) == 0 {
		return fmt.Errorf("no filename or filenames configuration provided")
	}
	f.config = fileConfig
	// Tail mode is the default; only tail and cat are valid.
	if f.config.Mode == "" {
		f.config.Mode = configuration.TAIL_MODE
	}
	if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
		return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
	}
	f.watcher, err = fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrapf(err, "Could not create fsnotify watcher")
	}
	f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
	for _, pattern := range f.config.Filenames {
		// force_inotify: watch the pattern's directory up front, even if the
		// glob currently matches nothing (files may appear later).
		if f.config.ForceInotify {
			directory := filepath.Dir(pattern)
			f.logger.Infof("Force add watch on %s", directory)
			if !f.watchedDirectories[directory] {
				err = f.watcher.Add(directory)
				if err != nil {
					f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
					continue
				}
				f.watchedDirectories[directory] = true
			}
		}
		files, err := filepath.Glob(pattern)
		if err != nil {
			return errors.Wrap(err, "Glob failure")
		}
		if len(files) == 0 {
			f.logger.Warnf("No matching files for pattern %s", pattern)
			continue
		}
		for _, file := range files {
			// NOTE(review): glob detection compares files[0] (not the current
			// file) against the pattern on every iteration — presumably a
			// shorthand for "the pattern expanded to something other than
			// itself"; confirm this is intentional.
			if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
				directory := filepath.Dir(file)
				f.logger.Debugf("Will add watch to directory: %s", directory)
				if !f.watchedDirectories[directory] {
					err = f.watcher.Add(directory)
					if err != nil {
						f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
						continue
					}
					f.watchedDirectories[directory] = true
				} else {
					f.logger.Debugf("Watch for directory %s already exists", directory)
				}
			}
			f.logger.Infof("Adding file %s to datasources", file)
			f.files = append(f.files, file)
		}
	}
	return nil
}
// ConfigureByDSN configures the datasource from a file:// DSN (used by
// one-shot/cli invocations). The DSN is "file://<pattern>[?log_level=<lvl>]";
// the pattern is globbed and all matches are added in cat mode.
func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
	if !strings.HasPrefix(dsn, "file://") {
		return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
	}
	f.logger = logger
	dsn = strings.TrimPrefix(dsn, "file://")
	// Split the path part from the optional query-string part.
	args := strings.Split(dsn, "?")
	if len(args[0]) == 0 {
		return fmt.Errorf("empty file:// DSN")
	}
	if len(args) == 2 && len(args[1]) != 0 {
		params, err := url.ParseQuery(args[1])
		if err != nil {
			return errors.Wrap(err, "could not parse file args")
		}
		// 'log_level' is the only supported query parameter; it adjusts the
		// verbosity of the underlying logger.
		for key, value := range params {
			if key != "log_level" {
				return fmt.Errorf("unsupported key %s in file DSN", key)
			}
			if len(value) != 1 {
				return errors.New("expected zero or one value for 'log_level'")
			}
			lvl, err := log.ParseLevel(value[0])
			if err != nil {
				return errors.Wrapf(err, "unknown level %s", value[0])
			}
			f.logger.Logger.SetLevel(lvl)
		}
	}
	// DSN acquisition always runs in cat (read-once) mode.
	f.config = FileConfiguration{}
	f.config.Labels = labels
	f.config.Mode = configuration.CAT_MODE
	f.logger.Debugf("Will try pattern %s", args[0])
	files, err := filepath.Glob(args[0])
	if err != nil {
		return errors.Wrap(err, "Glob failure")
	}
	if len(files) == 0 {
		return fmt.Errorf("no matching files for pattern %s", args[0])
	}
	if len(files) > 1 {
		f.logger.Infof("Will read %d files", len(files))
	}
	for _, file := range files {
		f.logger.Infof("Adding file %s to filelist", file)
		f.files = append(f.files, file)
	}
	return nil
}
  163. func (f *FileSource) GetMode() string {
  164. return f.config.Mode
  165. }
  166. //SupportedModes returns the supported modes by the acquisition module
  167. func (f *FileSource) SupportedModes() []string {
  168. return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
  169. }
  170. //OneShotAcquisition reads a set of file and returns when done
  171. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  172. f.logger.Debug("In oneshot")
  173. for _, file := range f.files {
  174. fi, err := os.Stat(file)
  175. if err != nil {
  176. return fmt.Errorf("could not stat file %s : %w", file, err)
  177. }
  178. if fi.IsDir() {
  179. f.logger.Warnf("%s is a directory, ignoring it.", file)
  180. continue
  181. }
  182. f.logger.Infof("reading %s at once", file)
  183. err = f.readFile(file, out, t)
  184. if err != nil {
  185. return err
  186. }
  187. }
  188. return nil
  189. }
  190. func (f *FileSource) GetMetrics() []prometheus.Collector {
  191. return []prometheus.Collector{linesRead}
  192. }
  193. func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
  194. return []prometheus.Collector{linesRead}
  195. }
  196. func (f *FileSource) GetName() string {
  197. return "file"
  198. }
  199. func (f *FileSource) CanRun() error {
  200. return nil
  201. }
// StreamingAcquisition starts live acquisition: it spawns a goroutine
// watching for new files (monitorNewFiles), then starts one tailer goroutine
// per already-known file, seeking to the end of each so only new lines are
// emitted. Per-file failures are logged and skipped, except stat errors
// which abort.
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	f.logger.Debug("Starting live acquisition")
	t.Go(func() error {
		return f.monitorNewFiles(out, t)
	})
	for _, file := range f.files {
		//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
		//do not rely on stat, reclose file immediately as it's opened by Tail
		fd, err := os.Open(file)
		if err != nil {
			f.logger.Errorf("unable to read %s : %s", file, err)
			continue
		}
		if err := fd.Close(); err != nil {
			f.logger.Errorf("unable to close %s : %s", file, err)
			continue
		}
		fi, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("could not stat file %s : %w", file, err)
		}
		if fi.IsDir() {
			f.logger.Warnf("%s is a directory, ignoring it.", file)
			continue
		}
		// Seek to the end: in streaming mode only lines appended after start
		// are of interest. The local 'tail' shadows the imported tail package.
		tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
		if err != nil {
			f.logger.Errorf("Could not start tailing file %s : %s", file, err)
			continue
		}
		f.tails[file] = true
		// 'tail' is declared per-iteration, so each closure captures its own
		// tailer (no loop-variable capture issue here).
		t.Go(func() error {
			defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
			return f.tailFile(out, t, tail)
		})
	}
	return nil
}
  240. func (f *FileSource) Dump() interface{} {
  241. return f
  242. }
// monitorNewFiles runs in its own goroutine and reacts to fsnotify Create
// events in the watched directories: when a newly created file matches one
// of the configured patterns and is not already tailed, a new tailer
// goroutine is started from the beginning of the file. The loop exits when
// the watcher channels close or the tomb is dying (closing the watcher).
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok {
				// Events channel closed: watcher is gone, stop monitoring.
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				// Only tail the new file if it matches at least one of the
				// configured patterns (path.Match, slash-separated).
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					// break leaves the select (not the for), same effect as
					// continue here.
					break
				}
				//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
				//do not rely on stat, reclose file immediately as it's opened by Tail
				fd, err := os.Open(event.Name)
				if err != nil {
					f.logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				if err := fd.Close(); err != nil {
					f.logger.Errorf("unable to close %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok {
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			// Shutdown requested: close the watcher (which also closes the
			// Events/Errors channels) and exit cleanly.
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile consumes lines from a running tailer and pushes them to out as
// events until the tomb dies (clean stop) or the tailer itself dies (the
// error is propagated and kills the tomb). Empty lines are skipped; each
// emitted line increments the per-source linesRead counter.
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		select {
		case <-t.Dying():
			// Global shutdown: stop the tailer and exit without error.
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Tomb.Dying(): //our tailer is dying
			// The tailer died on its own: escalate by killing the datasource
			// tomb so the failure is noticed upstream.
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			if line == nil {
				logger.Debugf("Nil line")
				return fmt.Errorf("tail for %s is empty", tail.Filename)
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l := types.Line{
				// trimLine is a package-level helper defined elsewhere in this
				// package.
				Raw: trimLine(line.Text),
				Labels: f.config.Labels,
				Time: line.Time,
				Src: tail.Filename,
				Process: true,
				Module: f.GetName(),
			}
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			// UseTimeMachine forces timemachine bucketing even in live mode.
			expectMode := leaky.LIVE
			if f.config.UseTimeMachine {
				expectMode = leaky.TIMEMACHINE
			}
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
		}
	}
}
  365. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  366. var scanner *bufio.Scanner
  367. logger := f.logger.WithField("oneshot", filename)
  368. fd, err := os.Open(filename)
  369. if err != nil {
  370. return errors.Wrapf(err, "failed opening %s", filename)
  371. }
  372. defer fd.Close()
  373. if strings.HasSuffix(filename, ".gz") {
  374. gz, err := gzip.NewReader(fd)
  375. if err != nil {
  376. logger.Errorf("Failed to read gz file: %s", err)
  377. return errors.Wrapf(err, "failed to read gz %s", filename)
  378. }
  379. defer gz.Close()
  380. scanner = bufio.NewScanner(gz)
  381. } else {
  382. scanner = bufio.NewScanner(fd)
  383. }
  384. scanner.Split(bufio.ScanLines)
  385. for scanner.Scan() {
  386. if scanner.Text() == "" {
  387. continue
  388. }
  389. l := types.Line{
  390. Raw: scanner.Text(),
  391. Time: time.Now().UTC(),
  392. Src: filename,
  393. Labels: f.config.Labels,
  394. Process: true,
  395. Module: f.GetName(),
  396. }
  397. logger.Debugf("line %s", l.Raw)
  398. linesRead.With(prometheus.Labels{"source": filename}).Inc()
  399. //we're reading logs at once, it must be time-machine buckets
  400. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
  401. }
  402. t.Kill(nil)
  403. return nil
  404. }