parent
1398a74c6d
commit
742435f178
4 changed files with 204 additions and 29 deletions
|
@ -202,23 +202,26 @@ func AcquisStartReading(ctx *FileAcquisCtx, output chan types.Event, AcquisTomb
|
|||
}
|
||||
/* start one go routine reading for each file, and pushing to chan output */
|
||||
for idx, fctx := range ctx.Files {
|
||||
log.Printf("starting reader file %d/%d : %s", idx, len(ctx.Files), fctx.Filename)
|
||||
if ctx.Profiling {
|
||||
fctx.Profiling = true
|
||||
}
|
||||
fctx := fctx
|
||||
mode := "?"
|
||||
switch fctx.Mode {
|
||||
case TAILMODE:
|
||||
mode = "tail"
|
||||
AcquisTomb.Go(func() error {
|
||||
return AcquisReadOneFile(fctx, output, AcquisTomb)
|
||||
})
|
||||
case CATMODE:
|
||||
mode = "cat"
|
||||
AcquisTomb.Go(func() error {
|
||||
return ReadAtOnce(fctx, output, AcquisTomb)
|
||||
})
|
||||
default:
|
||||
log.Fatalf("unknown read mode %s for %+v", fctx.Mode, fctx.Filenames)
|
||||
}
|
||||
log.Printf("starting (%s) reader file %d/%d : %s", mode, idx, len(ctx.Files), fctx.Filename)
|
||||
}
|
||||
log.Printf("Started %d routines for polling/read", len(ctx.Files))
|
||||
}
|
||||
|
@ -228,7 +231,6 @@ func AcquisReadOneFile(ctx FileCtx, output chan types.Event, AcquisTomb *tomb.To
|
|||
clog := log.WithFields(log.Fields{
|
||||
"acquisition file": ctx.Filename,
|
||||
})
|
||||
|
||||
if ctx.Type != FILETYPE {
|
||||
log.Errorf("Can't tail %s type for %+v", ctx.Type, ctx.Filenames)
|
||||
return fmt.Errorf("can't tail %s type for %+v", ctx.Type, ctx.Filenames)
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package acquisition
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"gopkg.in/tomb.v2"
|
||||
)
|
||||
|
@ -75,8 +79,7 @@ func TestLoadAcquisitionConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAcquisStartReading(t *testing.T) {
|
||||
// Test in TAIL mode
|
||||
func TestAcquisStartReadingTailKilled(t *testing.T) {
|
||||
acquisFilePath := "./tests/acquis_test_log.yaml"
|
||||
csConfig := &csconfig.CrowdSec{
|
||||
AcquisitionFile: acquisFilePath,
|
||||
|
@ -94,46 +97,219 @@ func TestAcquisStartReading(t *testing.T) {
|
|||
t.Fatal("acquisition tomb is not alive")
|
||||
}
|
||||
|
||||
// Test in CAT mode
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
filename := "./tests/test.log"
|
||||
|
||||
f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
f.Close()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
reads := 0
|
||||
L:
|
||||
for {
|
||||
select {
|
||||
case <-outputChan:
|
||||
reads++
|
||||
if reads == 2 {
|
||||
acquisTomb.Kill(nil)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("-> %d", reads)
|
||||
if reads != 2 {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = f.WriteString("one log line\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func TestAcquisStartReadingTail(t *testing.T) {
|
||||
acquisFilePath := "./tests/acquis_test_log.yaml"
|
||||
filename := "./tests/test.log"
|
||||
csConfig := &csconfig.CrowdSec{
|
||||
AcquisitionFile: acquisFilePath,
|
||||
Profiling: false,
|
||||
}
|
||||
fCTX, err := LoadAcquisitionConfig(csConfig)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
outputChan := make(chan types.Event)
|
||||
acquisTomb := tomb.Tomb{}
|
||||
|
||||
AcquisStartReading(fCTX, outputChan, &acquisTomb)
|
||||
if !acquisTomb.Alive() {
|
||||
t.Fatal("acquisition tomb is not alive")
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
f.Close()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
reads := 0
|
||||
L:
|
||||
for {
|
||||
select {
|
||||
case <-outputChan:
|
||||
reads++
|
||||
//log.Printf("evt %+v", evt)
|
||||
case <-time.After(1 * time.Second):
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("-> %d", reads)
|
||||
if reads != 5 {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
acquisTomb.Kill(nil)
|
||||
if err := acquisTomb.Wait(); err != nil {
|
||||
t.Fatalf("Acquisition returned error : %s", err)
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(filename, os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = f.WriteString("one log line\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func TestAcquisStartReadingCat(t *testing.T) {
|
||||
testFilePath := "./tests/test.log"
|
||||
|
||||
csConfig = &csconfig.CrowdSec{
|
||||
f, err := os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
f.Close()
|
||||
|
||||
csConfig := &csconfig.CrowdSec{
|
||||
SingleFile: testFilePath,
|
||||
SingleFileLabel: "my_test_log",
|
||||
Profiling: false,
|
||||
}
|
||||
|
||||
fCTX, err = LoadAcquisitionConfig(csConfig)
|
||||
fCTX, err := LoadAcquisitionConfig(csConfig)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
outputChan = make(chan types.Event)
|
||||
acquisTomb = tomb.Tomb{}
|
||||
outputChan := make(chan types.Event)
|
||||
acquisTomb := tomb.Tomb{}
|
||||
|
||||
AcquisStartReading(fCTX, outputChan, &acquisTomb)
|
||||
if !acquisTomb.Alive() {
|
||||
t.Fatal("acquisition tomb is not alive")
|
||||
}
|
||||
|
||||
// Test with a .gz file
|
||||
testFilePath = "./tests/test.log.gz"
|
||||
|
||||
csConfig = &csconfig.CrowdSec{
|
||||
SingleFile: testFilePath,
|
||||
SingleFileLabel: "my_test_log",
|
||||
Profiling: false,
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
reads := 0
|
||||
L:
|
||||
for {
|
||||
select {
|
||||
case <-outputChan:
|
||||
reads++
|
||||
case <-time.After(1 * time.Second):
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
fCTX, err = LoadAcquisitionConfig(csConfig)
|
||||
log.Printf("-> %d", reads)
|
||||
if reads != 5 {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
acquisTomb.Kill(nil)
|
||||
if err := acquisTomb.Wait(); err != nil {
|
||||
t.Fatalf("Acquisition returned error : %s", err)
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
t.Fatal(err)
|
||||
}
|
||||
outputChan = make(chan types.Event)
|
||||
acquisTomb = tomb.Tomb{}
|
||||
|
||||
AcquisStartReading(fCTX, outputChan, &acquisTomb)
|
||||
if !acquisTomb.Alive() {
|
||||
t.Fatal("acquisition tomb is not alive")
|
||||
_, err = f.WriteString("one log line\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func TestAcquisStartReadingGzCat(t *testing.T) {
|
||||
testFilePath := "./tests/test.log.gz"
|
||||
|
||||
csConfig := &csconfig.CrowdSec{
|
||||
SingleFile: testFilePath,
|
||||
SingleFileLabel: "my_test_log",
|
||||
Profiling: false,
|
||||
}
|
||||
fCTX, err := LoadAcquisitionConfig(csConfig)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
outputChan := make(chan types.Event)
|
||||
acquisTomb := tomb.Tomb{}
|
||||
|
||||
AcquisStartReading(fCTX, outputChan, &acquisTomb)
|
||||
if !acquisTomb.Alive() {
|
||||
t.Fatal("acquisition tomb is not alive")
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
reads := 0
|
||||
L:
|
||||
for {
|
||||
select {
|
||||
case <-outputChan:
|
||||
reads++
|
||||
case <-time.After(1 * time.Second):
|
||||
break L
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("-> %d", reads)
|
||||
if reads != 1 {
|
||||
t.Fatal()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,5 +3,3 @@ filenames:
|
|||
mode: tail
|
||||
labels:
|
||||
type: my_test_log
|
||||
---
|
||||
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
my test log
|
Loading…
Add table
Reference in a new issue