|
@@ -12,6 +12,7 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"io"
|
|
"io"
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
|
+ "net"
|
|
"net/http"
|
|
"net/http"
|
|
"net/url"
|
|
"net/url"
|
|
"time"
|
|
"time"
|
|
@@ -346,6 +347,13 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|
}()
|
|
}()
|
|
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header)
|
|
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header)
|
|
if err != nil {
|
|
if err != nil {
|
|
|
|
+ if oerr, ok := err.(*net.OpError); ok && oerr.Err.Error() == "operation was canceled" {
|
|
|
|
+ // it's ok, the websocket connection is closed by the client, triggered by the tomb, lets stop
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ if res == nil { // no body, it's a network error, not a HTTP error
|
|
|
|
+ return errors.Wrap(err, "loki OneShotAcquisition error before HTTP stack")
|
|
|
|
+ }
|
|
buf, err2 := ioutil.ReadAll(res.Body)
|
|
buf, err2 := ioutil.ReadAll(res.Body)
|
|
if err2 == nil {
|
|
if err2 == nil {
|
|
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|
|
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|