wip
This commit is contained in:
parent
e72bb274ad
commit
a778f1b6fe
1 changed files with 73 additions and 43 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -31,6 +32,8 @@ type FileConfiguration struct {
|
||||||
type FileSource struct {
|
type FileSource struct {
|
||||||
config FileConfiguration
|
config FileConfiguration
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
|
watchedDirectories map[string]bool
|
||||||
|
tails map[string]bool
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
files []string
|
files []string
|
||||||
}
|
}
|
||||||
|
@ -38,51 +41,50 @@ type FileSource struct {
|
||||||
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
|
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
|
||||||
fileConfig := FileConfiguration{}
|
fileConfig := FileConfiguration{}
|
||||||
f.logger = logger
|
f.logger = logger
|
||||||
|
f.watchedDirectories = make(map[string]bool)
|
||||||
|
f.tails = make(map[string]bool)
|
||||||
err := yaml.Unmarshal(Config, &fileConfig)
|
err := yaml.Unmarshal(Config, &fileConfig)
|
||||||
f.logger.Infof("%+v", fileConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Could not parse configuration for File datasource : %s", err)
|
return errors.Wrap(err, "[fileacquisition] Cannot parse FileAcquisition configuration")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
|
||||||
if len(fileConfig.Filename) != 0 {
|
if len(fileConfig.Filename) != 0 {
|
||||||
fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
|
fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
|
||||||
}
|
}
|
||||||
if len(fileConfig.Filenames) == 0 {
|
if len(fileConfig.Filenames) == 0 {
|
||||||
f.logger.Errorf("No filename or filenames configuration provided")
|
return fmt.Errorf("[fileacquisition] no filename or filenames configuration provided")
|
||||||
return errors.New("No filename or filenames configuration provided")
|
|
||||||
}
|
}
|
||||||
f.config = fileConfig
|
f.config = fileConfig
|
||||||
f.config.Mode = configuration.TAIL_MODE
|
f.config.Mode = configuration.TAIL_MODE // FIXME
|
||||||
f.watcher, err = fsnotify.NewWatcher()
|
f.watcher, err = fsnotify.NewWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Could not create fsnotify watcher : %s", err)
|
return errors.Wrapf(err, "[fileacquisition] Could not create fsnotify watcher")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
f.logger.Infof("%+v", f.config)
|
f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
|
||||||
for _, pattern := range f.config.Filenames {
|
for _, pattern := range f.config.Filenames {
|
||||||
files, err := filepath.Glob(pattern)
|
files, err := filepath.Glob(pattern)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Glob failure: %s", err)
|
return errors.Wrap(err, "[fileacquisition] Glob failure")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if len(files) == 0 {
|
if len(files) == 0 {
|
||||||
f.logger.Infof("No matching files for pattern %s", pattern)
|
f.logger.Infof("No matching files for pattern %s", pattern)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
f.logger.Infof("Will read %d files", len(files))
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
f.logger.Infof("In config for file %s", file)
|
|
||||||
f.logger.Infof("Files: %+v", files)
|
|
||||||
f.logger.Infof("Mode: %s", f.config.Mode)
|
|
||||||
if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
|
if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
|
||||||
//TODO: add only one watch per directory
|
directory := path.Dir(file)
|
||||||
f.logger.Infof("Adding watch on %s", path.Dir(file))
|
if !f.watchedDirectories[directory] {
|
||||||
err = f.watcher.Add(path.Dir(file))
|
f.logger.Infof("Adding watch on %s", directory)
|
||||||
|
err = f.watcher.Add(directory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Could not create watch on directory %s : %s", path.Dir(file), err)
|
f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
|
||||||
return err
|
continue
|
||||||
}
|
}
|
||||||
f.logger.Infof("Adding file %s", file)
|
f.watchedDirectories[directory] = true
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
f.logger.Infof("Adding file %s to filelist", file)
|
||||||
f.files = append(f.files, file)
|
f.files = append(f.files, file)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,10 +95,12 @@ func (f *FileSource) GetMode() string {
|
||||||
return f.config.Mode
|
return f.config.Mode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//SupportedModes returns the supported modes by the acquisition module
|
||||||
func (f *FileSource) SupportedModes() []string {
|
func (f *FileSource) SupportedModes() []string {
|
||||||
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
|
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//OneShotAcquisition reads a set of file and returns when done
|
||||||
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||||
log.Infof("Starting oneshot acquisition on %d files", len(f.files))
|
log.Infof("Starting oneshot acquisition on %d files", len(f.files))
|
||||||
for _, filename := range f.files {
|
for _, filename := range f.files {
|
||||||
|
@ -118,13 +122,14 @@ func (f *FileSource) CanRun() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
func (f *FileSource) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||||
f.logger.Infof("Starting live acquisition")
|
f.logger.Debugf("Starting live acquisition")
|
||||||
for _, file := range f.files {
|
for _, file := range f.files {
|
||||||
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}})
|
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
|
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
f.tails[file] = true
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
return f.monitorNewFiles(out, t)
|
return f.monitorNewFiles(out, t)
|
||||||
})
|
})
|
||||||
|
@ -143,14 +148,42 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Println("event:", event)
|
|
||||||
if event.Op&fsnotify.Create == fsnotify.Create {
|
if event.Op&fsnotify.Create == fsnotify.Create {
|
||||||
f.logger.Infof("Detected new file %s", event.Name)
|
fi, err := os.Stat(event.Name)
|
||||||
tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
|
f.logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
|
||||||
|
}
|
||||||
|
if fi.IsDir() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
f.logger.Infof("Detected new file %s", event.Name)
|
||||||
|
matched := false
|
||||||
|
for _, pattern := range f.config.Filenames {
|
||||||
|
f.logger.Debugf("Matching %s with %s", pattern, event.Name)
|
||||||
|
matched, err = path.Match(pattern, event.Name)
|
||||||
|
if err != nil {
|
||||||
|
f.logger.Errorf("Could not match pattern : %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if matched {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !matched {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if f.tails[event.Name] {
|
||||||
|
//we already have a tail on it, do not start a new one
|
||||||
|
f.logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
//Slightly different parameters for Location, as we want to read the first lines of the newly created file
|
||||||
|
tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
|
||||||
|
if err != nil {
|
||||||
|
f.logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
f.tails[event.Name] = true
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
defer types.CatchPanic("crowdsec/acquis/tailfile")
|
defer types.CatchPanic("crowdsec/acquis/tailfile")
|
||||||
return f.tailFile(out, t, tail)
|
return f.tailFile(out, t, tail)
|
||||||
|
@ -160,7 +193,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Println("error:", err)
|
f.logger.Errorf("Error while monitoring folder: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,7 +201,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
|
||||||
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
|
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
|
||||||
//lint:ignore SA1015 as it is an infinite loop
|
//lint:ignore SA1015 as it is an infinite loop
|
||||||
timeout := time.Tick(1 * time.Second)
|
timeout := time.Tick(1 * time.Second)
|
||||||
f.logger.Infof("-> Starting tail of %s", tail.Filename)
|
f.logger.Debugf("-> Starting tail of %s", tail.Filename)
|
||||||
for {
|
for {
|
||||||
l := types.Line{}
|
l := types.Line{}
|
||||||
select {
|
select {
|
||||||
|
@ -193,6 +226,7 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
|
||||||
if line.Text == "" { //skip empty lines
|
if line.Text == "" { //skip empty lines
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
//FIXME: prometheus metrics
|
||||||
//ReaderHits.With(prometheus.Labels{"source": tail.Filename}).Inc()
|
//ReaderHits.With(prometheus.Labels{"source": tail.Filename}).Inc()
|
||||||
|
|
||||||
l.Raw = line.Text
|
l.Raw = line.Text
|
||||||
|
@ -201,11 +235,11 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
|
||||||
l.Src = tail.Filename
|
l.Src = tail.Filename
|
||||||
l.Process = true
|
l.Process = true
|
||||||
//we're tailing, it must be real time logs
|
//we're tailing, it must be real time logs
|
||||||
f.logger.Infof("pushing %+v", l)
|
f.logger.Debugf("pushing %+v", l)
|
||||||
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
|
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
|
||||||
case <-timeout:
|
case <-timeout:
|
||||||
//time out, shall we do stuff ?
|
//time out, shall we do stuff ?
|
||||||
f.logger.Debugf("timeout")
|
f.logger.Trace("timeout")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -213,12 +247,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
|
||||||
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
|
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
|
||||||
var scanner *bufio.Scanner
|
var scanner *bufio.Scanner
|
||||||
|
|
||||||
clog := log.WithFields(log.Fields{
|
|
||||||
"file": filename,
|
|
||||||
})
|
|
||||||
fd, err := os.Open(filename)
|
fd, err := os.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Errorf("Failed opening file: %s", err)
|
f.logger.Errorf("Failed opening file: %s", err)
|
||||||
return errors.Wrapf(err, "failed opening %s", filename)
|
return errors.Wrapf(err, "failed opening %s", filename)
|
||||||
}
|
}
|
||||||
defer fd.Close()
|
defer fd.Close()
|
||||||
|
@ -226,7 +257,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
|
||||||
if strings.HasSuffix(filename, ".gz") {
|
if strings.HasSuffix(filename, ".gz") {
|
||||||
gz, err := gzip.NewReader(fd)
|
gz, err := gzip.NewReader(fd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Errorf("Failed to read gz file: %s", err)
|
f.logger.Errorf("Failed to read gz file: %s", err)
|
||||||
return errors.Wrapf(err, "failed to read gz %s", filename)
|
return errors.Wrapf(err, "failed to read gz %s", filename)
|
||||||
}
|
}
|
||||||
defer gz.Close()
|
defer gz.Close()
|
||||||
|
@ -237,15 +268,14 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
|
||||||
}
|
}
|
||||||
scanner.Split(bufio.ScanLines)
|
scanner.Split(bufio.ScanLines)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
log.Infof("line %s", scanner.Text())
|
f.logger.Debugf("line %s", scanner.Text())
|
||||||
l := types.Line{}
|
l := types.Line{}
|
||||||
l.Raw = scanner.Text()
|
l.Raw = scanner.Text()
|
||||||
l.Time = time.Now()
|
l.Time = time.Now()
|
||||||
l.Src = filename
|
l.Src = filename
|
||||||
l.Labels = f.config.Labels
|
l.Labels = f.config.Labels
|
||||||
l.Process = true
|
l.Process = true
|
||||||
// FIXME: How to interact with prom metrics ?
|
|
||||||
//ReaderHits.With(prometheus.Labels{"source": filename}).Inc()
|
|
||||||
//we're reading logs at once, it must be time-machine buckets
|
//we're reading logs at once, it must be time-machine buckets
|
||||||
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
|
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue