|
@@ -354,18 +354,18 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
}
|
|
|
defer c.Close()
|
|
|
var resp Tail
|
|
|
+ _, reader, err := c.NextReader()
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket")
|
|
|
+ }
|
|
|
+ decoder := json.NewDecoder(reader)
|
|
|
for { // draining the websocket
|
|
|
if !t.Alive() { // someone want to close this loop
|
|
|
return nil
|
|
|
}
|
|
|
- t, msg, err := c.ReadMessage()
|
|
|
- if len(msg) == 0 {
|
|
|
- time.Sleep(100 * time.Millisecond)
|
|
|
- continue
|
|
|
- }
|
|
|
- err = json.Unmarshal(msg, &resp)
|
|
|
+ err = decoder.Decode(&resp)
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket")
|
|
|
+ return errors.Wrap(err, "OneShotAcquisition error while parsing JSON websocket")
|
|
|
}
|
|
|
l.logger.WithField("type", t).WithField("message", resp).Debug("Message receveid")
|
|
|
l.readOneTail(resp, out)
|