file acquisition: don't bubble error when tailed file disappears (#2903)

* file acquisition: don't bubble error when tailed file disappears
* don't call t.Kill()
* lint (whitespace)
This commit is contained in:
mmetc 2024-03-18 11:25:45 +01:00 committed by GitHub
parent fd2bb8927c
commit 2f49088163
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -3,6 +3,7 @@ package fileacquisition
import ( import (
"bufio" "bufio"
"compress/gzip" "compress/gzip"
"errors"
"fmt" "fmt"
"io" "io"
"net/url" "net/url"
@ -16,7 +17,6 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/nxadm/tail" "github.com/nxadm/tail"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2" "gopkg.in/tomb.v2"
@ -63,6 +63,7 @@ func (f *FileSource) GetUuid() string {
func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
f.config = FileConfiguration{} f.config = FileConfiguration{}
err := yaml.UnmarshalStrict(yamlConfig, &f.config) err := yaml.UnmarshalStrict(yamlConfig, &f.config)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err) return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err)
@ -77,7 +78,7 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
} }
if len(f.config.Filenames) == 0 { if len(f.config.Filenames) == 0 {
return fmt.Errorf("no filename or filenames configuration provided") return errors.New("no filename or filenames configuration provided")
} }
if f.config.Mode == "" { if f.config.Mode == "" {
@ -93,6 +94,7 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
if err != nil { if err != nil {
return fmt.Errorf("could not compile regexp %s: %w", exclude, err) return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
} }
f.exclude_regexps = append(f.exclude_regexps, re) f.exclude_regexps = append(f.exclude_regexps, re)
} }
@ -123,56 +125,68 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLeve
if f.config.ForceInotify { if f.config.ForceInotify {
directory := filepath.Dir(pattern) directory := filepath.Dir(pattern)
f.logger.Infof("Force add watch on %s", directory) f.logger.Infof("Force add watch on %s", directory)
if !f.watchedDirectories[directory] { if !f.watchedDirectories[directory] {
err = f.watcher.Add(directory) err = f.watcher.Add(directory)
if err != nil { if err != nil {
f.logger.Errorf("Could not create watch on directory %s : %s", directory, err) f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
continue continue
} }
f.watchedDirectories[directory] = true f.watchedDirectories[directory] = true
} }
} }
files, err := filepath.Glob(pattern) files, err := filepath.Glob(pattern)
if err != nil { if err != nil {
return fmt.Errorf("glob failure: %w", err) return fmt.Errorf("glob failure: %w", err)
} }
if len(files) == 0 { if len(files) == 0 {
f.logger.Warnf("No matching files for pattern %s", pattern) f.logger.Warnf("No matching files for pattern %s", pattern)
continue continue
} }
for _, file := range files {
for _, file := range files {
// check if file is excluded // check if file is excluded
excluded := false excluded := false
for _, pattern := range f.exclude_regexps { for _, pattern := range f.exclude_regexps {
if pattern.MatchString(file) { if pattern.MatchString(file) {
excluded = true excluded = true
f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern) f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
break break
} }
} }
if excluded { if excluded {
continue continue
} }
if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { // we have a glob pattern if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { // we have a glob pattern
directory := filepath.Dir(file) directory := filepath.Dir(file)
f.logger.Debugf("Will add watch to directory: %s", directory) f.logger.Debugf("Will add watch to directory: %s", directory)
if !f.watchedDirectories[directory] {
if !f.watchedDirectories[directory] {
err = f.watcher.Add(directory) err = f.watcher.Add(directory)
if err != nil { if err != nil {
f.logger.Errorf("Could not create watch on directory %s : %s", directory, err) f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
continue continue
} }
f.watchedDirectories[directory] = true f.watchedDirectories[directory] = true
} else { } else {
f.logger.Debugf("Watch for directory %s already exists", directory) f.logger.Debugf("Watch for directory %s already exists", directory)
} }
} }
f.logger.Infof("Adding file %s to datasources", file) f.logger.Infof("Adding file %s to datasources", file)
f.files = append(f.files, file) f.files = append(f.files, file)
} }
} }
return nil return nil
} }
@ -189,7 +203,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger
args := strings.Split(dsn, "?") args := strings.Split(dsn, "?")
if len(args[0]) == 0 { if len(args[0]) == 0 {
return fmt.Errorf("empty file:// DSN") return errors.New("empty file:// DSN")
} }
if len(args) == 2 && len(args[1]) != 0 { if len(args) == 2 && len(args[1]) != 0 {
@ -197,25 +211,30 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger
if err != nil { if err != nil {
return fmt.Errorf("could not parse file args: %w", err) return fmt.Errorf("could not parse file args: %w", err)
} }
for key, value := range params { for key, value := range params {
switch key { switch key {
case "log_level": case "log_level":
if len(value) != 1 { if len(value) != 1 {
return errors.New("expected zero or one value for 'log_level'") return errors.New("expected zero or one value for 'log_level'")
} }
lvl, err := log.ParseLevel(value[0]) lvl, err := log.ParseLevel(value[0])
if err != nil { if err != nil {
return fmt.Errorf("unknown level %s: %w", value[0], err) return fmt.Errorf("unknown level %s: %w", value[0], err)
} }
f.logger.Logger.SetLevel(lvl) f.logger.Logger.SetLevel(lvl)
case "max_buffer_size": case "max_buffer_size":
if len(value) != 1 { if len(value) != 1 {
return errors.New("expected zero or one value for 'max_buffer_size'") return errors.New("expected zero or one value for 'max_buffer_size'")
} }
maxBufferSize, err := strconv.Atoi(value[0]) maxBufferSize, err := strconv.Atoi(value[0])
if err != nil { if err != nil {
return fmt.Errorf("could not parse max_buffer_size %s: %w", value[0], err) return fmt.Errorf("could not parse max_buffer_size %s: %w", value[0], err)
} }
f.config.MaxBufferSize = maxBufferSize f.config.MaxBufferSize = maxBufferSize
default: default:
return fmt.Errorf("unknown parameter %s", key) return fmt.Errorf("unknown parameter %s", key)
@ -228,6 +247,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger
f.config.UniqueId = uuid f.config.UniqueId = uuid
f.logger.Debugf("Will try pattern %s", args[0]) f.logger.Debugf("Will try pattern %s", args[0])
files, err := filepath.Glob(args[0]) files, err := filepath.Glob(args[0])
if err != nil { if err != nil {
return fmt.Errorf("glob failure: %w", err) return fmt.Errorf("glob failure: %w", err)
@ -245,6 +265,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger
f.logger.Infof("Adding file %s to filelist", file) f.logger.Infof("Adding file %s to filelist", file)
f.files = append(f.files, file) f.files = append(f.files, file)
} }
return nil return nil
} }
@ -260,22 +281,26 @@ func (f *FileSource) SupportedModes() []string {
// OneShotAcquisition reads a set of file and returns when done // OneShotAcquisition reads a set of file and returns when done
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("In oneshot") f.logger.Debug("In oneshot")
for _, file := range f.files { for _, file := range f.files {
fi, err := os.Stat(file) fi, err := os.Stat(file)
if err != nil { if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err) return fmt.Errorf("could not stat file %s : %w", file, err)
} }
if fi.IsDir() { if fi.IsDir() {
f.logger.Warnf("%s is a directory, ignoring it.", file) f.logger.Warnf("%s is a directory, ignoring it.", file)
continue continue
} }
f.logger.Infof("reading %s at once", file) f.logger.Infof("reading %s at once", file)
err = f.readFile(file, out, t) err = f.readFile(file, out, t)
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
} }
@ -300,16 +325,21 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
t.Go(func() error { t.Go(func() error {
return f.monitorNewFiles(out, t) return f.monitorNewFiles(out, t)
}) })
for _, file := range f.files { for _, file := range f.files {
// before opening the file, check if we need to specifically avoid it. (XXX) // before opening the file, check if we need to specifically avoid it. (XXX)
skip := false skip := false
for _, pattern := range f.exclude_regexps { for _, pattern := range f.exclude_regexps {
if pattern.MatchString(file) { if pattern.MatchString(file) {
f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String()) f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
skip = true skip = true
break break
} }
} }
if skip { if skip {
continue continue
} }
@ -321,6 +351,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
f.logger.Errorf("unable to read %s : %s", file, err) f.logger.Errorf("unable to read %s : %s", file, err)
continue continue
} }
if err := fd.Close(); err != nil { if err := fd.Close(); err != nil {
f.logger.Errorf("unable to close %s : %s", file, err) f.logger.Errorf("unable to close %s : %s", file, err)
continue continue
@ -330,6 +361,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
if err != nil { if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err) return fmt.Errorf("could not stat file %s : %w", file, err)
} }
if fi.IsDir() { if fi.IsDir() {
f.logger.Warnf("%s is a directory, ignoring it.", file) f.logger.Warnf("%s is a directory, ignoring it.", file)
continue continue
@ -343,9 +375,12 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
if err != nil { if err != nil {
f.logger.Warningf("Could not get fs type for %s : %s", file, err) f.logger.Warningf("Could not get fs type for %s : %s", file, err)
} }
f.logger.Debugf("fs for %s is network: %t (%s)", file, networkFS, fsType) f.logger.Debugf("fs for %s is network: %t (%s)", file, networkFS, fsType)
if networkFS { if networkFS {
f.logger.Warnf("Disabling inotify poll on %s as it is on a network share. You can manually set poll_without_inotify to true to make this message disappear, or to false to enforce inotify poll", file) f.logger.Warnf("Disabling inotify poll on %s as it is on a network share. You can manually set poll_without_inotify to true to make this message disappear, or to false to enforce inotify poll", file)
inotifyPoll = false inotifyPoll = false
} }
} }
@ -355,6 +390,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
f.logger.Errorf("Could not start tailing file %s : %s", file, err) f.logger.Errorf("Could not start tailing file %s : %s", file, err)
continue continue
} }
f.tailMapMutex.Lock() f.tailMapMutex.Lock()
f.tails[file] = true f.tails[file] = true
f.tailMapMutex.Unlock() f.tailMapMutex.Unlock()
@ -363,6 +399,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
return f.tailFile(out, t, tail) return f.tailFile(out, t, tail)
}) })
} }
return nil return nil
} }
@ -372,6 +409,7 @@ func (f *FileSource) Dump() interface{} {
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger := f.logger.WithField("goroutine", "inotify") logger := f.logger.WithField("goroutine", "inotify")
for { for {
select { select {
case event, ok := <-f.watcher.Events: case event, ok := <-f.watcher.Events:
@ -385,36 +423,47 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err) logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
continue continue
} }
if fi.IsDir() { if fi.IsDir() {
continue continue
} }
logger.Debugf("Detected new file %s", event.Name) logger.Debugf("Detected new file %s", event.Name)
matched := false matched := false
for _, pattern := range f.config.Filenames { for _, pattern := range f.config.Filenames {
logger.Debugf("Matching %s with %s", pattern, event.Name) logger.Debugf("Matching %s with %s", pattern, event.Name)
matched, err = filepath.Match(pattern, event.Name) matched, err = filepath.Match(pattern, event.Name)
if err != nil { if err != nil {
logger.Errorf("Could not match pattern : %s", err) logger.Errorf("Could not match pattern : %s", err)
continue continue
} }
if matched { if matched {
logger.Debugf("Matched %s with %s", pattern, event.Name) logger.Debugf("Matched %s with %s", pattern, event.Name)
break break
} }
} }
if !matched { if !matched {
continue continue
} }
// before opening the file, check if we need to specifically avoid it. (XXX) // before opening the file, check if we need to specifically avoid it. (XXX)
skip := false skip := false
for _, pattern := range f.exclude_regexps { for _, pattern := range f.exclude_regexps {
if pattern.MatchString(event.Name) { if pattern.MatchString(event.Name) {
f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String()) f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
skip = true skip = true
break break
} }
} }
if skip { if skip {
continue continue
} }
@ -424,6 +473,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
f.tailMapMutex.RUnlock() f.tailMapMutex.RUnlock()
// we already have a tail on it, do not start a new one // we already have a tail on it, do not start a new one
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name) logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
break break
} }
f.tailMapMutex.RUnlock() f.tailMapMutex.RUnlock()
@ -450,7 +500,9 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
if err != nil { if err != nil {
f.logger.Warningf("Could not get fs type for %s : %s", event.Name, err) f.logger.Warningf("Could not get fs type for %s : %s", event.Name, err)
} }
f.logger.Debugf("fs for %s is network: %t (%s)", event.Name, networkFS, fsType) f.logger.Debugf("fs for %s is network: %t (%s)", event.Name, networkFS, fsType)
if networkFS { if networkFS {
inotifyPoll = false inotifyPoll = false
} }
@ -463,6 +515,7 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger.Errorf("Could not start tailing file %s : %s", event.Name, err) logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
break break
} }
f.tailMapMutex.Lock() f.tailMapMutex.Lock()
f.tails[event.Name] = true f.tails[event.Name] = true
f.tailMapMutex.Unlock() f.tailMapMutex.Unlock()
@ -475,12 +528,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
if !ok { if !ok {
return nil return nil
} }
logger.Errorf("Error while monitoring folder: %s", err) logger.Errorf("Error while monitoring folder: %s", err)
case <-t.Dying(): case <-t.Dying():
err := f.watcher.Close() err := f.watcher.Close()
if err != nil { if err != nil {
return fmt.Errorf("could not remove all inotify watches: %w", err) return fmt.Errorf("could not remove all inotify watches: %w", err)
} }
return nil return nil
} }
} }
@ -489,14 +544,17 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error { func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
logger := f.logger.WithField("tail", tail.Filename) logger := f.logger.WithField("tail", tail.Filename)
logger.Debugf("-> Starting tail of %s", tail.Filename) logger.Debugf("-> Starting tail of %s", tail.Filename)
for { for {
select { select {
case <-t.Dying(): case <-t.Dying():
logger.Infof("File datasource %s stopping", tail.Filename) logger.Infof("File datasource %s stopping", tail.Filename)
if err := tail.Stop(); err != nil { if err := tail.Stop(); err != nil {
f.logger.Errorf("error in stop : %s", err) f.logger.Errorf("error in stop : %s", err)
return err return err
} }
return nil return nil
case <-tail.Dying(): // our tailer is dying case <-tail.Dying(): // our tailer is dying
err := tail.Err() err := tail.Err()
@ -504,24 +562,29 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
if err != nil { if err != nil {
errMsg = fmt.Sprintf(errMsg+" : %s", err) errMsg = fmt.Sprintf(errMsg+" : %s", err)
} }
logger.Warningf(errMsg) logger.Warningf(errMsg)
t.Kill(fmt.Errorf(errMsg))
return fmt.Errorf(errMsg) return nil
case line := <-tail.Lines: case line := <-tail.Lines:
if line == nil { if line == nil {
logger.Warningf("tail for %s is empty", tail.Filename) logger.Warningf("tail for %s is empty", tail.Filename)
continue continue
} }
if line.Err != nil { if line.Err != nil {
logger.Warningf("fetch error : %v", line.Err) logger.Warningf("fetch error : %v", line.Err)
return line.Err return line.Err
} }
if line.Text == "" { // skip empty lines if line.Text == "" { // skip empty lines
continue continue
} }
if f.metricsLevel != configuration.METRICS_NONE { if f.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc() linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
} }
src := tail.Filename src := tail.Filename
if f.metricsLevel == configuration.METRICS_AGGREGATE { if f.metricsLevel == configuration.METRICS_AGGREGATE {
src = filepath.Base(tail.Filename) src = filepath.Base(tail.Filename)
@ -549,12 +612,14 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
var scanner *bufio.Scanner var scanner *bufio.Scanner
logger := f.logger.WithField("oneshot", filename) logger := f.logger.WithField("oneshot", filename)
fd, err := os.Open(filename) fd, err := os.Open(filename)
if err != nil { if err != nil {
return fmt.Errorf("failed opening %s: %w", filename, err) return fmt.Errorf("failed opening %s: %w", filename, err)
} }
defer fd.Close() defer fd.Close()
if strings.HasSuffix(filename, ".gz") { if strings.HasSuffix(filename, ".gz") {
@ -563,17 +628,20 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
logger.Errorf("Failed to read gz file: %s", err) logger.Errorf("Failed to read gz file: %s", err)
return fmt.Errorf("failed to read gz %s: %w", filename, err) return fmt.Errorf("failed to read gz %s: %w", filename, err)
} }
defer gz.Close() defer gz.Close()
scanner = bufio.NewScanner(gz) scanner = bufio.NewScanner(gz)
} else { } else {
scanner = bufio.NewScanner(fd) scanner = bufio.NewScanner(fd)
} }
scanner.Split(bufio.ScanLines) scanner.Split(bufio.ScanLines)
if f.config.MaxBufferSize > 0 { if f.config.MaxBufferSize > 0 {
buf := make([]byte, 0, 64*1024) buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, f.config.MaxBufferSize) scanner.Buffer(buf, f.config.MaxBufferSize)
} }
for scanner.Scan() { for scanner.Scan() {
select { select {
case <-t.Dying(): case <-t.Dying():
@ -583,6 +651,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
if scanner.Text() == "" { if scanner.Text() == "" {
continue continue
} }
l := types.Line{ l := types.Line{
Raw: scanner.Text(), Raw: scanner.Text(),
Time: time.Now().UTC(), Time: time.Now().UTC(),
@ -598,11 +667,15 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
} }
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
logger.Errorf("Error while reading file: %s", err) logger.Errorf("Error while reading file: %s", err)
t.Kill(err) t.Kill(err)
return err return err
} }
t.Kill(nil) t.Kill(nil)
return nil return nil
} }