|
@@ -108,6 +108,30 @@ func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (f *FileSource) ConfigureByDSN(dsn string, logger *log.Entry) error {
|
|
|
|
+ if !strings.HasPrefix(dsn, "file://") {
|
|
|
|
+ return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
|
|
|
|
+ }
|
|
|
|
+ pattern := strings.TrimPrefix(dsn, "file://")
|
|
|
|
+ if len(pattern) == 0 {
|
|
|
|
+ return fmt.Errorf("empty file:// DSN")
|
|
|
|
+ }
|
|
|
|
+ f.logger = logger
|
|
|
|
+ files, err := filepath.Glob(pattern)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return errors.Wrap(err, "Glob failure")
|
|
|
|
+ }
|
|
|
|
+ if len(files) == 0 {
|
|
|
|
+ return fmt.Errorf("no matching files for pattern %s", pattern)
|
|
|
|
+ }
|
|
|
|
+ f.logger.Infof("Will read %d files", len(files))
|
|
|
|
+ for _, file := range files {
|
|
|
|
+ f.logger.Infof("Adding file %s to filelist", file)
|
|
|
|
+ f.files = append(f.files, file)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
func (f *FileSource) GetMode() string {
|
|
func (f *FileSource) GetMode() string {
|
|
return f.config.Mode
|
|
return f.config.Mode
|
|
}
|
|
}
|
|
@@ -119,21 +143,21 @@ func (f *FileSource) SupportedModes() []string {
|
|
|
|
|
|
//OneShotAcquisition reads a set of file and returns when done
|
|
//OneShotAcquisition reads a set of file and returns when done
|
|
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
|
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
|
- f.logger.Infof("Starting oneshot acquisition on %d files", len(f.files))
|
|
|
|
- for _, filename := range f.files {
|
|
|
|
- fi, err := os.Stat(filename)
|
|
|
|
|
|
+ for _, file := range f.files {
|
|
|
|
+ fi, err := os.Stat(file)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return fmt.Errorf("could not stat file %s : %w", filename, err)
|
|
|
|
|
|
+ return fmt.Errorf("could not stat file %s : %w", file, err)
|
|
}
|
|
}
|
|
if fi.IsDir() {
|
|
if fi.IsDir() {
|
|
- f.logger.Warnf("%s is a directory, ignoring it.", filename)
|
|
|
|
|
|
+ f.logger.Warnf("%s is a directory, ignoring it.", file)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- f.logger.Infof("reading %s at once", filename)
|
|
|
|
- err = f.readFile(filename, out, t)
|
|
|
|
|
|
+ f.logger.Infof("reading %s at once", file)
|
|
|
|
+ err = f.readFile(file, out, t)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|