|
@@ -757,48 +757,25 @@ func TestOneShotAcquisition(t *testing.T) {
|
|
|
if tc.setup != nil {
|
|
|
tc.setup(t, &cw)
|
|
|
}
|
|
|
- out := make(chan types.Event)
|
|
|
+ out := make(chan types.Event, 100)
|
|
|
tmb := tomb.Tomb{}
|
|
|
var rcvdEvts []types.Event
|
|
|
|
|
|
dbgLogger.Infof("running StreamingAcquisition")
|
|
|
- actmb := tomb.Tomb{}
|
|
|
- actmb.Go(func() error {
|
|
|
- err := cw.OneShotAcquisition(out, &actmb)
|
|
|
- dbgLogger.Infof("acquis done")
|
|
|
- cstest.RequireErrorContains(t, err, tc.expectedStartErr)
|
|
|
- return nil
|
|
|
- })
|
|
|
-
|
|
|
+ err = cw.OneShotAcquisition(out, &tmb)
|
|
|
+ dbgLogger.Infof("acquis done")
|
|
|
+ cstest.RequireErrorContains(t, err, tc.expectedStartErr)
|
|
|
+ close(out)
|
|
|
// let's empty output chan
|
|
|
- tmb.Go(func() error {
|
|
|
- for {
|
|
|
- select {
|
|
|
- case in := <-out:
|
|
|
- log.Debugf("received event %+v", in)
|
|
|
- rcvdEvts = append(rcvdEvts, in)
|
|
|
- case <-tmb.Dying():
|
|
|
- log.Debugf("pumper died")
|
|
|
- return nil
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
+ for evt := range out {
|
|
|
+ rcvdEvts = append(rcvdEvts, evt)
|
|
|
+ }
|
|
|
|
|
|
if tc.run != nil {
|
|
|
tc.run(t, &cw)
|
|
|
} else {
|
|
|
dbgLogger.Warning("no code to run")
|
|
|
}
|
|
|
-
|
|
|
- time.Sleep(5 * time.Second)
|
|
|
- dbgLogger.Infof("killing collector")
|
|
|
- tmb.Kill(nil)
|
|
|
- <-tmb.Dead()
|
|
|
- dbgLogger.Infof("killing datasource")
|
|
|
- actmb.Kill(nil)
|
|
|
- dbgLogger.Infof("waiting datasource death")
|
|
|
- <-actmb.Dead()
|
|
|
- // check results
|
|
|
if tc.expectedResLen != -1 {
|
|
|
if tc.expectedResLen != len(rcvdEvts) {
|
|
|
t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts)
|