file.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. package fileacquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  14. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  15. "github.com/crowdsecurity/crowdsec/pkg/types"
  16. "github.com/fsnotify/fsnotify"
  17. "github.com/nxadm/tail"
  18. "github.com/pkg/errors"
  19. "github.com/prometheus/client_golang/prometheus"
  20. log "github.com/sirupsen/logrus"
  21. "golang.org/x/sys/unix"
  22. "gopkg.in/tomb.v2"
  23. "gopkg.in/yaml.v2"
  24. )
// linesRead counts every line read by the file datasource, labeled by the
// path of the source file. It is exposed via GetMetrics()/GetAggregMetrics().
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration is the YAML configuration of the file datasource.
// Filename (singular) is accepted for convenience and merged into Filenames
// by Configure(); patterns may be globs. ForceInotify adds a directory watch
// for every pattern even when the pattern is not a glob.
type FileConfiguration struct {
	Filenames    []string
	Filename     string
	ForceInotify bool `yaml:"force_inotify"`
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads log lines from plain or gzipped files, either at once
// (cat mode) or by tailing them (tail mode), and watches directories for
// newly created files via fsnotify.
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher
	watchedDirectories map[string]bool // directories already registered with the watcher
	tails              map[string]bool // files that already have an active tail
	logger             *log.Entry
	files              []string // resolved list of files to read/tail
}
  45. func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
  46. fileConfig := FileConfiguration{}
  47. f.logger = logger
  48. f.watchedDirectories = make(map[string]bool)
  49. f.tails = make(map[string]bool)
  50. err := yaml.UnmarshalStrict(Config, &fileConfig)
  51. if err != nil {
  52. return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
  53. }
  54. f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
  55. if len(fileConfig.Filename) != 0 {
  56. fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
  57. }
  58. if len(fileConfig.Filenames) == 0 {
  59. return fmt.Errorf("no filename or filenames configuration provided")
  60. }
  61. f.config = fileConfig
  62. if f.config.Mode == "" {
  63. f.config.Mode = configuration.TAIL_MODE
  64. }
  65. if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
  66. return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
  67. }
  68. f.watcher, err = fsnotify.NewWatcher()
  69. if err != nil {
  70. return errors.Wrapf(err, "Could not create fsnotify watcher")
  71. }
  72. f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
  73. for _, pattern := range f.config.Filenames {
  74. if f.config.ForceInotify {
  75. directory := path.Dir(pattern)
  76. f.logger.Infof("Force add watch on %s", directory)
  77. if !f.watchedDirectories[directory] {
  78. err = f.watcher.Add(directory)
  79. if err != nil {
  80. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  81. continue
  82. }
  83. f.watchedDirectories[directory] = true
  84. }
  85. }
  86. files, err := filepath.Glob(pattern)
  87. if err != nil {
  88. return errors.Wrap(err, "Glob failure")
  89. }
  90. if len(files) == 0 {
  91. f.logger.Warnf("No matching files for pattern %s", pattern)
  92. continue
  93. }
  94. for _, file := range files {
  95. if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
  96. directory := path.Dir(file)
  97. if !f.watchedDirectories[directory] {
  98. err = f.watcher.Add(directory)
  99. if err != nil {
  100. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  101. continue
  102. }
  103. f.watchedDirectories[directory] = true
  104. }
  105. }
  106. f.logger.Infof("Adding file %s to datasources", file)
  107. f.files = append(f.files, file)
  108. }
  109. }
  110. return nil
  111. }
  112. func (f *FileSource) ConfigureByDSN(dsn string, labelType string, logger *log.Entry) error {
  113. if !strings.HasPrefix(dsn, "file://") {
  114. return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
  115. }
  116. f.logger = logger
  117. dsn = strings.TrimPrefix(dsn, "file://")
  118. args := strings.Split(dsn, "?")
  119. if len(args[0]) == 0 {
  120. return fmt.Errorf("empty file:// DSN")
  121. }
  122. if len(args) == 2 && len(args[1]) != 0 {
  123. params, err := url.ParseQuery(args[1])
  124. if err != nil {
  125. return fmt.Errorf("could not parse file args : %s", err)
  126. }
  127. for key, value := range params {
  128. if key != "log_level" {
  129. return fmt.Errorf("unsupported key %s in file DSN", key)
  130. }
  131. if len(value) != 1 {
  132. return fmt.Errorf("expected zero or one value for 'log_level'")
  133. }
  134. lvl, err := log.ParseLevel(value[0])
  135. if err != nil {
  136. return errors.Wrapf(err, "unknown level %s", value[0])
  137. }
  138. f.logger.Logger.SetLevel(lvl)
  139. }
  140. }
  141. f.config = FileConfiguration{}
  142. f.config.Labels = map[string]string{"type": labelType}
  143. f.config.Mode = configuration.CAT_MODE
  144. f.logger.Debugf("Will try pattern %s", args[0])
  145. files, err := filepath.Glob(args[0])
  146. if err != nil {
  147. return errors.Wrap(err, "Glob failure")
  148. }
  149. if len(files) == 0 {
  150. return fmt.Errorf("no matching files for pattern %s", args[0])
  151. }
  152. if len(files) > 1 {
  153. f.logger.Infof("Will read %d files", len(files))
  154. }
  155. for _, file := range files {
  156. f.logger.Infof("Adding file %s to filelist", file)
  157. f.files = append(f.files, file)
  158. }
  159. return nil
  160. }
// GetMode returns the configured acquisition mode (tail or cat).
func (f *FileSource) GetMode() string {
	return f.config.Mode
}
// SupportedModes returns the modes supported by the acquisition module:
// live tailing (TAIL_MODE) and one-shot reading (CAT_MODE).
func (f *FileSource) SupportedModes() []string {
	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
  168. //OneShotAcquisition reads a set of file and returns when done
  169. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  170. f.logger.Debug("In oneshot")
  171. for _, file := range f.files {
  172. fi, err := os.Stat(file)
  173. if err != nil {
  174. return fmt.Errorf("could not stat file %s : %w", file, err)
  175. }
  176. if fi.IsDir() {
  177. f.logger.Warnf("%s is a directory, ignoring it.", file)
  178. continue
  179. }
  180. f.logger.Infof("reading %s at once", file)
  181. err = f.readFile(file, out, t)
  182. if err != nil {
  183. return err
  184. }
  185. }
  186. return nil
  187. }
// GetMetrics returns the per-source prometheus collectors of the datasource.
func (f *FileSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetAggregMetrics returns the aggregated prometheus collectors of the
// datasource (same counter as GetMetrics for the file source).
func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}
// GetName returns the name of the datasource module.
func (f *FileSource) GetName() string {
	return "file"
}
// CanRun reports whether the datasource can run on this system;
// the file source has no platform restrictions.
func (f *FileSource) CanRun() error {
	return nil
}
  200. func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
  201. f.logger.Debug("Starting live acquisition")
  202. t.Go(func() error {
  203. return f.monitorNewFiles(out, t)
  204. })
  205. for _, file := range f.files {
  206. err := unix.Access(file, unix.R_OK)
  207. if err != nil {
  208. f.logger.Errorf("unable to read %s : %s", file, err)
  209. continue
  210. }
  211. fi, err := os.Stat(file)
  212. if err != nil {
  213. return fmt.Errorf("could not stat file %s : %w", file, err)
  214. }
  215. if fi.IsDir() {
  216. f.logger.Warnf("%s is a directory, ignoring it.", file)
  217. continue
  218. }
  219. tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
  220. if err != nil {
  221. f.logger.Errorf("Could not start tailing file %s : %s", file, err)
  222. continue
  223. }
  224. f.tails[file] = true
  225. t.Go(func() error {
  226. defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
  227. return f.tailFile(out, t, tail)
  228. })
  229. }
  230. return nil
  231. }
// Dump returns the datasource itself, for debugging/inspection purposes.
func (f *FileSource) Dump() interface{} {
	return f
}
// monitorNewFiles consumes fsnotify events for the watched directories and
// starts a tail on every newly created file that matches one of the
// configured patterns. It runs until the watcher channels close or the tomb
// dies, at which point the watcher is closed.
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok {
				// watcher closed: nothing left to monitor
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				// only tail files matching at least one configured pattern
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					break
				}
				err = unix.Access(event.Name, unix.R_OK)
				if err != nil {
					logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok {
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			// shutdown requested: drop every inotify watch before exiting
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile forwards every line produced by tail to out as a LIVE event,
// until the tomb dies (clean stop) or the tailer itself dies (in which case
// the whole tomb is killed, since a dead reader is unrecoverable).
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		l := types.Line{}
		select {
		case <-t.Dying():
			// acquisition is stopping: stop the tailer and exit cleanly
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Tomb.Dying(): //our tailer is dying
			// propagate the failure to the whole acquisition tomb
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			if line == nil {
				logger.Debugf("Nil line")
				return fmt.Errorf("tail for %s is empty", tail.Filename)
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l.Raw = line.Text
			l.Labels = f.config.Labels
			l.Time = line.Time
			l.Src = tail.Filename
			l.Process = true
			l.Module = f.GetName()
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
		}
	}
}
  346. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  347. var scanner *bufio.Scanner
  348. logger := f.logger.WithField("oneshot", filename)
  349. fd, err := os.Open(filename)
  350. if err != nil {
  351. return errors.Wrapf(err, "failed opening %s", filename)
  352. }
  353. defer fd.Close()
  354. if strings.HasSuffix(filename, ".gz") {
  355. gz, err := gzip.NewReader(fd)
  356. if err != nil {
  357. logger.Errorf("Failed to read gz file: %s", err)
  358. return errors.Wrapf(err, "failed to read gz %s", filename)
  359. }
  360. defer gz.Close()
  361. scanner = bufio.NewScanner(gz)
  362. } else {
  363. scanner = bufio.NewScanner(fd)
  364. }
  365. scanner.Split(bufio.ScanLines)
  366. for scanner.Scan() {
  367. logger.Debugf("line %s", scanner.Text())
  368. l := types.Line{}
  369. l.Raw = scanner.Text()
  370. l.Time = time.Now()
  371. l.Src = filename
  372. l.Labels = f.config.Labels
  373. l.Process = true
  374. l.Module = f.GetName()
  375. linesRead.With(prometheus.Labels{"source": filename}).Inc()
  376. //we're reading logs at once, it must be time-machine buckets
  377. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
  378. }
  379. t.Kill(nil)
  380. return nil
  381. }