wip
This commit is contained in:
parent
b3eea2048e
commit
f08784339b
1 changed files with 14 additions and 2 deletions
|
@ -113,23 +113,34 @@ func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *FileSource) ConfigureByDSN(dsn string, logger *log.Entry) error {
|
||||
func (f *FileSource) ConfigureByDSN(dsn string, labelType string, logger *log.Entry) error {
|
||||
if !strings.HasPrefix(dsn, "file://") {
|
||||
return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
|
||||
}
|
||||
|
||||
pattern := strings.TrimPrefix(dsn, "file://")
|
||||
|
||||
if len(pattern) == 0 {
|
||||
return fmt.Errorf("empty file:// DSN")
|
||||
}
|
||||
|
||||
f.logger = logger
|
||||
f.config = FileConfiguration{}
|
||||
f.config.Labels = map[string]string{"type": labelType}
|
||||
f.config.Mode = configuration.CAT_MODE
|
||||
|
||||
f.logger.Debugf("Will try pattern %s", pattern)
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Glob failure")
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
return fmt.Errorf("no matching files for pattern %s", pattern)
|
||||
}
|
||||
|
||||
f.logger.Infof("Will read %d files", len(files))
|
||||
|
||||
for _, file := range files {
|
||||
f.logger.Infof("Adding file %s to filelist", file)
|
||||
f.files = append(f.files, file)
|
||||
|
@ -148,6 +159,7 @@ func (f *FileSource) SupportedModes() []string {
|
|||
|
||||
//OneShotAcquisition reads a set of file and returns when done
|
||||
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||
f.logger.Debug("In oneshot")
|
||||
for _, file := range f.files {
|
||||
fi, err := os.Stat(file)
|
||||
if err != nil {
|
||||
|
@ -176,7 +188,7 @@ func (f *FileSource) CanRun() error {
|
|||
}
|
||||
|
||||
func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||
f.logger.Debugf("Starting live acquisition")
|
||||
f.logger.Debug("Starting live acquisition")
|
||||
t.Go(func() error {
|
||||
return f.monitorNewFiles(out, t)
|
||||
})
|
||||
|
|
Loading…
Add table
Reference in a new issue