file.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package file_acquisition
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
  13. leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
  14. "github.com/crowdsecurity/crowdsec/pkg/types"
  15. "github.com/fsnotify/fsnotify"
  16. "github.com/nxadm/tail"
  17. "github.com/pkg/errors"
  18. "github.com/prometheus/client_golang/prometheus"
  19. log "github.com/sirupsen/logrus"
  20. "gopkg.in/tomb.v2"
  21. "gopkg.in/yaml.v2"
  22. )
// FileConfiguration is the YAML configuration of the file datasource.
// At least one of Filename or Filenames must be provided; entries may be
// glob patterns.
type FileConfiguration struct {
	// Filenames is the list of files or glob patterns to read.
	Filenames []string
	// Filename is a single file or glob pattern; it is appended to
	// Filenames during Configure.
	Filename string
	// ForceInotify forces a watch on each pattern's parent directory,
	// even when the pattern is not a glob.
	ForceInotify bool `yaml:"force_inotify"`
	// Common datasource settings (mode, labels, ...), inlined.
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads log lines from files, either in one shot (cat mode)
// or by tailing them (tail mode), optionally watching directories so new
// matching files are picked up at runtime.
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher
	watchedDirectories map[string]bool // directories with an active fsnotify watch
	tails              map[string]bool // files that already have a tailer
	logger             *log.Entry
	files              []string // resolved list of files to read/tail
}
// SupportedDSN returns the DSN schemes handled by this datasource.
func (f *FileSource) SupportedDSN() []string {
	return []string{"file://"}
}
  40. func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
  41. fileConfig := FileConfiguration{}
  42. f.logger = logger
  43. f.watchedDirectories = make(map[string]bool)
  44. f.tails = make(map[string]bool)
  45. err := yaml.UnmarshalStrict(Config, &fileConfig)
  46. if err != nil {
  47. return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
  48. }
  49. f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
  50. if len(fileConfig.Filename) != 0 {
  51. fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
  52. }
  53. if len(fileConfig.Filenames) == 0 {
  54. return fmt.Errorf("no filename or filenames configuration provided")
  55. }
  56. f.config = fileConfig
  57. if f.config.Mode == "" {
  58. f.config.Mode = configuration.TAIL_MODE
  59. }
  60. f.watcher, err = fsnotify.NewWatcher()
  61. if err != nil {
  62. return errors.Wrapf(err, "Could not create fsnotify watcher")
  63. }
  64. f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
  65. for _, pattern := range f.config.Filenames {
  66. if f.config.ForceInotify {
  67. directory := path.Dir(pattern)
  68. f.logger.Infof("Force add watch on %s", directory)
  69. if !f.watchedDirectories[directory] {
  70. err = f.watcher.Add(directory)
  71. if err != nil {
  72. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  73. continue
  74. }
  75. f.watchedDirectories[directory] = true
  76. }
  77. }
  78. files, err := filepath.Glob(pattern)
  79. if err != nil {
  80. return errors.Wrap(err, "Glob failure")
  81. }
  82. if len(files) == 0 {
  83. f.logger.Infof("No matching files for pattern %s", pattern)
  84. continue
  85. }
  86. f.logger.Infof("Will read %d files", len(files))
  87. for _, file := range files {
  88. if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
  89. directory := path.Dir(file)
  90. if !f.watchedDirectories[directory] {
  91. err = f.watcher.Add(directory)
  92. if err != nil {
  93. f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
  94. continue
  95. }
  96. f.watchedDirectories[directory] = true
  97. }
  98. }
  99. f.logger.Infof("Adding file %s to filelist", file)
  100. f.files = append(f.files, file)
  101. }
  102. }
  103. return nil
  104. }
  105. func (f *FileSource) ConfigureByDSN(dsn string, logger *log.Entry) error {
  106. if !strings.HasPrefix(dsn, "file://") {
  107. return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
  108. }
  109. pattern := strings.TrimPrefix(dsn, "file://")
  110. if len(pattern) == 0 {
  111. return fmt.Errorf("empty file:// DSN")
  112. }
  113. f.logger = logger
  114. files, err := filepath.Glob(pattern)
  115. if err != nil {
  116. return errors.Wrap(err, "Glob failure")
  117. }
  118. if len(files) == 0 {
  119. return fmt.Errorf("no matching files for pattern %s", pattern)
  120. }
  121. f.logger.Infof("Will read %d files", len(files))
  122. for _, file := range files {
  123. f.logger.Infof("Adding file %s to filelist", file)
  124. f.files = append(f.files, file)
  125. }
  126. return nil
  127. }
// GetMode returns the configured acquisition mode (tail or cat).
func (f *FileSource) GetMode() string {
	return f.config.Mode
}
// SupportedModes returns the modes supported by the acquisition module:
// tail (live) and cat (one-shot).
func (f *FileSource) SupportedModes() []string {
	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}
  135. //OneShotAcquisition reads a set of file and returns when done
  136. func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
  137. for _, file := range f.files {
  138. fi, err := os.Stat(file)
  139. if err != nil {
  140. return fmt.Errorf("could not stat file %s : %w", file, err)
  141. }
  142. if fi.IsDir() {
  143. f.logger.Warnf("%s is a directory, ignoring it.", file)
  144. continue
  145. }
  146. f.logger.Infof("reading %s at once", file)
  147. err = f.readFile(file, out, t)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. return nil
  153. }
// GetMetrics returns the prometheus collectors exposed by this datasource.
// None are registered yet (see the FIXME about ReaderHits in tailFile).
func (f *FileSource) GetMetrics() []prometheus.Collector {
	return nil
}
// CanRun reports whether the datasource can run; the file datasource has no
// runtime prerequisites, so it always returns nil.
func (f *FileSource) CanRun() error {
	return nil
}
  160. func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
  161. f.logger.Debugf("Starting live acquisition")
  162. t.Go(func() error {
  163. return f.monitorNewFiles(out, t)
  164. })
  165. for _, file := range f.files {
  166. tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
  167. if err != nil {
  168. f.logger.Errorf("Could not start tailing file %s : %s", file, err)
  169. continue
  170. }
  171. f.tails[file] = true
  172. t.Go(func() error {
  173. defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
  174. return f.tailFile(out, t, tail)
  175. })
  176. }
  177. return nil
  178. }
// Dump returns the datasource itself for debugging/introspection purposes.
func (f *FileSource) Dump() interface{} {
	return f
}
  182. func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
  183. for {
  184. select {
  185. case event, ok := <-f.watcher.Events:
  186. if !ok {
  187. return nil
  188. }
  189. if event.Op&fsnotify.Create == fsnotify.Create {
  190. fi, err := os.Stat(event.Name)
  191. if err != nil {
  192. f.logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
  193. }
  194. if fi.IsDir() {
  195. continue
  196. }
  197. f.logger.Infof("Detected new file %s", event.Name)
  198. matched := false
  199. for _, pattern := range f.config.Filenames {
  200. f.logger.Debugf("Matching %s with %s", pattern, event.Name)
  201. matched, err = path.Match(pattern, event.Name)
  202. if err != nil {
  203. f.logger.Errorf("Could not match pattern : %s", err)
  204. continue
  205. }
  206. if matched {
  207. break
  208. }
  209. }
  210. if !matched {
  211. continue
  212. }
  213. if f.tails[event.Name] {
  214. //we already have a tail on it, do not start a new one
  215. f.logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
  216. break
  217. }
  218. //Slightly different parameters for Location, as we want to read the first lines of the newly created file
  219. tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
  220. if err != nil {
  221. f.logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
  222. break
  223. }
  224. f.tails[event.Name] = true
  225. t.Go(func() error {
  226. defer types.CatchPanic("crowdsec/acquis/tailfile")
  227. return f.tailFile(out, t, tail)
  228. })
  229. }
  230. case err, ok := <-f.watcher.Errors:
  231. if !ok {
  232. return nil
  233. }
  234. f.logger.Errorf("Error while monitoring folder: %s", err)
  235. }
  236. }
  237. }
  238. func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
  239. //lint:ignore SA1015 as it is an infinite loop
  240. timeout := time.Tick(1 * time.Second)
  241. f.logger.Debugf("-> Starting tail of %s", tail.Filename)
  242. for {
  243. l := types.Line{}
  244. select {
  245. case <-t.Dying():
  246. f.logger.Infof("File datasource %s stopping", tail.Filename)
  247. if err := tail.Stop(); err != nil {
  248. f.logger.Errorf("error in stop : %s", err)
  249. }
  250. case <-tail.Tomb.Dying(): //our tailer is dying
  251. f.logger.Warningf("File reader of %s died", tail.Filename)
  252. t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
  253. return fmt.Errorf("reader for %s is dead", tail.Filename)
  254. case line := <-tail.Lines:
  255. if line == nil {
  256. f.logger.Debugf("Nil line")
  257. return fmt.Errorf("tail for %s is empty", tail.Filename)
  258. }
  259. if line.Err != nil {
  260. log.Warningf("fetch error : %v", line.Err)
  261. return line.Err
  262. }
  263. if line.Text == "" { //skip empty lines
  264. continue
  265. }
  266. //FIXME: prometheus metrics
  267. //ReaderHits.With(prometheus.Labels{"source": tail.Filename}).Inc()
  268. l.Raw = line.Text
  269. l.Labels = f.config.Labels
  270. l.Time = line.Time
  271. l.Src = tail.Filename
  272. l.Process = true
  273. //we're tailing, it must be real time logs
  274. f.logger.Debugf("pushing %+v", l)
  275. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
  276. case <-timeout:
  277. //time out, shall we do stuff ?
  278. f.logger.Trace("timeout")
  279. }
  280. }
  281. }
  282. func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
  283. var scanner *bufio.Scanner
  284. fd, err := os.Open(filename)
  285. if err != nil {
  286. return errors.Wrapf(err, "failed opening %s", filename)
  287. }
  288. defer fd.Close()
  289. if strings.HasSuffix(filename, ".gz") {
  290. gz, err := gzip.NewReader(fd)
  291. if err != nil {
  292. f.logger.Errorf("Failed to read gz file: %s", err)
  293. return errors.Wrapf(err, "failed to read gz %s", filename)
  294. }
  295. defer gz.Close()
  296. scanner = bufio.NewScanner(gz)
  297. } else {
  298. scanner = bufio.NewScanner(fd)
  299. }
  300. scanner.Split(bufio.ScanLines)
  301. for scanner.Scan() {
  302. f.logger.Debugf("line %s", scanner.Text())
  303. l := types.Line{}
  304. l.Raw = scanner.Text()
  305. l.Time = time.Now()
  306. l.Src = filename
  307. l.Labels = f.config.Labels
  308. l.Process = true
  309. //we're reading logs at once, it must be time-machine buckets
  310. out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
  311. }
  312. t.Kill(nil)
  313. return nil
  314. }