diff --git a/pkg/acquisition/file_reader.go b/pkg/acquisition/file_reader.go index f79caef63..a4ee92233 100644 --- a/pkg/acquisition/file_reader.go +++ b/pkg/acquisition/file_reader.go @@ -202,23 +202,26 @@ func AcquisStartReading(ctx *FileAcquisCtx, output chan types.Event, AcquisTomb } /* start one go routine reading for each file, and pushing to chan output */ for idx, fctx := range ctx.Files { - log.Printf("starting reader file %d/%d : %s", idx, len(ctx.Files), fctx.Filename) if ctx.Profiling { fctx.Profiling = true } fctx := fctx + mode := "?" switch fctx.Mode { case TAILMODE: + mode = "tail" AcquisTomb.Go(func() error { return AcquisReadOneFile(fctx, output, AcquisTomb) }) case CATMODE: + mode = "cat" AcquisTomb.Go(func() error { return ReadAtOnce(fctx, output, AcquisTomb) }) default: log.Fatalf("unknown read mode %s for %+v", fctx.Mode, fctx.Filenames) } + log.Printf("starting (%s) reader file %d/%d : %s", mode, idx, len(ctx.Files), fctx.Filename) } log.Printf("Started %d routines for polling/read", len(ctx.Files)) } @@ -228,7 +231,6 @@ func AcquisReadOneFile(ctx FileCtx, output chan types.Event, AcquisTomb *tomb.To clog := log.WithFields(log.Fields{ "acquisition file": ctx.Filename, }) - if ctx.Type != FILETYPE { log.Errorf("Can't tail %s type for %+v", ctx.Type, ctx.Filenames) return fmt.Errorf("can't tail %s type for %+v", ctx.Type, ctx.Filenames) diff --git a/pkg/acquisition/file_reader_test.go b/pkg/acquisition/file_reader_test.go index 66df87d71..1f8d7b090 100644 --- a/pkg/acquisition/file_reader_test.go +++ b/pkg/acquisition/file_reader_test.go @@ -1,10 +1,14 @@ package acquisition import ( + "fmt" + "os" "testing" + "time" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/types" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "gopkg.in/tomb.v2" ) @@ -75,8 +79,7 @@ func TestLoadAcquisitionConfig(t *testing.T) { } } -func TestAcquisStartReading(t *testing.T) { - // Test in TAIL mode +func TestAcquisStartReadingTailKilled(t *testing.T) { acquisFilePath := "./tests/acquis_test_log.yaml" csConfig := &csconfig.CrowdSec{ AcquisitionFile: acquisFilePath, @@ -94,46 +97,219 @@ func TestAcquisStartReading(t *testing.T) { t.Fatal("acquisition tomb is not alive") } - // Test in CAT mode + time.Sleep(500 * time.Millisecond) + filename := "./tests/test.log" + + f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 5; i++ { + _, err := f.WriteString(fmt.Sprintf("ratata%d\n", i)) + if err != nil { + t.Fatal(err) + } + } + f.Close() + + time.Sleep(500 * time.Millisecond) + reads := 0 +L: + for { + select { + case <-outputChan: + reads++ + if reads == 2 { + acquisTomb.Kill(nil) + time.Sleep(100 * time.Millisecond) + } + case <-time.After(1 * time.Second): + break L + } + } + + log.Printf("-> %d", reads) + if reads != 2 { + t.Fatal() + } + + f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + _, err = f.WriteString("one log line\n") + if err != nil { + t.Fatal(err) + } + f.Close() +} + +func TestAcquisStartReadingTail(t *testing.T) { + acquisFilePath := "./tests/acquis_test_log.yaml" + filename := "./tests/test.log" + csConfig := &csconfig.CrowdSec{ + AcquisitionFile: acquisFilePath, + Profiling: false, + } + fCTX, err := LoadAcquisitionConfig(csConfig) + if err != nil { + t.Fatalf(err.Error()) + } + outputChan := make(chan types.Event) + acquisTomb := tomb.Tomb{} + + AcquisStartReading(fCTX, outputChan, &acquisTomb) + if !acquisTomb.Alive() { + t.Fatal("acquisition tomb is not alive") + } + + time.Sleep(500 * time.Millisecond) + + f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 5; i++ { + _, err := f.WriteString(fmt.Sprintf("ratata%d\n", i)) + if err != nil { + t.Fatal(err) + } + } + f.Close() + + time.Sleep(500 * time.Millisecond) + reads := 0 +L: + for { + select { + case <-outputChan: + reads++ + //log.Printf("evt %+v", evt) + case <-time.After(1 * time.Second): + break L + } + } + + log.Printf("-> %d", reads) + if reads != 5 { + t.Fatal() + } + + acquisTomb.Kill(nil) + if err := acquisTomb.Wait(); err != nil { + t.Fatalf("Acquisition returned error : %s", err) + } + + f, err = os.OpenFile(filename, os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + _, err = f.WriteString("one log line\n") + if err != nil { + t.Fatal(err) + } + f.Close() +} + +func TestAcquisStartReadingCat(t *testing.T) { testFilePath := "./tests/test.log" - csConfig = &csconfig.CrowdSec{ + f, err := os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 5; i++ { + _, err := f.WriteString(fmt.Sprintf("ratata%d\n", i)) + if err != nil { + t.Fatal(err) + } + } + f.Close() + + csConfig := &csconfig.CrowdSec{ SingleFile: testFilePath, SingleFileLabel: "my_test_log", Profiling: false, } - - fCTX, err = LoadAcquisitionConfig(csConfig) + fCTX, err := LoadAcquisitionConfig(csConfig) if err != nil { t.Fatalf(err.Error()) } - outputChan = make(chan types.Event) - acquisTomb = tomb.Tomb{} + outputChan := make(chan types.Event) + acquisTomb := tomb.Tomb{} AcquisStartReading(fCTX, outputChan, &acquisTomb) if !acquisTomb.Alive() { t.Fatal("acquisition tomb is not alive") } - // Test with a .gz file - testFilePath = "./tests/test.log.gz" - - csConfig = &csconfig.CrowdSec{ - SingleFile: testFilePath, - SingleFileLabel: "my_test_log", - Profiling: false, + time.Sleep(500 * time.Millisecond) + reads := 0 +L: + for { + select { + case <-outputChan: + reads++ + case <-time.After(1 * time.Second): + break L + } } - fCTX, err = LoadAcquisitionConfig(csConfig) + log.Printf("-> %d", reads) + if reads != 5 { + t.Fatal() + } + + acquisTomb.Kill(nil) + if err := acquisTomb.Wait(); err != nil { + t.Fatalf("Acquisition returned error : %s", err) + } + + f, err = os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err) } - outputChan = make(chan types.Event) - acquisTomb = tomb.Tomb{} - - AcquisStartReading(fCTX, outputChan, &acquisTomb) - if !acquisTomb.Alive() { - t.Fatal("acquisition tomb is not alive") + _, err = f.WriteString("one log line\n") + if err != nil { + t.Fatal(err) + } + f.Close() +} + +func TestAcquisStartReadingGzCat(t *testing.T) { + testFilePath := "./tests/test.log.gz" + + csConfig := &csconfig.CrowdSec{ + SingleFile: testFilePath, + SingleFileLabel: "my_test_log", + Profiling: false, + } + fCTX, err := LoadAcquisitionConfig(csConfig) + if err != nil { + t.Fatalf(err.Error()) + } + outputChan := make(chan types.Event) + acquisTomb := tomb.Tomb{} + + AcquisStartReading(fCTX, outputChan, &acquisTomb) + if !acquisTomb.Alive() { + t.Fatal("acquisition tomb is not alive") + } + + time.Sleep(500 * time.Millisecond) + reads := 0 +L: + for { + select { + case <-outputChan: + reads++ + case <-time.After(1 * time.Second): + break L + } + } + + log.Printf("-> %d", reads) + if reads != 1 { + t.Fatal() } - } diff --git a/pkg/acquisition/tests/acquis_test_log.yaml b/pkg/acquisition/tests/acquis_test_log.yaml index aa09d148d..b5abe712f 100644 --- a/pkg/acquisition/tests/acquis_test_log.yaml +++ b/pkg/acquisition/tests/acquis_test_log.yaml @@ -3,5 +3,3 @@ filenames: mode: tail labels: type: my_test_log ---- - diff --git a/pkg/acquisition/tests/test.log b/pkg/acquisition/tests/test.log index c1e87ac5b..e69de29bb 100644 --- a/pkg/acquisition/tests/test.log +++ b/pkg/acquisition/tests/test.log @@ -1 +0,0 @@ -my test log \ No newline at end of file