|
@@ -337,6 +337,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
if err != nil {
|
|
|
return errors.Wrap(err, "error while getting StreamingAcquisition")
|
|
|
}
|
|
|
+ ll := l.logger.WithField("websocket url", l.lokiWebsocket)
|
|
|
t.Go(func() error {
|
|
|
for {
|
|
|
ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout)
|
|
@@ -352,19 +353,22 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
return nil
|
|
|
}
|
|
|
if res == nil { // no body, it's a network error, not a HTTP error
|
|
|
- return errors.Wrap(err, "loki StreamingAcquisition error before HTTP stack")
|
|
|
+ ll.WithError(err).Error("loki StreamingAcquisition error before HTTP stack")
|
|
|
+ break
|
|
|
}
|
|
|
buf, err2 := ioutil.ReadAll(res.Body)
|
|
|
if err2 == nil {
|
|
|
- return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|
|
|
+ ll.WithField("body", string(buf)).WithField("status", res.StatusCode).Error("loki http error")
|
|
|
+ break
|
|
|
}
|
|
|
-
|
|
|
- return err2
|
|
|
+ ll.WithError(err2).Error("can't read loki http body")
|
|
|
+ break
|
|
|
}
|
|
|
defer c.Close()
|
|
|
_, reader, err := c.NextReader()
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "loki StreamingAcquisition error while reading JSON websocket")
|
|
|
+ ll.WithError(err).Error("loki StreamingAcquisition error while reading JSON websocket")
|
|
|
+ break
|
|
|
}
|
|
|
var resp Tail
|
|
|
decoder := json.NewDecoder(reader)
|
|
@@ -377,7 +381,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
|
if err == io.EOF { // the websocket is closed
|
|
|
break
|
|
|
}
|
|
|
- return errors.Wrap(err, "loki StreamingAcquisition error while parsing JSON websocket")
|
|
|
+ ll.WithError(err).Error("loki StreamingAcquisition error while parsing JSON websocket")
|
|
|
}
|
|
|
l.logger.WithField("type", t).WithField("message", resp).Debug("Message receveid")
|
|
|
l.readOneTail(resp, out)
|