|
@@ -383,6 +383,47 @@ query: >
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestStopStreaming(t *testing.T) {
|
|
|
+ config := `
|
|
|
+mode: tail
|
|
|
+source: loki
|
|
|
+url: http://127.0.0.1:3100
|
|
|
+query: >
|
|
|
+ {server="demo"}
|
|
|
+`
|
|
|
+ logger := log.New()
|
|
|
+ subLogger := logger.WithFields(log.Fields{
|
|
|
+ "type": "loki",
|
|
|
+ })
|
|
|
+ title := time.Now().String()
|
|
|
+ lokiSource := LokiSource{}
|
|
|
+ err := lokiSource.Configure([]byte(config), subLogger)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("Unexpected error : %s", err)
|
|
|
+ }
|
|
|
+ out := make(chan types.Event)
|
|
|
+ drainTomb := tomb.Tomb{}
|
|
|
+ drainTomb.Go(func() error {
|
|
|
+ <-out
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ lokiTomb := &tomb.Tomb{}
|
|
|
+ err = lokiSource.StreamingAcquisition(out, lokiTomb)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("Unexpected error : %s", err)
|
|
|
+ }
|
|
|
+ feedLoki(subLogger, 1, title)
|
|
|
+ err = drainTomb.Wait()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("Unexpected error : %s", err)
|
|
|
+ }
|
|
|
+ lokiTomb.Kill(nil)
|
|
|
+ err = lokiTomb.Wait()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("Unexpected error : %s", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
type LogStreams struct {
|
|
|
Streams []LogStream `json:"streams"`
|
|
|
}
|