123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- package fileacquisition
- import (
- "bufio"
- "compress/gzip"
- "fmt"
- "io"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
- "time"
- "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
- leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
- "github.com/crowdsecurity/crowdsec/pkg/types"
- "github.com/fsnotify/fsnotify"
- "github.com/nxadm/tail"
- "github.com/pkg/errors"
- "github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
- "gopkg.in/tomb.v2"
- "gopkg.in/yaml.v2"
- )
// linesRead counts the total number of log lines read by this datasource,
// labeled by source file; it is exposed via GetMetrics/GetAggregMetrics.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_filesource_hits_total",
		Help: "Total lines that were read.",
	},
	[]string{"source"})
// FileConfiguration is the YAML configuration of the file datasource.
// Filename is a convenience shortcut: Configure merges it into Filenames.
type FileConfiguration struct {
	Filenames []string // files or glob patterns to read
	Filename  string   // single file or glob pattern (merged into Filenames)
	// ForceInotify makes Configure watch the pattern's directory even when
	// the pattern matches no file yet.
	ForceInotify                      bool `yaml:"force_inotify"`
	configuration.DataSourceCommonCfg `yaml:",inline"`
}
// FileSource reads logs from a set of files, either all at once (cat mode)
// or by tailing them live (tail mode) with fsnotify-based discovery of
// newly created files.
type FileSource struct {
	config             FileConfiguration
	watcher            *fsnotify.Watcher
	watchedDirectories map[string]bool // directories with an active fsnotify watch
	tails              map[string]bool // files that already have a running tailer
	logger             *log.Entry
	files              []string // resolved list of files to read/tail
}
// Configure parses the YAML datasource configuration, resolves the configured
// glob patterns into f.files, and — in tail mode — sets up fsnotify watches on
// the matching directories so files created later are picked up by
// monitorNewFiles. Watch-creation failures are logged and skipped rather than
// fatal; glob syntax errors are fatal.
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
	fileConfig := FileConfiguration{}
	f.logger = logger
	f.watchedDirectories = make(map[string]bool)
	f.tails = make(map[string]bool)
	err := yaml.UnmarshalStrict(Config, &fileConfig)
	if err != nil {
		return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
	}
	f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
	// The singular 'filename' key is a shortcut: fold it into Filenames.
	if len(fileConfig.Filename) != 0 {
		fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
	}
	if len(fileConfig.Filenames) == 0 {
		return fmt.Errorf("no filename or filenames configuration provided")
	}
	f.config = fileConfig
	// Default to live tailing when no mode was specified.
	if f.config.Mode == "" {
		f.config.Mode = configuration.TAIL_MODE
	}
	if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
		return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
	}
	f.watcher, err = fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrapf(err, "Could not create fsnotify watcher")
	}
	f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
	for _, pattern := range f.config.Filenames {
		// force_inotify: watch the pattern's directory unconditionally, even
		// if the pattern currently matches nothing.
		if f.config.ForceInotify {
			directory := filepath.Dir(pattern)
			f.logger.Infof("Force add watch on %s", directory)
			if !f.watchedDirectories[directory] {
				err = f.watcher.Add(directory)
				if err != nil {
					f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
					continue
				}
				f.watchedDirectories[directory] = true
			}
		}
		files, err := filepath.Glob(pattern)
		if err != nil {
			return errors.Wrap(err, "Glob failure")
		}
		if len(files) == 0 {
			f.logger.Warnf("No matching files for pattern %s", pattern)
			continue
		}
		for _, file := range files {
			// If the glob expanded to something other than the literal
			// pattern, it is a real wildcard: in tail mode, also watch each
			// matched file's directory so future matches get tailed.
			if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
				directory := filepath.Dir(file)
				f.logger.Debugf("Will add watch to directory: %s", directory)
				if !f.watchedDirectories[directory] {
					err = f.watcher.Add(directory)
					if err != nil {
						f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
						continue
					}
					f.watchedDirectories[directory] = true
				} else {
					f.logger.Debugf("Watch for directory %s already exists", directory)
				}
			}
			f.logger.Infof("Adding file %s to datasources", file)
			f.files = append(f.files, file)
		}
	}
	return nil
}
- func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
- if !strings.HasPrefix(dsn, "file://") {
- return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
- }
- f.logger = logger
- dsn = strings.TrimPrefix(dsn, "file://")
- args := strings.Split(dsn, "?")
- if len(args[0]) == 0 {
- return fmt.Errorf("empty file:// DSN")
- }
- if len(args) == 2 && len(args[1]) != 0 {
- params, err := url.ParseQuery(args[1])
- if err != nil {
- return errors.Wrap(err, "could not parse file args")
- }
- for key, value := range params {
- if key != "log_level" {
- return fmt.Errorf("unsupported key %s in file DSN", key)
- }
- if len(value) != 1 {
- return errors.New("expected zero or one value for 'log_level'")
- }
- lvl, err := log.ParseLevel(value[0])
- if err != nil {
- return errors.Wrapf(err, "unknown level %s", value[0])
- }
- f.logger.Logger.SetLevel(lvl)
- }
- }
- f.config = FileConfiguration{}
- f.config.Labels = labels
- f.config.Mode = configuration.CAT_MODE
- f.logger.Debugf("Will try pattern %s", args[0])
- files, err := filepath.Glob(args[0])
- if err != nil {
- return errors.Wrap(err, "Glob failure")
- }
- if len(files) == 0 {
- return fmt.Errorf("no matching files for pattern %s", args[0])
- }
- if len(files) > 1 {
- f.logger.Infof("Will read %d files", len(files))
- }
- for _, file := range files {
- f.logger.Infof("Adding file %s to filelist", file)
- f.files = append(f.files, file)
- }
- return nil
- }
- func (f *FileSource) GetMode() string {
- return f.config.Mode
- }
- //SupportedModes returns the supported modes by the acquisition module
- func (f *FileSource) SupportedModes() []string {
- return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
- }
- //OneShotAcquisition reads a set of file and returns when done
- func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
- f.logger.Debug("In oneshot")
- for _, file := range f.files {
- fi, err := os.Stat(file)
- if err != nil {
- return fmt.Errorf("could not stat file %s : %w", file, err)
- }
- if fi.IsDir() {
- f.logger.Warnf("%s is a directory, ignoring it.", file)
- continue
- }
- f.logger.Infof("reading %s at once", file)
- err = f.readFile(file, out, t)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (f *FileSource) GetMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
- return []prometheus.Collector{linesRead}
- }
- func (f *FileSource) GetName() string {
- return "file"
- }
- func (f *FileSource) CanRun() error {
- return nil
- }
// StreamingAcquisition starts live (tail) acquisition: one goroutine watching
// for newly created files via fsnotify (monitorNewFiles), plus one tailer
// goroutine per file that already exists at startup. Per-file failures are
// logged and skipped; only a stat error is fatal.
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	f.logger.Debug("Starting live acquisition")
	t.Go(func() error {
		return f.monitorNewFiles(out, t)
	})
	for _, file := range f.files {
		//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
		//do not rely on stat, reclose file immediately as it's opened by Tail
		fd, err := os.Open(file)
		if err != nil {
			f.logger.Errorf("unable to read %s : %s", file, err)
			continue
		}
		if err := fd.Close(); err != nil {
			f.logger.Errorf("unable to close %s : %s", file, err)
			continue
		}
		fi, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("could not stat file %s : %w", file, err)
		}
		if fi.IsDir() {
			f.logger.Warnf("%s is a directory, ignoring it.", file)
			continue
		}
		// Seek to the end: at startup we only want lines written from now on,
		// not the file's historical content.
		tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
		if err != nil {
			f.logger.Errorf("Could not start tailing file %s : %s", file, err)
			continue
		}
		// Record the tail so monitorNewFiles doesn't start a duplicate one.
		f.tails[file] = true
		t.Go(func() error {
			defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
			return f.tailFile(out, t, tail)
		})
	}
	return nil
}
- func (f *FileSource) Dump() interface{} {
- return f
- }
// monitorNewFiles is the fsnotify event loop: it reacts to file-creation
// events in the watched directories, and for each new file matching one of
// the configured patterns starts a tailer goroutine (reading from the start
// of the file, since it is brand new). It returns when the watcher channels
// close or the tomb dies, closing the watcher in the latter case.
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
	logger := f.logger.WithField("goroutine", "inotify")
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok {
				// Events channel closed: watcher is gone, stop cleanly.
				return nil
			}
			if event.Op&fsnotify.Create == fsnotify.Create {
				fi, err := os.Stat(event.Name)
				if err != nil {
					logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
					continue
				}
				if fi.IsDir() {
					continue
				}
				logger.Debugf("Detected new file %s", event.Name)
				// Only tail the new file if it matches one of the configured
				// patterns.
				// NOTE(review): this uses path.Match (slash-separated paths)
				// on OS file paths — presumably fine on Unix; confirm
				// behavior on Windows separators.
				matched := false
				for _, pattern := range f.config.Filenames {
					logger.Debugf("Matching %s with %s", pattern, event.Name)
					matched, err = path.Match(pattern, event.Name)
					if err != nil {
						logger.Errorf("Could not match pattern : %s", err)
						continue
					}
					if matched {
						break
					}
				}
				if !matched {
					continue
				}
				if f.tails[event.Name] {
					//we already have a tail on it, do not start a new one
					logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
					break
				}
				//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
				//do not rely on stat, reclose file immediately as it's opened by Tail
				fd, err := os.Open(event.Name)
				if err != nil {
					f.logger.Errorf("unable to read %s : %s", event.Name, err)
					continue
				}
				if err := fd.Close(); err != nil {
					f.logger.Errorf("unable to close %s : %s", event.Name, err)
					continue
				}
				//Slightly different parameters for Location, as we want to read the first lines of the newly created file
				tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
				if err != nil {
					logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
					break
				}
				f.tails[event.Name] = true
				t.Go(func() error {
					defer types.CatchPanic("crowdsec/acquis/tailfile")
					return f.tailFile(out, t, tail)
				})
			}
		case err, ok := <-f.watcher.Errors:
			if !ok {
				return nil
			}
			logger.Errorf("Error while monitoring folder: %s", err)
		case <-t.Dying():
			// Shutdown requested: tear down all inotify watches.
			err := f.watcher.Close()
			if err != nil {
				return errors.Wrapf(err, "could not remove all inotify watches")
			}
			return nil
		}
	}
}
// tailFile consumes lines from a single tailer until either the datasource
// tomb or the tailer's own tomb dies, converting each non-empty line into a
// types.Event on out. A dying tailer kills the datasource tomb so the
// failure propagates to the whole acquisition pipeline.
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
	logger := f.logger.WithField("tail", tail.Filename)
	logger.Debugf("-> Starting tail of %s", tail.Filename)
	for {
		select {
		case <-t.Dying():
			// Global shutdown: stop the tailer cleanly before returning.
			logger.Infof("File datasource %s stopping", tail.Filename)
			if err := tail.Stop(); err != nil {
				f.logger.Errorf("error in stop : %s", err)
				return err
			}
			return nil
		case <-tail.Tomb.Dying(): //our tailer is dying
			logger.Warningf("File reader of %s died", tail.Filename)
			t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
			return fmt.Errorf("reader for %s is dead", tail.Filename)
		case line := <-tail.Lines:
			// A nil line means the Lines channel was closed underneath us.
			if line == nil {
				logger.Debugf("Nil line")
				return fmt.Errorf("tail for %s is empty", tail.Filename)
			}
			if line.Err != nil {
				logger.Warningf("fetch error : %v", line.Err)
				return line.Err
			}
			if line.Text == "" { //skip empty lines
				continue
			}
			linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
			l := types.Line{
				Raw:     trimLine(line.Text),
				Labels:  f.config.Labels,
				Time:    line.Time,
				Src:     tail.Filename,
				Process: true,
				Module:  f.GetName(),
			}
			//we're tailing, it must be real time logs
			logger.Debugf("pushing %+v", l)
			// Unless use_time_machine is set, tailed lines are treated as
			// live (wall-clock) events by the bucket machinery.
			expectMode := leaky.LIVE
			if f.config.UseTimeMachine {
				expectMode = leaky.TIMEMACHINE
			}
			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
		}
	}
}
- func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
- var scanner *bufio.Scanner
- logger := f.logger.WithField("oneshot", filename)
- fd, err := os.Open(filename)
- if err != nil {
- return errors.Wrapf(err, "failed opening %s", filename)
- }
- defer fd.Close()
- if strings.HasSuffix(filename, ".gz") {
- gz, err := gzip.NewReader(fd)
- if err != nil {
- logger.Errorf("Failed to read gz file: %s", err)
- return errors.Wrapf(err, "failed to read gz %s", filename)
- }
- defer gz.Close()
- scanner = bufio.NewScanner(gz)
- } else {
- scanner = bufio.NewScanner(fd)
- }
- scanner.Split(bufio.ScanLines)
- for scanner.Scan() {
- if scanner.Text() == "" {
- continue
- }
- l := types.Line{
- Raw: scanner.Text(),
- Time: time.Now().UTC(),
- Src: filename,
- Labels: f.config.Labels,
- Process: true,
- Module: f.GetName(),
- }
- logger.Debugf("line %s", l.Raw)
- linesRead.With(prometheus.Labels{"source": filename}).Inc()
- //we're reading logs at once, it must be time-machine buckets
- out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
- }
- t.Kill(nil)
- return nil
- }
|