fix the way acquisition is stopped (#2069)
* fix the way acquisition is stopped by draining inputLineChan before terminating it. --------- Co-authored-by: sabban <15465465+sabban@users.noreply.github.com>
This commit is contained in:
parent
01ea78c10e
commit
39a4a256fd
3 changed files with 22 additions and 6 deletions
|
@ -44,8 +44,8 @@ func initCrowdsec(cConfig *csconfig.Config) (*parser.Parsers, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
|
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
|
||||||
inputLineChan := make(chan types.Event)
|
inputEventChan = make(chan types.Event)
|
||||||
inputEventChan := make(chan types.Event)
|
inputLineChan = make(chan types.Event)
|
||||||
|
|
||||||
//start go-routines for parsing, buckets pour and outputs.
|
//start go-routines for parsing, buckets pour and outputs.
|
||||||
parserWg := &sync.WaitGroup{}
|
parserWg := &sync.WaitGroup{}
|
||||||
|
|
|
@ -40,8 +40,11 @@ var (
|
||||||
/*the state of acquisition*/
|
/*the state of acquisition*/
|
||||||
dataSources []acquisition.DataSource
|
dataSources []acquisition.DataSource
|
||||||
/*the state of the buckets*/
|
/*the state of the buckets*/
|
||||||
holders []leakybucket.BucketFactory
|
holders []leakybucket.BucketFactory
|
||||||
buckets *leakybucket.Buckets
|
buckets *leakybucket.Buckets
|
||||||
|
|
||||||
|
inputLineChan chan types.Event
|
||||||
|
inputEventChan chan types.Event
|
||||||
outputEventChan chan types.Event // the buckets init returns its own chan that is used for multiplexing
|
outputEventChan chan types.Event // the buckets init returns its own chan that is used for multiplexing
|
||||||
/*settings*/
|
/*settings*/
|
||||||
lastProcessedItem time.Time /*keep track of last item timestamp in time-machine. it is used to GC buckets when we dump them.*/
|
lastProcessedItem time.Time /*keep track of last item timestamp in time-machine. it is used to GC buckets when we dump them.*/
|
||||||
|
@ -283,7 +286,6 @@ func LoadConfig(cConfig *csconfig.Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// exitWithCode must be called right before the program termination,
|
// exitWithCode must be called right before the program termination,
|
||||||
// to allow measuring functional test coverage in case of abnormal exit.
|
// to allow measuring functional test coverage in case of abnormal exit.
|
||||||
//
|
//
|
||||||
|
|
|
@ -115,7 +115,7 @@ func ShutdownCrowdsecRoutines() error {
|
||||||
if len(dataSources) > 0 {
|
if len(dataSources) > 0 {
|
||||||
acquisTomb.Kill(nil)
|
acquisTomb.Kill(nil)
|
||||||
log.Debugf("waiting for acquisition to finish")
|
log.Debugf("waiting for acquisition to finish")
|
||||||
|
drainChan(inputLineChan)
|
||||||
if err := acquisTomb.Wait(); err != nil {
|
if err := acquisTomb.Wait(); err != nil {
|
||||||
log.Warningf("Acquisition returned error : %s", err)
|
log.Warningf("Acquisition returned error : %s", err)
|
||||||
reterr = err
|
reterr = err
|
||||||
|
@ -124,6 +124,7 @@ func ShutdownCrowdsecRoutines() error {
|
||||||
|
|
||||||
log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
|
log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
|
||||||
parsersTomb.Kill(nil)
|
parsersTomb.Kill(nil)
|
||||||
|
drainChan(inputEventChan)
|
||||||
if err := parsersTomb.Wait(); err != nil {
|
if err := parsersTomb.Wait(); err != nil {
|
||||||
log.Warningf("Parsers returned error : %s", err)
|
log.Warningf("Parsers returned error : %s", err)
|
||||||
reterr = err
|
reterr = err
|
||||||
|
@ -194,6 +195,19 @@ func shutdown(sig os.Signal, cConfig *csconfig.Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func drainChan(c chan types.Event) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case _, ok := <-c:
|
||||||
|
if !ok { //closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func HandleSignals(cConfig *csconfig.Config) error {
|
func HandleSignals(cConfig *csconfig.Config) error {
|
||||||
var (
|
var (
|
||||||
newConfig *csconfig.Config
|
newConfig *csconfig.Config
|
||||||
|
|
Loading…
Reference in a new issue