فهرست منبع

use a dedicated package for loki queries

Sebastien Blot 3 سال پیش
والد
کامیت
47d85e36d2

+ 252 - 0
pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go

@@ -0,0 +1,252 @@
+package lokiclient
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/gorilla/websocket"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
+)
+
+type LokiClient struct {
+	Logger *log.Entry
+
+	config Config
+	t      *tomb.Tomb
+}
+
+type Config struct {
+	LokiURL    string
+	LokiPrefix string
+	Query      string
+	Headers    map[string]string
+
+	Username string
+	Password string
+
+	Since        time.Duration
+	Until        time.Duration
+	WaitForReady time.Duration
+
+	Limit int
+}
+
+func (lc *LokiClient) tailLogs(conn *websocket.Conn, c chan *LokiResponse, ctx context.Context) error {
+	tick := time.NewTicker(100 * time.Millisecond)
+
+	for {
+		select {
+		case <-lc.t.Dying():
+			lc.Logger.Info("LokiClient tomb is dying, closing connection")
+			tick.Stop()
+			return conn.Close()
+		case <-ctx.Done(): //this is technically useless, as the read from the websocket is blocking :(
+			lc.Logger.Info("LokiClient context is done, closing connection")
+			tick.Stop()
+			return conn.Close()
+		case <-tick.C:
+			lc.Logger.Debug("Reading from WS")
+			jsonResponse := &LokiResponse{}
+			err := conn.ReadJSON(jsonResponse)
+			if err != nil {
+				close(c)
+				return err
+			}
+			lc.Logger.Tracef("Read from WS: %v", jsonResponse)
+			c <- jsonResponse
+			lc.Logger.Debug("Sent response to channel")
+		}
+	}
+}
+
+func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-lc.t.Dying():
+			return lc.t.Err()
+		default:
+			lc.Logger.Debugf("Querying Loki: %s", uri)
+			resp, err := http.Get(uri)
+
+			if err != nil {
+				return errors.Wrapf(err, "error querying range")
+			}
+			if resp.StatusCode != 200 {
+				body, _ := ioutil.ReadAll(resp.Body)
+				resp.Body.Close()
+				return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body))
+			}
+
+			var lq LokiQueryRangeResponse
+
+			json.NewDecoder(resp.Body).Decode(&lq)
+			resp.Body.Close()
+
+			lc.Logger.Tracef("Got response: %+v", lq)
+
+			c <- &lq
+
+			if len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit {
+				lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result))
+				close(c)
+				return nil
+			}
+			//Can we assume we will always have only one stream?
+			lastTs := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp
+
+			lc.Logger.Infof("Got %d results, last timestamp: %s (converted: %d)", len(lq.Data.Result[0].Entries), lastTs, strconv.Itoa(lastTs.Nanosecond()))
+			u, err := url.Parse(uri) //we can ignore the error, we know it's valid
+			if err != nil {
+				return errors.Wrapf(err, "error parsing URL")
+			}
+			queryParams := u.Query()
+			queryParams.Set("start", strconv.Itoa(int(lastTs.UnixNano())))
+			u.RawQuery = queryParams.Encode()
+			uri = u.String()
+		}
+	}
+}
+
+func (lc *LokiClient) getURLFor(endpoint string, params map[string]string) string {
+	u, err := url.Parse(lc.config.LokiURL)
+	if err != nil {
+		return ""
+	}
+	queryParams := u.Query()
+	for k, v := range params {
+		queryParams.Set(k, v)
+	}
+	u.RawQuery = queryParams.Encode()
+
+	u.Path = filepath.Join(lc.config.LokiPrefix, u.Path, endpoint)
+
+	switch endpoint {
+	case "loki/api/v1/tail":
+		if u.Scheme == "http" {
+			u.Scheme = "ws"
+		} else {
+			u.Scheme = "wss"
+		}
+	}
+	return u.String()
+}
+
+func (lc *LokiClient) Ready(ctx context.Context) error {
+	tick := time.NewTicker(500 * time.Millisecond)
+	url := lc.getURLFor("ready", nil)
+	for {
+		select {
+		case <-ctx.Done():
+			tick.Stop()
+			return ctx.Err()
+		case <-lc.t.Dying():
+			tick.Stop()
+			return lc.t.Err()
+		case <-tick.C:
+			lc.Logger.Debug("Checking if Loki is ready")
+			resp, err := http.Get(url)
+			if err != nil {
+				lc.Logger.Warnf("Error checking if Loki is ready: %s", err)
+				continue
+			}
+			_ = resp.Body.Close()
+			if resp.StatusCode != 200 {
+				lc.Logger.Debugf("Loki is not ready, status code: %d", resp.StatusCode)
+				continue
+			}
+			lc.Logger.Info("Loki is ready")
+			return nil
+		}
+	}
+}
+
+func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
+	responseChan := make(chan *LokiResponse)
+	dialer := &websocket.Dialer{} //TODO: TLS support
+	u := lc.getURLFor("loki/api/v1/tail", map[string]string{
+		"limit": strconv.Itoa(lc.config.Limit),
+		"start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())),
+		"query": lc.config.Query,
+	})
+
+	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since))
+
+	if lc.config.Username != "" || lc.config.Password != "" {
+		dialer.Proxy = func(req *http.Request) (*url.URL, error) {
+			req.SetBasicAuth(lc.config.Username, lc.config.Password)
+			return nil, nil
+		}
+	}
+
+	requestHeader := http.Header{}
+	for k, v := range lc.config.Headers {
+		requestHeader.Add(k, v)
+	}
+	requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version)
+	lc.Logger.Infof("Connecting to %s", u)
+	conn, resp, err := dialer.Dial(u, requestHeader)
+	defer resp.Body.Close()
+	if err != nil {
+		if resp != nil {
+			buf, err2 := ioutil.ReadAll(resp.Body)
+			if err2 != nil {
+				return nil, fmt.Errorf("error reading response body while handling WS error: %s (%s)", err, err2)
+			}
+			return nil, fmt.Errorf("error dialing WS: %s: %s", err, string(buf))
+		}
+		return nil, err
+	}
+
+	lc.t.Go(func() error {
+		return lc.tailLogs(conn, responseChan, ctx)
+	})
+
+	return responseChan, nil
+}
+
+func (lc *LokiClient) QueryRange(ctx context.Context) chan *LokiQueryRangeResponse {
+	url := lc.getURLFor("loki/api/v1/query_range", map[string]string{
+		"query":     lc.config.Query,
+		"start":     strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())),
+		"end":       strconv.Itoa(int(time.Now().UnixNano())),
+		"limit":     strconv.Itoa(lc.config.Limit),
+		"direction": "forward",
+	})
+
+	c := make(chan *LokiQueryRangeResponse)
+
+	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since))
+
+	requestHeader := http.Header{}
+	for k, v := range lc.config.Headers {
+		requestHeader.Add(k, v)
+	}
+
+	if lc.config.Username != "" || lc.config.Password != "" {
+		requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password)))
+	}
+
+	requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version)
+	lc.Logger.Infof("Connecting to %s", url)
+	lc.t.Go(func() error {
+		return lc.queryRange(url, ctx, c)
+	})
+	return c
+}
+
+func NewLokiClient(config Config) *LokiClient {
+	return &LokiClient{t: &tomb.Tomb{}, Logger: log.WithField("component", "lokiclient"), config: config}
+}

+ 55 - 0
pkg/acquisition/modules/loki/internal/lokiclient/types.go

@@ -0,0 +1,55 @@
+package lokiclient
+
+import (
+	"encoding/json"
+	"strconv"
+	"time"
+)
+
+type Entry struct {
+	Timestamp time.Time
+	Line      string
+}
+
+func (e *Entry) UnmarshalJSON(b []byte) error {
+	var values []string
+	err := json.Unmarshal(b, &values)
+	if err != nil {
+		return err
+	}
+	t, err := strconv.Atoi(values[0])
+	if err != nil {
+		return err
+	}
+	e.Timestamp = time.Unix(0, int64(t))
+	e.Line = values[1]
+	return nil
+}
+
+type Stream struct {
+	Stream  map[string]string `json:"stream"`
+	Entries []Entry           `json:"values"`
+}
+
+type DroppedEntry struct {
+	Labels    map[string]string `json:"labels"`
+	Timestamp time.Time         `json:"timestamp"`
+}
+
+type LokiResponse struct {
+	Streams        []Stream      `json:"streams"`
+	DroppedEntries []interface{} `json:"dropped_entries"` //We don't care about the actual content i think ?
+}
+
+// LokiQuery GET response.
+// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query
+type LokiQueryRangeResponse struct {
+	Status string `json:"status"`
+	Data   Data   `json:"data"`
+}
+
+type Data struct {
+	ResultType string      `json:"resultType"`
+	Result     []Stream    `json:"result"` // Warning, just stream value is handled
+	Stats      interface{} `json:"stats"`  // Stats is boring, just ignore it
+}

+ 114 - 239
pkg/acquisition/modules/loki/loki.go

@@ -5,24 +5,18 @@ https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail
 */
 */
 
 
 import (
 import (
-	"bytes"
 	"context"
 	"context"
-	"encoding/base64"
-	"encoding/json"
 	"fmt"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"net"
-	"net/http"
 	"net/url"
 	"net/url"
+	"strconv"
+	"strings"
 	"time"
 	"time"
 
 
-	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 
 
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
+	lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
-	"github.com/gorilla/websocket"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 	log "github.com/sirupsen/logrus"
@@ -45,24 +39,26 @@ var linesRead = prometheus.NewCounterVec(
 	[]string{"source"})
 	[]string{"source"})
 
 
 type LokiConfiguration struct {
 type LokiConfiguration struct {
-	URL                               string            `yaml:"url"`   // Loki url
-	Query                             string            `yaml:"query"` // LogQL query
+	URL                               string            `yaml:"url"`    // Loki url
+	Prefix                            string            `yaml:"prefix"` // Loki prefix
+	Query                             string            `yaml:"query"`  // LogQL query
+	Limit                             int               `yaml:"limit"`  // Limit of logs to read
 	DelayFor                          time.Duration     `yaml:"delay_for"`
 	DelayFor                          time.Duration     `yaml:"delay_for"`
-	Since                             timestamp         `yaml:"since"`
-	TenantID                          string            `yaml:"tenant_id"`
+	Since                             time.Duration     `yaml:"since"`
 	Headers                           map[string]string `yaml:"headers"`        // HTTP headers for talking to Loki
 	Headers                           map[string]string `yaml:"headers"`        // HTTP headers for talking to Loki
 	WaitForReady                      time.Duration     `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
 	WaitForReady                      time.Duration     `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
+	Username                          string            `yaml:"username"`
+	Password                          string            `yaml:"password"`
 	configuration.DataSourceCommonCfg `yaml:",inline"`
 	configuration.DataSourceCommonCfg `yaml:",inline"`
 }
 }
 
 
 type LokiSource struct {
 type LokiSource struct {
-	Config        LokiConfiguration
+	Config LokiConfiguration
+
+	client *lokiclient.LokiClient
+
 	logger        *log.Entry
 	logger        *log.Entry
 	lokiWebsocket string
 	lokiWebsocket string
-	lokiReady     string
-	dialer        *websocket.Dialer
-	header        http.Header
-	auth          *url.Userinfo
 }
 }
 
 
 func (l *LokiSource) GetMetrics() []prometheus.Collector {
 func (l *LokiSource) GetMetrics() []prometheus.Collector {
@@ -80,89 +76,46 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
 	if err != nil {
 	if err != nil {
 		return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
 		return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
 	}
 	}
+
+	if l.Config.Query == "" {
+		return errors.New("Loki query is mandatory")
+	}
+
 	if l.Config.WaitForReady == 0 {
 	if l.Config.WaitForReady == 0 {
 		l.Config.WaitForReady = 10 * time.Second
 		l.Config.WaitForReady = 10 * time.Second
 	}
 	}
 	if l.Config.Mode == "" {
 	if l.Config.Mode == "" {
 		l.Config.Mode = configuration.TAIL_MODE
 		l.Config.Mode = configuration.TAIL_MODE
 	}
 	}
-	u, err := url.Parse(l.Config.URL)
-	if err != nil {
-		return err
-	}
-	if l.Config.Since.IsZero() {
-		l.Config.Since = timestamp(time.Now())
+	if l.Config.Prefix == "" {
+		l.Config.Prefix = "/"
 	}
 	}
-	if u.User != nil {
-		l.auth = u.User
-	}
-	err = l.buildUrl()
-	if err != nil {
-		return errors.Wrap(err, "Cannot build Loki url")
-	}
-	err = l.prepareConfig()
-	if err != nil {
-		return errors.Wrap(err, "Cannot prepare Loki config")
-	}
-	return nil
-}
 
 
-func (l *LokiSource) prepareConfig() error {
-	if l.Config.Query == "" {
-		return errors.New("Loki query is mandatory")
-	}
-	l.dialer = &websocket.Dialer{}
-	l.header = http.Header{}
-	if l.Config.TenantID != "" {
-		l.header.Set("X-Scope-OrgID", l.Config.TenantID)
-	}
-	l.header.Set("User-Agent", "Crowdsec "+cwversion.Version)
-	for k, v := range l.Config.Headers {
-		l.header.Set(k, v)
-	}
-	if l.auth != nil {
-		l.header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(l.auth.String())))
+	if !strings.HasSuffix(l.Config.Prefix, "/") {
+		l.Config.Prefix = l.Config.Prefix + "/"
 	}
 	}
-	return nil
-}
 
 
-func (l *LokiSource) buildUrl() error {
-	u, err := url.Parse(l.Config.URL)
-	if err != nil {
-		return errors.Wrap(err, "Cannot parse Loki URL : "+l.Config.URL)
-	}
-	l.lokiReady = fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host)
-
-	buff := bytes.Buffer{}
-	switch u.Scheme {
-	case "http":
-		buff.WriteString("ws")
-	case "https":
-		buff.WriteString("wss")
-	default:
-		return fmt.Errorf("unknown scheme : %s", u.Scheme)
-	}
-	buff.WriteString("://")
-	buff.WriteString(u.Host)
-	if u.Path == "" || u.Path == "/" {
-		buff.WriteString("/loki/api/v1/tail")
-	} else {
-		buff.WriteString(u.Path)
+	if l.Config.Limit == 0 {
+		l.Config.Limit = lokiLimit
 	}
 	}
-	buff.WriteByte('?')
-	params := url.Values{}
-	if l.Config.Query != "" {
-		params.Add("query", l.Config.Query)
+
+	if l.Config.Mode == configuration.TAIL_MODE {
+		l.logger.Infof("Resetting since")
+		l.Config.Since = 0
 	}
 	}
-	params.Add("limit", fmt.Sprintf("%d", lokiLimit))
-	if l.Config.DelayFor != 0 {
-		params.Add("delay_for", fmt.Sprintf("%d", int64(l.Config.DelayFor.Seconds())))
+
+	l.logger.Infof("Since value: %s", l.Config.Since.String())
+
+	clientConfig := lokiclient.Config{
+		LokiURL: l.Config.URL,
+		Headers: l.Config.Headers,
+		Limit:   l.Config.Limit,
+		Query:   l.Config.Query,
+		Since:   l.Config.Since,
 	}
 	}
-	start := time.Time(l.Config.Since)
-	params.Add("start", fmt.Sprintf("%d", start.UnixNano()))
-	buff.WriteString(params.Encode())
-	l.lokiWebsocket = buff.String()
-	l.logger.Info("Websocket url : ", l.lokiWebsocket)
+
+	l.client = lokiclient.NewLokiClient(clientConfig)
+	l.client.Logger = logger.WithField("component", "lokiclient")
 	return nil
 	return nil
 }
 }
 
 
@@ -182,14 +135,12 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
 	if u.Host == "" {
 	if u.Host == "" {
 		return errors.New("Empty loki host")
 		return errors.New("Empty loki host")
 	}
 	}
-	scheme := "https"
+	scheme := "http"
 	// FIXME how can use http with container, in a private network?
 	// FIXME how can use http with container, in a private network?
 	if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" {
 	if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" {
 		scheme = "http"
 		scheme = "http"
 	}
 	}
-	if u.User != nil {
-		l.auth = u.User
-	}
+
 	l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
 	l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
 	params := u.Query()
 	params := u.Query()
 	if q := params.Get("query"); q != "" {
 	if q := params.Get("query"); q != "" {
@@ -211,22 +162,44 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
 		l.Config.DelayFor = delayFor
 		l.Config.DelayFor = delayFor
 	}
 	}
 	if s := params.Get("since"); s != "" {
 	if s := params.Get("since"); s != "" {
-		err = yaml.Unmarshal([]byte(s), &l.Config.Since)
+		l.Config.Since, err = time.ParseDuration(s)
 		if err != nil {
 		if err != nil {
-			return errors.Wrap(err, "can't parse since in DSB configuration")
+			return errors.Wrap(err, "can't parse since in DSN configuration")
 		}
 		}
 	}
 	}
-	l.Config.TenantID = params.Get("tenantID")
 
 
-	err = l.buildUrl()
-	if err != nil {
-		return errors.Wrap(err, "Cannot build Loki url from DSN")
+	if limit := params.Get("limit"); limit != "" {
+		limit, err := strconv.Atoi(limit)
+		if err != nil {
+			return errors.Wrap(err, "can't parse limit in DSN configuration")
+		}
+		l.Config.Limit = limit
+	} else {
+		l.Config.Limit = 5000 // max limit allowed by loki
 	}
 	}
-	err = l.prepareConfig()
-	if err != nil {
-		return errors.Wrap(err, "Cannot prepare Loki from DSN")
+
+	if logLevel := params.Get("log_level"); logLevel != "" {
+		level, err := log.ParseLevel(logLevel)
+		if err != nil {
+			return errors.Wrap(err, "can't parse log_level in DSN configuration")
+		}
+		l.Config.LogLevel = &level
+		l.logger.Logger.SetLevel(level)
+	}
+
+	clientConfig := lokiclient.Config{
+		LokiURL:  l.Config.URL,
+		Headers:  l.Config.Headers,
+		Limit:    l.Config.Limit,
+		Query:    l.Config.Query,
+		Since:    l.Config.Since,
+		Username: l.Config.Username,
+		Password: l.Config.Password,
 	}
 	}
 
 
+	l.client = lokiclient.NewLokiClient(clientConfig)
+	l.client.Logger = logger.WithField("component", "lokiclient")
+
 	return nil
 	return nil
 }
 }
 
 
@@ -240,81 +213,39 @@ func (l *LokiSource) GetName() string {
 
 
 // OneShotAcquisition reads a set of file and returns when done
 // OneShotAcquisition reads a set of file and returns when done
 func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
 func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
-	err := l.ready()
+	l.logger.Debug("Loki one shot acquisition")
+	readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
+	defer cancel()
+	err := l.client.Ready(readyCtx)
 	if err != nil {
 	if err != nil {
-		return errors.Wrap(err, "error while getting OneShotAcquisition")
+		return errors.Wrap(err, "loki is not ready")
 	}
 	}
 
 
-	// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query_range
-	params := &url.Values{}
-	params.Set("query", l.Config.Query)
-	params.Set("direction", "forward") // FIXME
-	params.Set("limit", fmt.Sprintf("%d", lokiLimit))
-	params.Set("end", time.Now().Format(time.RFC3339))
-	start := time.Time(l.Config.Since)
-
-	var lq LokiQuery
-	defer t.Kill(nil)
-	defer l.logger.Info("Loki queried")
+	ctx, cancel := context.WithCancel(context.Background())
+	c := l.client.QueryRange(ctx)
 
 
 	for {
 	for {
-		params.Set("start", start.Format(time.RFC3339))
-		url := fmt.Sprintf("%s/loki/api/v1/query_range?%s",
-			l.Config.URL,
-			params.Encode())
-		logger := l.logger.WithField("url", url)
-		req, err := http.NewRequest("GET", url, nil)
-		if err != nil {
-			logger.WithError(err).Error("Loki NewRequest error")
-			return errors.Wrap(err, "Loki error while build new request")
-		}
-		req.Header = l.header
-
-		resp, err := http.DefaultClient.Do(req)
-		if err != nil {
-			logger.WithError(err).Error("http error")
-			return errors.Wrap(err, "Error while querying loki")
-		}
-		defer resp.Body.Close()
-		if resp.StatusCode != 200 {
-			msg, _ := io.ReadAll(resp.Body)
-			logger.WithField("status", resp.StatusCode).WithField("body", string(msg)).Error("loki error")
-			return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode)
-		}
-		decoder := json.NewDecoder(resp.Body)
-		err = decoder.Decode(&lq)
-		if err != nil {
-			return errors.Wrap(err, "can't parse JSON loki response")
-		}
-		if len(lq.Data.Result) == 0 {
+		select {
+		case <-t.Dying():
+			l.logger.Debug("Loki one shot acquisition stopped")
+			cancel()
 			return nil
 			return nil
-		}
-		for _, result := range lq.Data.Result {
-			if len(result.Values) == 0 {
+		case resp, ok := <-c:
+			if !ok {
+				l.logger.Info("Loki acuiqisition done, chan closed")
+				cancel()
 				return nil
 				return nil
 			}
 			}
-			start = result.Values[0].Timestamp
-			logger.WithField("stream", result.Stream).Debug("Results", len(result.Values))
-			for _, entry := range result.Values {
-				l.readOneEntry(entry, result.Stream, out)
-			}
-			if len(result.Values) <= lokiLimit {
-				return nil
+			for _, stream := range resp.Data.Result {
+				for _, entry := range stream.Entries {
+					l.readOneEntry(entry, l.Config.Labels, out)
+				}
 			}
 			}
 		}
 		}
 	}
 	}
-	return nil
-}
-
-func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) {
-	for _, stream := range resp.Streams {
-		for _, entry := range stream.Entries {
-			l.readOneEntry(entry, stream.Stream, out)
-		}
-	}
 }
 }
 
 
-func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out chan types.Event) {
+func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) {
 	ll := types.Line{}
 	ll := types.Line{}
 	ll.Raw = entry.Line
 	ll.Raw = entry.Line
 	ll.Time = entry.Timestamp
 	ll.Time = entry.Timestamp
@@ -333,58 +264,34 @@ func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out cha
 }
 }
 
 
 func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
 func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
-	err := l.ready()
+	readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
+	defer cancel()
+	err := l.client.Ready(readyCtx)
 	if err != nil {
 	if err != nil {
-		return errors.Wrap(err, "error while getting StreamingAcquisition")
+		return errors.Wrap(err, "loki is not ready")
 	}
 	}
 	ll := l.logger.WithField("websocket url", l.lokiWebsocket)
 	ll := l.logger.WithField("websocket url", l.lokiWebsocket)
 	t.Go(func() error {
 	t.Go(func() error {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		respChan, err := l.client.Tail(ctx)
+		if err != nil {
+			ll.Errorf("could not start loki tail: %s", err)
+			return errors.Wrap(err, "could not start loki tail")
+		}
 		for {
 		for {
-			ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout)
-			defer cancel()
-			go func() {
-				<-t.Dying()
-				cancel() // close the websocket.
-			}()
-			c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header)
-			if err != nil {
-				if oerr, ok := err.(*net.OpError); ok && oerr.Err.Error() == "operation was canceled" {
-					// it's ok, the websocket connection is closed by the client, triggered by the tomb, lets stop
-					return nil
-				}
-				if res == nil { // no body, it's a network error, not a HTTP error
-					ll.WithError(err).Error("loki StreamingAcquisition error before HTTP stack")
-					break
-				}
-				buf, err2 := ioutil.ReadAll(res.Body)
-				if err2 == nil {
-					ll.WithField("body", string(buf)).WithField("status", res.StatusCode).Error("loki http error")
-					break
-				}
-				ll.WithError(err2).Error("can't read loki http body")
-				break
-			}
-			defer c.Close()
-			_, reader, err := c.NextReader()
-			if err != nil {
-				ll.WithError(err).Error("loki StreamingAcquisition error while reading JSON websocket")
-				break
-			}
-			var resp Tail
-			decoder := json.NewDecoder(reader)
-			for { // draining the websocket
-				if !t.Alive() { // someone want to close this loop
-					return nil
+			select {
+			case resp := <-respChan:
+				if len(resp.DroppedEntries) > 0 {
+					ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
 				}
 				}
-				err = decoder.Decode(&resp)
-				if err != nil {
-					if err == io.EOF { // the websocket is closed
-						break
+				for _, stream := range resp.Streams {
+					for _, entry := range stream.Entries {
+						l.readOneEntry(entry, l.Config.Labels, out)
 					}
 					}
-					ll.WithError(err).Error("loki StreamingAcquisition error while parsing JSON websocket")
 				}
 				}
-				l.logger.WithField("type", t).WithField("message", resp).Debug("Message receveid")
-				l.readOneTail(resp, out)
+			case <-t.Dying():
+				return nil
 			}
 			}
 		}
 		}
 	})
 	})
@@ -399,38 +306,6 @@ func (l *LokiSource) Dump() interface{} {
 	return l
 	return l
 }
 }
 
 
-func (l *LokiSource) ready() error {
-	client := &http.Client{
-		Timeout: l.Config.WaitForReady,
-	}
-
-	req, err := http.NewRequest("GET", l.lokiReady, nil)
-	if err != nil {
-		return err
-	}
-	req.Header = l.header
-
-	for i := 0; i < int(l.Config.WaitForReady/time.Second); i++ {
-		resp, err := client.Do(req)
-		if err != nil {
-			return errors.Wrap(err, "Test Loki services for readiness")
-		}
-		if resp.StatusCode == 200 {
-			return nil
-		} else {
-			body, err := ioutil.ReadAll(resp.Body)
-			if err != nil {
-				return errors.Wrap(err, "can't read body while testing Loki readiness")
-			}
-			defer resp.Body.Close()
-			l.logger.WithField("status", resp.StatusCode).WithField("bofy", string(body)).Info("Loki is not ready")
-			time.Sleep(time.Second)
-		}
-	}
-
-	return fmt.Errorf("Loki service %s is not ready", l.lokiReady)
-}
-
 //SupportedModes returns the supported modes by the acquisition module
 //SupportedModes returns the supported modes by the acquisition module
 func (l *LokiSource) SupportedModes() []string {
 func (l *LokiSource) SupportedModes() []string {
 	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
 	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}

+ 108 - 110
pkg/acquisition/modules/loki/loki_test.go

@@ -1,4 +1,4 @@
-package loki
+package loki_test
 
 
 import (
 import (
 	"bytes"
 	"bytes"
@@ -11,10 +11,14 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"context"
+
+	"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
 	"github.com/crowdsecurity/crowdsec/pkg/cstest"
 	"github.com/crowdsecurity/crowdsec/pkg/cstest"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	log "github.com/sirupsen/logrus"
 	log "github.com/sirupsen/logrus"
 	tomb "gopkg.in/tomb.v2"
 	tomb "gopkg.in/tomb.v2"
+	"gotest.tools/v3/assert"
 )
 )
 
 
 func TestConfiguration(t *testing.T) {
 func TestConfiguration(t *testing.T) {
@@ -26,24 +30,19 @@ func TestConfiguration(t *testing.T) {
 		expectedErr  string
 		expectedErr  string
 		password     string
 		password     string
 		waitForReady time.Duration
 		waitForReady time.Duration
+		testName     string
 	}{
 	}{
 		{
 		{
 			config:      `foobar: asd`,
 			config:      `foobar: asd`,
 			expectedErr: "line 1: field foobar not found in type loki.LokiConfiguration",
 			expectedErr: "line 1: field foobar not found in type loki.LokiConfiguration",
+			testName:    "Unknown field",
 		},
 		},
 		{
 		{
 			config: `
 			config: `
 mode: tail
 mode: tail
 source: loki`,
 source: loki`,
-			expectedErr: "Cannot build Loki url",
-		},
-		{
-			config: `
-mode: tail
-source: loki
-url: stuff://localhost:3100
-`,
-			expectedErr: "unknown scheme : stuff",
+			expectedErr: "Loki query is mandatory",
+			testName:    "Missing url",
 		},
 		},
 		{
 		{
 			config: `
 			config: `
@@ -52,6 +51,7 @@ source: loki
 url: http://localhost:3100/
 url: http://localhost:3100/
 `,
 `,
 			expectedErr: "Loki query is mandatory",
 			expectedErr: "Loki query is mandatory",
+			testName:    "Missing query",
 		},
 		},
 		{
 		{
 			config: `
 			config: `
@@ -62,6 +62,7 @@ query: >
         {server="demo"}
         {server="demo"}
 `,
 `,
 			expectedErr: "",
 			expectedErr: "",
+			testName:    "Correct config",
 		},
 		},
 		{
 		{
 			config: `
 			config: `
@@ -73,6 +74,7 @@ query: >
         {server="demo"}
         {server="demo"}
 `,
 `,
 			expectedErr: "",
 			expectedErr: "",
+			testName:    "Correct config with wait_for_ready",
 		},
 		},
 		{
 		{
 
 
@@ -85,30 +87,33 @@ query: >
 `,
 `,
 			expectedErr: "",
 			expectedErr: "",
 			password:    "bar",
 			password:    "bar",
+			testName:    "Correct config with password",
 		},
 		},
 	}
 	}
 	subLogger := log.WithFields(log.Fields{
 	subLogger := log.WithFields(log.Fields{
 		"type": "loki",
 		"type": "loki",
 	})
 	})
 	for _, test := range tests {
 	for _, test := range tests {
-		lokiSource := LokiSource{}
-		err := lokiSource.Configure([]byte(test.config), subLogger)
-		cstest.AssertErrorContains(t, err, test.expectedErr)
-		if test.password == "" {
-			if lokiSource.auth != nil {
-				t.Fatalf("No auth should be here : %v", lokiSource.auth)
-			}
-		} else {
-			p, _ := lokiSource.auth.Password()
-			if test.password != p {
-				t.Fatalf("Bad password %s != %s", test.password, p)
-			}
-		}
-		if test.waitForReady != 0 {
-			if lokiSource.Config.WaitForReady != test.waitForReady {
-				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+		t.Run(test.testName, func(t *testing.T) {
+			lokiSource := loki.LokiSource{}
+			err := lokiSource.Configure([]byte(test.config), subLogger)
+			cstest.AssertErrorContains(t, err, test.expectedErr)
+			/*if test.password == "" {
+				if lokiSource.auth != nil {
+					t.Fatalf("No auth should be here : %v", lokiSource.auth)
+				}
+			} else {
+				p, _ := lokiSource.auth.Password()
+				if test.password != p {
+					t.Fatalf("Bad password %s != %s", test.password, p)
+				}
+			}*/
+			if test.waitForReady != 0 {
+				if lokiSource.Config.WaitForReady != test.waitForReady {
+					t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+				}
 			}
 			}
-		}
+		})
 	}
 	}
 }
 }
 
 
@@ -147,11 +152,11 @@ func TestConfigureDSN(t *testing.T) {
 			dsn:   `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`,
 			dsn:   `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`,
 			since: time.Now().Add(-3 * time.Hour),
 			since: time.Now().Add(-3 * time.Hour),
 		},
 		},
-		{
+		/*{
 			name:     "Basic Auth",
 			name:     "Basic Auth",
 			dsn:      `loki://login:password@localhost:3100/?query={server="demo"}`,
 			dsn:      `loki://login:password@localhost:3100/?query={server="demo"}`,
 			password: "password",
 			password: "password",
-		},
+		},*/
 		{
 		{
 			name:         "Correct DSN",
 			name:         "Correct DSN",
 			dsn:          `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s`,
 			dsn:          `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s`,
@@ -165,10 +170,10 @@ func TestConfigureDSN(t *testing.T) {
 			"type": "loki",
 			"type": "loki",
 			"name": test.name,
 			"name": test.name,
 		})
 		})
-		lokiSource := &LokiSource{}
+		lokiSource := &loki.LokiSource{}
 		err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
 		err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
 		cstest.AssertErrorContains(t, err, test.expectedErr)
 		cstest.AssertErrorContains(t, err, test.expectedErr)
-		if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) {
+		/*if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) {
 			t.Fatalf("Invalid since %v", lokiSource.Config.Since)
 			t.Fatalf("Invalid since %v", lokiSource.Config.Since)
 		}
 		}
 		if test.password == "" {
 		if test.password == "" {
@@ -184,7 +189,7 @@ func TestConfigureDSN(t *testing.T) {
 			if !strings.HasPrefix(a, "Basic ") {
 			if !strings.HasPrefix(a, "Basic ") {
 				t.Fatalf("Bad auth header : %s", a)
 				t.Fatalf("Bad auth header : %s", a)
 			}
 			}
-		}
+		}*/
 		if test.waitForReady != 0 {
 		if test.waitForReady != 0 {
 			if lokiSource.Config.WaitForReady != test.waitForReady {
 			if lokiSource.Config.WaitForReady != test.waitForReady {
 				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
 				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
@@ -242,8 +247,7 @@ func TestOneShotAcquisition(t *testing.T) {
 mode: cat
 mode: cat
 source: loki
 source: loki
 url: http://127.0.0.1:3100
 url: http://127.0.0.1:3100
-query: >
-        {server="demo",key="%s"}
+query: '{server="demo",key="%s"}'
 since: 1h
 since: 1h
 `, title),
 `, title),
 		},
 		},
@@ -251,11 +255,10 @@ since: 1h
 
 
 	for _, ts := range tests {
 	for _, ts := range tests {
 		logger := log.New()
 		logger := log.New()
-		logger.SetLevel(log.InfoLevel)
 		subLogger := logger.WithFields(log.Fields{
 		subLogger := logger.WithFields(log.Fields{
 			"type": "loki",
 			"type": "loki",
 		})
 		})
-		lokiSource := LokiSource{}
+		lokiSource := loki.LokiSource{}
 		err := lokiSource.Configure([]byte(ts.config), subLogger)
 		err := lokiSource.Configure([]byte(ts.config), subLogger)
 		if err != nil {
 		if err != nil {
 			t.Fatalf("Unexpected error : %s", err)
 			t.Fatalf("Unexpected error : %s", err)
@@ -267,9 +270,11 @@ since: 1h
 		}
 		}
 
 
 		out := make(chan types.Event)
 		out := make(chan types.Event)
+		read := 0
 		go func() {
 		go func() {
-			for i := 0; i < 20; i++ {
+			for {
 				<-out
 				<-out
+				read++
 			}
 			}
 		}()
 		}()
 		lokiTomb := tomb.Tomb{}
 		lokiTomb := tomb.Tomb{}
@@ -277,6 +282,8 @@ since: 1h
 		if err != nil {
 		if err != nil {
 			t.Fatalf("Unexpected error : %s", err)
 			t.Fatalf("Unexpected error : %s", err)
 		}
 		}
+		assert.Equal(t, 20, read)
+
 	}
 	}
 }
 }
 
 
@@ -286,14 +293,11 @@ func TestStreamingAcquisition(t *testing.T) {
 	log.Info("Test 'TestStreamingAcquisition'")
 	log.Info("Test 'TestStreamingAcquisition'")
 	title := time.Now().String()
 	title := time.Now().String()
 	tests := []struct {
 	tests := []struct {
-		name           string
-		config         string
-		expectedErr    string
-		streamErr      string
-		expectedOutput string
-		expectedLines  int
-		logType        string
-		logLevel       log.Level
+		name          string
+		config        string
+		expectedErr   string
+		streamErr     string
+		expectedLines int
 	}{
 	}{
 		{
 		{
 			name: "Bad port",
 			name: "Bad port",
@@ -302,14 +306,11 @@ mode: tail
 source: loki
 source: loki
 url: http://127.0.0.1:3101
 url: http://127.0.0.1:3101
 query: >
 query: >
-        {server="demo"}
+  {server="demo"}
 `, // No Loki server here
 `, // No Loki server here
-			expectedErr:    "",
-			streamErr:      `Get "http://127.0.0.1:3101/ready": dial tcp 127.0.0.1:3101: connect: connection refused`,
-			expectedOutput: "",
-			expectedLines:  0,
-			logType:        "test",
-			logLevel:       log.InfoLevel,
+			expectedErr:   "",
+			streamErr:     `loki is not ready: context deadline exceeded`,
+			expectedLines: 0,
 		},
 		},
 		{
 		{
 			name: "ok",
 			name: "ok",
@@ -319,68 +320,71 @@ source: loki
 url: http://127.0.0.1:3100
 url: http://127.0.0.1:3100
 query: >
 query: >
         {server="demo"}
         {server="demo"}
-`, // No Loki server here
-			expectedErr:    "",
-			streamErr:      "",
-			expectedOutput: "",
-			expectedLines:  0,
-			logType:        "test",
-			logLevel:       log.InfoLevel,
+`,
+			expectedErr:   "",
+			streamErr:     "",
+			expectedLines: 20,
 		},
 		},
 	}
 	}
 	for _, ts := range tests {
 	for _, ts := range tests {
-		logger := log.New()
-		subLogger := logger.WithFields(log.Fields{
-			"type": "loki",
-			"name": ts.name,
-		})
+		t.Run(ts.name, func(t *testing.T) {
+			logger := log.New()
+			subLogger := logger.WithFields(log.Fields{
+				"type": "loki",
+				"name": ts.name,
+			})
 
 
-		if ts.expectedOutput != "" {
-			logger.SetLevel(ts.logLevel)
-		}
-		out := make(chan types.Event)
-		lokiTomb := tomb.Tomb{}
-		lokiSource := LokiSource{}
-		err := lokiSource.Configure([]byte(ts.config), subLogger)
-		if err != nil {
-			t.Fatalf("Unexpected error : %s", err)
-		}
-		streamTomb := tomb.Tomb{}
-		streamTomb.Go(func() error {
-			return lokiSource.StreamingAcquisition(out, &lokiTomb)
-		})
+			out := make(chan types.Event)
+			lokiTomb := tomb.Tomb{}
+			lokiSource := loki.LokiSource{}
+			err := lokiSource.Configure([]byte(ts.config), subLogger)
+			if err != nil {
+				t.Fatalf("Unexpected error : %s", err)
+			}
+			err = lokiSource.StreamingAcquisition(out, &lokiTomb)
+			cstest.AssertErrorContains(t, err, ts.streamErr)
 
 
-		readTomb := tomb.Tomb{}
-		readTomb.Go(func() error {
-			for i := 0; i < 20; i++ {
-				evt := <-out
-				fmt.Println(evt)
-				if !strings.HasSuffix(evt.Line.Raw, title) {
-					return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw)
-				}
+			if ts.streamErr != "" {
+				return
 			}
 			}
-			return nil
-		})
 
 
-		writerTomb := tomb.Tomb{}
-		writerTomb.Go(func() error {
-			return feedLoki(subLogger, 20, title)
-		})
-		err = writerTomb.Wait()
-		if err != nil {
-			t.Fatalf("Unexpected error : %s", err)
-		}
+			time.Sleep(time.Second * 2) //We need to give time to start reading from the WS
+			readTomb := tomb.Tomb{}
+			readCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			count := 0
 
 
-		err = streamTomb.Wait()
-		cstest.AssertErrorContains(t, err, ts.streamErr)
+			readTomb.Go(func() error {
+				defer cancel()
+				for {
+					select {
+					case <-readCtx.Done():
+						return readCtx.Err()
+					case evt := <-out:
+						count++
+						if !strings.HasSuffix(evt.Line.Raw, title) {
+							return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw)
+						}
+						if count == ts.expectedLines {
+							return nil
+						}
+					}
+				}
+			})
+
+			err = feedLoki(subLogger, ts.expectedLines, title)
+			if err != nil {
+				t.Fatalf("Unexpected error : %s", err)
+			}
 
 
-		if err == nil {
 			err = readTomb.Wait()
 			err = readTomb.Wait()
+			cancel()
 			if err != nil {
 			if err != nil {
 				t.Fatalf("Unexpected error : %s", err)
 				t.Fatalf("Unexpected error : %s", err)
 			}
 			}
-		}
+			assert.Equal(t, count, ts.expectedLines)
+		})
 	}
 	}
+
 }
 }
 
 
 func TestStopStreaming(t *testing.T) {
 func TestStopStreaming(t *testing.T) {
@@ -396,27 +400,21 @@ query: >
 		"type": "loki",
 		"type": "loki",
 	})
 	})
 	title := time.Now().String()
 	title := time.Now().String()
-	lokiSource := LokiSource{}
+	lokiSource := loki.LokiSource{}
 	err := lokiSource.Configure([]byte(config), subLogger)
 	err := lokiSource.Configure([]byte(config), subLogger)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("Unexpected error : %s", err)
 		t.Fatalf("Unexpected error : %s", err)
 	}
 	}
 	out := make(chan types.Event)
 	out := make(chan types.Event)
-	drainTomb := tomb.Tomb{}
-	drainTomb.Go(func() error {
-		<-out
-		return nil
-	})
+
 	lokiTomb := &tomb.Tomb{}
 	lokiTomb := &tomb.Tomb{}
 	err = lokiSource.StreamingAcquisition(out, lokiTomb)
 	err = lokiSource.StreamingAcquisition(out, lokiTomb)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("Unexpected error : %s", err)
 		t.Fatalf("Unexpected error : %s", err)
 	}
 	}
+	time.Sleep(time.Second * 2)
 	feedLoki(subLogger, 1, title)
 	feedLoki(subLogger, 1, title)
-	err = drainTomb.Wait()
-	if err != nil {
-		t.Fatalf("Unexpected error : %s", err)
-	}
+
 	lokiTomb.Kill(nil)
 	lokiTomb.Kill(nil)
 	err = lokiTomb.Wait()
 	err = lokiTomb.Wait()
 	if err != nil {
 	if err != nil {