diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 179f23548..544a01596 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -4,6 +4,7 @@ import ( "bufio" "compress/gzip" "fmt" + "io" "os" "path" "path/filepath" @@ -29,60 +30,61 @@ type FileConfiguration struct { } type FileSource struct { - config FileConfiguration - watcher *fsnotify.Watcher - logger *log.Entry - files []string + config FileConfiguration + watcher *fsnotify.Watcher + watchedDirectories map[string]bool + tails map[string]bool + logger *log.Entry + files []string } func (f *FileSource) Configure(Config []byte, logger *log.Entry) error { fileConfig := FileConfiguration{} f.logger = logger + f.watchedDirectories = make(map[string]bool) + f.tails = make(map[string]bool) err := yaml.Unmarshal(Config, &fileConfig) - f.logger.Infof("%+v", fileConfig) if err != nil { - f.logger.Errorf("Could not parse configuration for File datasource : %s", err) - return err + return errors.Wrap(err, "[fileacquisition] Cannot parse FileAcquisition configuration") } + f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig) if len(fileConfig.Filename) != 0 { fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename) } if len(fileConfig.Filenames) == 0 { - f.logger.Errorf("No filename or filenames configuration provided") - return errors.New("No filename or filenames configuration provided") + return fmt.Errorf("[fileacquisition] no filename or filenames configuration provided") } f.config = fileConfig - f.config.Mode = configuration.TAIL_MODE + f.config.Mode = configuration.TAIL_MODE // FIXME f.watcher, err = fsnotify.NewWatcher() if err != nil { - f.logger.Errorf("Could not create fsnotify watcher : %s", err) - return err + return errors.Wrapf(err, "[fileacquisition] Could not create fsnotify watcher") } - f.logger.Infof("%+v", f.config) + f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config) for _, pattern := range f.config.Filenames { files, err := filepath.Glob(pattern) if err != nil { - f.logger.Errorf("Glob failure: %s", err) - return err + return errors.Wrap(err, "[fileacquisition] Glob failure") } if len(files) == 0 { f.logger.Infof("No matching files for pattern %s", pattern) continue } + f.logger.Infof("Will read %d files", len(files)) for _, file := range files { - f.logger.Infof("In config for file %s", file) - f.logger.Infof("Files: %+v", files) - f.logger.Infof("Mode: %s", f.config.Mode) if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern - //TODO: add only one watch per directory - f.logger.Infof("Adding watch on %s", path.Dir(file)) - err = f.watcher.Add(path.Dir(file)) - if err != nil { - f.logger.Errorf("Could not create watch on directory %s : %s", path.Dir(file), err) - return err + directory := path.Dir(file) + if !f.watchedDirectories[directory] { + f.logger.Infof("Adding watch on %s", directory) + err = f.watcher.Add(directory) + if err != nil { + f.logger.Errorf("Could not create watch on directory %s : %s", directory, err) + continue + } + f.watchedDirectories[directory] = true } - f.logger.Infof("Adding file %s", file) } + f.logger.Infof("Adding file %s to filelist", file) f.files = append(f.files, file) } } @@ -93,10 +95,12 @@ func (f *FileSource) GetMode() string { return f.config.Mode } +//SupportedModes returns the supported modes by the acquisition module func (f *FileSource) SupportedModes() []string { return []string{configuration.TAIL_MODE, configuration.CAT_MODE} } +//OneShotAcquisition reads a set of file and returns when done func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { log.Infof("Starting oneshot acquisition on %d files", len(f.files)) for _, filename := range f.files { @@ -118,13 +122,14 @@ func (f *FileSource) CanRun() error { } func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error { - f.logger.Infof("Starting live acquisition") + f.logger.Debugf("Starting live acquisition") for _, file := range f.files { - tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}) + tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}}) if err != nil { f.logger.Errorf("Could not start tailing file %s : %s", file, err) continue } + f.tails[file] = true t.Go(func() error { return f.monitorNewFiles(out, t) }) @@ -143,14 +148,42 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { if !ok { return nil } - log.Println("event:", event) if event.Op&fsnotify.Create == fsnotify.Create { - f.logger.Infof("Detected new file %s", event.Name) - tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}) + fi, err := os.Stat(event.Name) if err != nil { - f.logger.Errorf("Could not start tailing file %s : %s", event.Name, err) + f.logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err) + } + if fi.IsDir() { continue } + f.logger.Infof("Detected new file %s", event.Name) + matched := false + for _, pattern := range f.config.Filenames { + f.logger.Debugf("Matching %s with %s", pattern, event.Name) + matched, err = path.Match(pattern, event.Name) + if err != nil { + f.logger.Errorf("Could not match pattern : %s", err) + continue + } + if matched { + break + } + } + if !matched { + continue + } + if f.tails[event.Name] { + //we already have a tail on it, do not start a new one + f.logger.Debugf("Already tailing file %s, not creating a new tail", event.Name) + break + } + //Slightly different parameters for Location, as we want to read the first lines of the newly created file + tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}}) + if err != nil { + f.logger.Errorf("Could not start tailing file %s : %s", event.Name, err) + break + } + f.tails[event.Name] = true t.Go(func() error { defer types.CatchPanic("crowdsec/acquis/tailfile") return f.tailFile(out, t, tail) @@ -160,7 +193,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { if !ok { return nil } - log.Println("error:", err) + f.logger.Errorf("Error while monitoring folder: %s", err) } } } @@ -168,7 +201,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error { //lint:ignore SA1015 as it is an infinite loop timeout := time.Tick(1 * time.Second) - f.logger.Infof("-> Starting tail of %s", tail.Filename) + f.logger.Debugf("-> Starting tail of %s", tail.Filename) for { l := types.Line{} select { @@ -193,6 +226,7 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai if line.Text == "" { //skip empty lines continue } + //FIXME: prometheus metrics //ReaderHits.With(prometheus.Labels{"source": tail.Filename}).Inc() l.Raw = line.Text @@ -201,11 +235,11 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai l.Src = tail.Filename l.Process = true //we're tailing, it must be real time logs - f.logger.Infof("pushing %+v", l) + f.logger.Debugf("pushing %+v", l) out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE} case <-timeout: //time out, shall we do stuff ? - f.logger.Debugf("timeout") + f.logger.Trace("timeout") } } } @@ -213,12 +247,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error { var scanner *bufio.Scanner - clog := log.WithFields(log.Fields{ - "file": filename, - }) fd, err := os.Open(filename) if err != nil { - clog.Errorf("Failed opening file: %s", err) + f.logger.Errorf("Failed opening file: %s", err) return errors.Wrapf(err, "failed opening %s", filename) } defer fd.Close() @@ -226,7 +257,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom if strings.HasSuffix(filename, ".gz") { gz, err := gzip.NewReader(fd) if err != nil { - clog.Errorf("Failed to read gz file: %s", err) + f.logger.Errorf("Failed to read gz file: %s", err) return errors.Wrapf(err, "failed to read gz %s", filename) } defer gz.Close() @@ -237,15 +268,14 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom } scanner.Split(bufio.ScanLines) for scanner.Scan() { - log.Infof("line %s", scanner.Text()) + f.logger.Debugf("line %s", scanner.Text()) l := types.Line{} l.Raw = scanner.Text() l.Time = time.Now() l.Src = filename l.Labels = f.config.Labels l.Process = true - // FIXME: How to interact with prom metrics ? - //ReaderHits.With(prometheus.Labels{"source": filename}).Inc() + //we're reading logs at once, it must be time-machine buckets out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE} }