|
@@ -358,14 +358,15 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
if err2 == nil {
|
|
|
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|
|
|
}
|
|
|
+
|
|
|
return err2
|
|
|
}
|
|
|
defer c.Close()
|
|
|
- var resp Tail
|
|
|
_, reader, err := c.NextReader()
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket")
|
|
|
+ return errors.Wrap(err, "loki OneShotAcquisition error while reading JSON websocket")
|
|
|
}
|
|
|
+ var resp Tail
|
|
|
decoder := json.NewDecoder(reader)
|
|
|
for { // draining the websocket
|
|
|
if !t.Alive() { // someone want to close this loop
|