Bladeren bron

Loki integration #2 (#2306)

 * Add support for loki datasource

---------

Co-authored-by: Mathieu Lecarme <mathieu@garambrogne.net>
Co-authored-by: Sebastien Blot <sebastien@crowdsec.net>
Co-authored-by: Thibault "bui" Koechlin <thibault@crowdsec.net>
lperdereau 1 jaar geleden
bovenliggende
commit
92f923cfa8

+ 12 - 0
.github/workflows/go-tests.yml

@@ -108,6 +108,18 @@ jobs:
           --health-timeout 10s
           --health-retries 5
 
+      loki:
+        image: grafana/loki:2.8.0
+        ports:
+          - "3100:3100"
+        options: >-
+          --name=loki1
+          --health-cmd "wget -q -O -  http://localhost:3100/ready | grep 'ready'"
+          --health-interval 30s
+          --health-timeout 10s
+          --health-retries 5
+          --health-start-period 30s
+
     steps:
 
     - name: Check out CrowdSec repository

+ 2 - 2
go.mod

@@ -49,6 +49,7 @@ require (
 	github.com/google/uuid v1.3.0
 	github.com/google/winops v0.0.0-20230712152054-af9b550d0601
 	github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e
+	github.com/gorilla/websocket v1.5.0
 	github.com/hashicorp/go-hclog v1.5.0
 	github.com/hashicorp/go-plugin v1.4.10
 	github.com/hashicorp/go-version v1.2.1
@@ -87,6 +88,7 @@ require (
 	gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
 	gopkg.in/yaml.v2 v2.4.0
 	gopkg.in/yaml.v3 v3.0.1
+	gotest.tools/v3 v3.5.0
 	k8s.io/apiserver v0.27.3
 )
 
@@ -126,7 +128,6 @@ require (
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/go-cmp v0.5.9 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
-	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/hashicorp/hcl/v2 v2.13.0 // indirect
 	github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
 	github.com/huandu/xstrings v1.3.2 // indirect
@@ -201,7 +202,6 @@ require (
 	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
-	gotest.tools/v3 v3.5.0 // indirect
 	k8s.io/api v0.27.3 // indirect
 	k8s.io/apimachinery v0.27.3 // indirect
 	k8s.io/klog/v2 v2.90.1 // indirect

+ 3 - 2
pkg/acquisition/acquisition.go

@@ -25,6 +25,7 @@ import (
 	kafkaacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kafka"
 	kinesisacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kinesis"
 	k8sauditacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kubernetesaudit"
+	lokiacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
 	s3acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/s3"
 	syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog"
 	wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog"
@@ -36,7 +37,7 @@ import (
 
 type DataSourceUnavailableError struct {
 	Name string
-	Err error
+	Err  error
 }
 
 func (e *DataSourceUnavailableError) Error() string {
@@ -47,7 +48,6 @@ func (e *DataSourceUnavailableError) Unwrap() error {
 	return e.Err
 }
 
-
 // The interface each datasource must implement
 type DataSource interface {
 	GetMetrics() []prometheus.Collector                                 // Returns pointers to metrics that are managed by the module
@@ -74,6 +74,7 @@ var AcquisitionSources = map[string]func() DataSource{
 	"wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} },
 	"kafka":       func() DataSource { return &kafkaacquisition.KafkaSource{} },
 	"k8s-audit":   func() DataSource { return &k8sauditacquisition.KubernetesAuditSource{} },
+	"loki":        func() DataSource { return &lokiacquisition.LokiSource{} },
 	"s3":          func() DataSource { return &s3acquisition.S3Source{} },
 }
 

+ 60 - 0
pkg/acquisition/modules/loki/entry.go

@@ -0,0 +1,60 @@
+package loki
+
+import (
+	"encoding/json"
+	"strconv"
+	"time"
+)
+
+type Entry struct {
+	Timestamp time.Time
+	Line      string
+}
+
+func (e *Entry) UnmarshalJSON(b []byte) error {
+	var values []string
+	err := json.Unmarshal(b, &values)
+	if err != nil {
+		return err
+	}
+	t, err := strconv.Atoi(values[0])
+	if err != nil {
+		return err
+	}
+	e.Timestamp = time.Unix(int64(t), 0)
+	e.Line = values[1]
+	return nil
+}
+
+type Stream struct {
+	Stream  map[string]string `json:"stream"`
+	Entries []Entry           `json:"values"`
+}
+
+type DroppedEntry struct {
+	Labels    map[string]string `json:"labels"`
+	Timestamp time.Time         `json:"timestamp"`
+}
+
+type Tail struct {
+	Streams        []Stream       `json:"streams"`
+	DroppedEntries []DroppedEntry `json:"dropped_entries"`
+}
+
+// LokiQuery GET response.
+// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query
+type LokiQuery struct {
+	Status string `json:"status"`
+	Data   Data   `json:"data"`
+}
+
+type Data struct {
+	ResultType string         `json:"resultType"`
+	Result     []StreamResult `json:"result"` // Warning, just stream value is handled
+	Stats      interface{}    `json:"stats"`  // Stats is boring, just ignore it
+}
+
+type StreamResult struct {
+	Stream map[string]string `json:"stream"`
+	Values []Entry           `json:"values"`
+}

+ 315 - 0
pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go

@@ -0,0 +1,315 @@
+package lokiclient
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
+
+	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/gorilla/websocket"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
+)
+
+type LokiClient struct {
+	Logger *log.Entry
+
+	config                Config
+	t                     *tomb.Tomb
+	fail_start            time.Time
+	currentTickerInterval time.Duration
+}
+
+type Config struct {
+	LokiURL    string
+	LokiPrefix string
+	Query      string
+	Headers    map[string]string
+
+	Username string
+	Password string
+
+	Since time.Duration
+	Until time.Duration
+
+	FailMaxDuration time.Duration
+
+	DelayFor int
+	Limit    int
+}
+
+func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) string {
+	u, _ := url.Parse(uri)
+	queryParams := u.Query()
+
+	if len(lq.Data.Result) > 0 {
+		lastTs := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp
+		// +1 the last timestamp to avoid getting the same result again.
+		queryParams.Set("start", strconv.Itoa(int(lastTs.UnixNano()+1)))
+	}
+
+	if infinite {
+		queryParams.Set("end", strconv.Itoa(int(time.Now().UnixNano())))
+	}
+
+	u.RawQuery = queryParams.Encode()
+	return u.String()
+}
+
+func (lc *LokiClient) SetTomb(t *tomb.Tomb) {
+	lc.t = t
+}
+
+func (lc *LokiClient) resetFailStart() {
+	if !lc.fail_start.IsZero() {
+		log.Infof("loki is back after %s", time.Since(lc.fail_start))
+	}
+	lc.fail_start = time.Time{}
+}
+func (lc *LokiClient) shouldRetry() bool {
+	if lc.fail_start.IsZero() {
+		lc.Logger.Warningf("loki is not available, will retry for %s", lc.config.FailMaxDuration)
+		lc.fail_start = time.Now()
+		return true
+	}
+	if time.Since(lc.fail_start) > lc.config.FailMaxDuration {
+		lc.Logger.Errorf("loki didn't manage to recover after %s, giving up", lc.config.FailMaxDuration)
+		return false
+	}
+	return true
+}
+
+func (lc *LokiClient) increaseTicker(ticker *time.Ticker) {
+	maxTicker := 10 * time.Second
+	if lc.currentTickerInterval < maxTicker {
+		lc.currentTickerInterval *= 2
+		if lc.currentTickerInterval > maxTicker {
+			lc.currentTickerInterval = maxTicker
+		}
+		ticker.Reset(lc.currentTickerInterval)
+	}
+}
+
+func (lc *LokiClient) decreaseTicker(ticker *time.Ticker) {
+	minTicker := 100 * time.Millisecond
+	if lc.currentTickerInterval != minTicker {
+		lc.currentTickerInterval = minTicker
+		ticker.Reset(lc.currentTickerInterval)
+	}
+}
+
+func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse, infinite bool) error {
+	lc.currentTickerInterval = 100 * time.Millisecond
+	ticker := time.NewTicker(lc.currentTickerInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-lc.t.Dying():
+			return lc.t.Err()
+		case <-ticker.C:
+			resp, err := http.Get(uri)
+			if err != nil {
+				if ok := lc.shouldRetry(); !ok {
+					return errors.Wrapf(err, "error querying range")
+				} else {
+					lc.increaseTicker(ticker)
+					continue
+				}
+			}
+
+			if resp.StatusCode != http.StatusOK {
+				body, _ := io.ReadAll(resp.Body)
+				resp.Body.Close()
+				if ok := lc.shouldRetry(); !ok {
+					return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body))
+				} else {
+					lc.increaseTicker(ticker)
+					continue
+				}
+			}
+
+			var lq LokiQueryRangeResponse
+			if err := json.NewDecoder(resp.Body).Decode(&lq); err != nil {
+				resp.Body.Close()
+				if ok := lc.shouldRetry(); !ok {
+					return errors.Wrapf(err, "error decoding Loki response")
+				} else {
+					lc.increaseTicker(ticker)
+					continue
+				}
+			}
+			resp.Body.Close()
+			lc.Logger.Tracef("Got response: %+v", lq)
+			c <- &lq
+			lc.resetFailStart()
+			if !infinite && (len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit) {
+				lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result))
+				close(c)
+				return nil
+			}
+			if len(lq.Data.Result) > 0 {
+				lc.Logger.Debugf("(timer:%v) %d results / %d entries result[0] (uri:%s)", lc.currentTickerInterval, len(lq.Data.Result), len(lq.Data.Result[0].Entries), uri)
+			} else {
+				lc.Logger.Debugf("(timer:%v) no results (uri:%s)", lc.currentTickerInterval, uri)
+			}
+			if infinite {
+				if len(lq.Data.Result) > 0 { //as long as we get results, we keep lowest ticker
+					lc.decreaseTicker(ticker)
+				} else {
+					lc.increaseTicker(ticker)
+				}
+			}
+
+			uri = updateURI(uri, lq, infinite)
+		}
+	}
+}
+
+func (lc *LokiClient) getURLFor(endpoint string, params map[string]string) string {
+	u, err := url.Parse(lc.config.LokiURL)
+	if err != nil {
+		return ""
+	}
+	queryParams := u.Query()
+	for k, v := range params {
+		queryParams.Set(k, v)
+	}
+	u.RawQuery = queryParams.Encode()
+
+	u.Path, err = url.JoinPath(lc.config.LokiPrefix, u.Path, endpoint)
+
+	if err != nil {
+		return ""
+	}
+
+	if endpoint == "loki/api/v1/tail" {
+		if u.Scheme == "http" {
+			u.Scheme = "ws"
+		} else {
+			u.Scheme = "wss"
+		}
+	}
+
+	return u.String()
+}
+
+func (lc *LokiClient) Ready(ctx context.Context) error {
+	tick := time.NewTicker(500 * time.Millisecond)
+	url := lc.getURLFor("ready", nil)
+	for {
+		select {
+		case <-ctx.Done():
+			tick.Stop()
+			return ctx.Err()
+		case <-lc.t.Dying():
+			tick.Stop()
+			return lc.t.Err()
+		case <-tick.C:
+			lc.Logger.Debug("Checking if Loki is ready")
+			resp, err := http.Get(url)
+			if err != nil {
+				lc.Logger.Warnf("Error checking if Loki is ready: %s", err)
+				continue
+			}
+			_ = resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				lc.Logger.Debugf("Loki is not ready, status code: %d", resp.StatusCode)
+				continue
+			}
+			lc.Logger.Info("Loki is ready")
+			return nil
+		}
+	}
+}
+
+func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
+	responseChan := make(chan *LokiResponse)
+	dialer := &websocket.Dialer{}
+	u := lc.getURLFor("loki/api/v1/tail", map[string]string{
+		"limit":     strconv.Itoa(lc.config.Limit),
+		"start":     strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())),
+		"query":     lc.config.Query,
+		"delay_for": strconv.Itoa(lc.config.DelayFor),
+	})
+
+	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since))
+
+	if lc.config.Username != "" || lc.config.Password != "" {
+		dialer.Proxy = func(req *http.Request) (*url.URL, error) {
+			req.SetBasicAuth(lc.config.Username, lc.config.Password)
+			return nil, nil
+		}
+	}
+
+	requestHeader := http.Header{}
+	for k, v := range lc.config.Headers {
+		requestHeader.Add(k, v)
+	}
+	requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
+	lc.Logger.Infof("Connecting to %s", u)
+	conn, _, err := dialer.Dial(u, requestHeader)
+
+	if err != nil {
+		lc.Logger.Errorf("Error connecting to websocket, err: %s", err)
+		return responseChan, fmt.Errorf("error connecting to websocket")
+	}
+
+	lc.t.Go(func() error {
+		for {
+			jsonResponse := &LokiResponse{}
+			err = conn.ReadJSON(jsonResponse)
+
+			if err != nil {
+				lc.Logger.Errorf("Error reading from websocket: %s", err)
+				return fmt.Errorf("websocket error: %w", err)
+			}
+
+			responseChan <- jsonResponse
+		}
+	})
+
+	return responseChan, nil
+}
+
+func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQueryRangeResponse {
+	url := lc.getURLFor("loki/api/v1/query_range", map[string]string{
+		"query":     lc.config.Query,
+		"start":     strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())),
+		"end":       strconv.Itoa(int(time.Now().UnixNano())),
+		"limit":     strconv.Itoa(lc.config.Limit),
+		"direction": "forward",
+	})
+
+	c := make(chan *LokiQueryRangeResponse)
+
+	lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since))
+
+	requestHeader := http.Header{}
+	for k, v := range lc.config.Headers {
+		requestHeader.Add(k, v)
+	}
+
+	if lc.config.Username != "" || lc.config.Password != "" {
+		requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password)))
+	}
+
+	requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
+	lc.Logger.Infof("Connecting to %s", url)
+	lc.t.Go(func() error {
+		return lc.queryRange(url, ctx, c, infinite)
+	})
+	return c
+}
+
+func NewLokiClient(config Config) *LokiClient {
+	return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config}
+}

+ 55 - 0
pkg/acquisition/modules/loki/internal/lokiclient/types.go

@@ -0,0 +1,55 @@
+package lokiclient
+
+import (
+	"encoding/json"
+	"strconv"
+	"time"
+)
+
+type Entry struct {
+	Timestamp time.Time
+	Line      string
+}
+
+func (e *Entry) UnmarshalJSON(b []byte) error {
+	var values []string
+	err := json.Unmarshal(b, &values)
+	if err != nil {
+		return err
+	}
+	t, err := strconv.Atoi(values[0])
+	if err != nil {
+		return err
+	}
+	e.Timestamp = time.Unix(0, int64(t))
+	e.Line = values[1]
+	return nil
+}
+
+type Stream struct {
+	Stream  map[string]string `json:"stream"`
+	Entries []Entry           `json:"values"`
+}
+
+type DroppedEntry struct {
+	Labels    map[string]string `json:"labels"`
+	Timestamp time.Time         `json:"timestamp"`
+}
+
+type LokiResponse struct {
+	Streams        []Stream      `json:"streams"`
+	DroppedEntries []interface{} `json:"dropped_entries"` //We don't care about the actual content i think ?
+}
+
+// LokiQuery GET response.
+// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query
+type LokiQueryRangeResponse struct {
+	Status string `json:"status"`
+	Data   Data   `json:"data"`
+}
+
+type Data struct {
+	ResultType string      `json:"resultType"`
+	Result     []Stream    `json:"result"` // Warning, just stream value is handled
+	Stats      interface{} `json:"stats"`  // Stats is boring, just ignore it
+}

+ 370 - 0
pkg/acquisition/modules/loki/loki.go

@@ -0,0 +1,370 @@
+package loki
+
+/*
+https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail
+*/
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+	tomb "gopkg.in/tomb.v2"
+	yaml "gopkg.in/yaml.v2"
+
+	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
+	lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+)
+
+const (
+	readyTimeout time.Duration = 3 * time.Second
+	readyLoop    int           = 3
+	readySleep   time.Duration = 10 * time.Second
+	lokiLimit    int           = 100
+)
+
+var linesRead = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Name: "cs_lokisource_hits_total",
+		Help: "Total lines that were read.",
+	},
+	[]string{"source"})
+
+type LokiAuthConfiguration struct {
+	Username string `yaml:"username"`
+	Password string `yaml:"password"`
+}
+
+type LokiConfiguration struct {
+	URL                               string                `yaml:"url"`    // Loki url
+	Prefix                            string                `yaml:"prefix"` // Loki prefix
+	Query                             string                `yaml:"query"`  // LogQL query
+	Limit                             int                   `yaml:"limit"`  // Limit of logs to read
+	DelayFor                          time.Duration         `yaml:"delay_for"`
+	Since                             time.Duration         `yaml:"since"`
+	Headers                           map[string]string     `yaml:"headers"`        // HTTP headers for talking to Loki
+	WaitForReady                      time.Duration         `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
+	Auth                              LokiAuthConfiguration `yaml:"auth"`
+	MaxFailureDuration                time.Duration         `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
+	configuration.DataSourceCommonCfg `yaml:",inline"`
+}
+
+type LokiSource struct {
+	Config LokiConfiguration
+
+	Client *lokiclient.LokiClient
+
+	logger        *log.Entry
+	lokiWebsocket string
+}
+
+func (l *LokiSource) GetMetrics() []prometheus.Collector {
+	return []prometheus.Collector{linesRead}
+}
+
+func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
+	return []prometheus.Collector{linesRead}
+}
+
+func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
+	err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
+	if err != nil {
+		return fmt.Errorf("cannot parse loki acquisition configuration: %w", err)
+	}
+
+	if l.Config.Query == "" {
+		return errors.New("loki query is mandatory")
+	}
+
+	if l.Config.WaitForReady == 0 {
+		l.Config.WaitForReady = 10 * time.Second
+	}
+
+	if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
+		return errors.New("delay_for should be a value between 1s and 5s")
+	}
+
+	if l.Config.Mode == "" {
+		l.Config.Mode = configuration.TAIL_MODE
+	}
+	if l.Config.Prefix == "" {
+		l.Config.Prefix = "/"
+	}
+
+	if !strings.HasSuffix(l.Config.Prefix, "/") {
+		l.Config.Prefix += "/"
+	}
+
+	if l.Config.Limit == 0 {
+		l.Config.Limit = lokiLimit
+	}
+
+	if l.Config.Mode == configuration.TAIL_MODE {
+		l.logger.Infof("Resetting since")
+		l.Config.Since = 0
+	}
+
+	if l.Config.MaxFailureDuration == 0 {
+		l.Config.MaxFailureDuration = 30 * time.Second
+	}
+
+	return nil
+}
+
+func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
+	l.Config = LokiConfiguration{}
+	l.logger = logger
+	err := l.UnmarshalConfig(config)
+	if err != nil {
+		return err
+	}
+
+	l.logger.Infof("Since value: %s", l.Config.Since.String())
+
+	clientConfig := lokiclient.Config{
+		LokiURL:         l.Config.URL,
+		Headers:         l.Config.Headers,
+		Limit:           l.Config.Limit,
+		Query:           l.Config.Query,
+		Since:           l.Config.Since,
+		Username:        l.Config.Auth.Username,
+		Password:        l.Config.Auth.Password,
+		FailMaxDuration: l.Config.MaxFailureDuration,
+	}
+
+	l.Client = lokiclient.NewLokiClient(clientConfig)
+	l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
+	return nil
+}
+
+func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
+	l.logger = logger
+	l.Config = LokiConfiguration{}
+	l.Config.Mode = configuration.CAT_MODE
+	l.Config.Labels = labels
+	l.Config.UniqueId = uuid
+
+	u, err := url.Parse(dsn)
+	if err != nil {
+		return fmt.Errorf("while parsing dsn '%s': %w", dsn, err)
+	}
+	if u.Scheme != "loki" {
+		return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
+	}
+	if u.Host == "" {
+		return errors.New("empty loki host")
+	}
+	scheme := "http"
+
+	params := u.Query()
+	if q := params.Get("ssl"); q != "" {
+		scheme = "https"
+	}
+	if q := params.Get("query"); q != "" {
+		l.Config.Query = q
+	}
+	if w := params.Get("wait_for_ready"); w != "" {
+		l.Config.WaitForReady, err = time.ParseDuration(w)
+		if err != nil {
+			return err
+		}
+	} else {
+		l.Config.WaitForReady = 10 * time.Second
+	}
+
+	if d := params.Get("delay_for"); d != "" {
+		l.Config.DelayFor, err = time.ParseDuration(d)
+		if err != nil {
+			return fmt.Errorf("invalid duration: %w", err)
+		}
+		if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
+			return errors.New("delay_for should be a value between 1s and 5s")
+		}
+	} else {
+		l.Config.DelayFor = 0 * time.Second
+	}
+
+	if s := params.Get("since"); s != "" {
+		l.Config.Since, err = time.ParseDuration(s)
+		if err != nil {
+			return fmt.Errorf("invalid since in dsn: %w", err)
+		}
+	}
+
+	if max_failure_duration := params.Get("max_failure_duration"); max_failure_duration != "" {
+		duration, err := time.ParseDuration(max_failure_duration)
+		if err != nil {
+			return fmt.Errorf("invalid max_failure_duration in dsn: %w", err)
+		}
+		l.Config.MaxFailureDuration = duration
+	} else {
+		l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration
+	}
+
+	if limit := params.Get("limit"); limit != "" {
+		limit, err := strconv.Atoi(limit)
+		if err != nil {
+			return fmt.Errorf("invalid limit in dsn: %w", err)
+		}
+		l.Config.Limit = limit
+	} else {
+		l.Config.Limit = 5000 // max limit allowed by loki
+	}
+
+	if logLevel := params.Get("log_level"); logLevel != "" {
+		level, err := log.ParseLevel(logLevel)
+		if err != nil {
+			return fmt.Errorf("invalid log_level in dsn: %w", err)
+		}
+		l.Config.LogLevel = &level
+		l.logger.Logger.SetLevel(level)
+	}
+
+	l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
+	if u.User != nil {
+		l.Config.Auth.Username = u.User.Username()
+		l.Config.Auth.Password, _ = u.User.Password()
+	}
+
+	clientConfig := lokiclient.Config{
+		LokiURL:  l.Config.URL,
+		Headers:  l.Config.Headers,
+		Limit:    l.Config.Limit,
+		Query:    l.Config.Query,
+		Since:    l.Config.Since,
+		Username: l.Config.Auth.Username,
+		Password: l.Config.Auth.Password,
+		DelayFor: int(l.Config.DelayFor / time.Second),
+	}
+
+	l.Client = lokiclient.NewLokiClient(clientConfig)
+	l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
+
+	return nil
+}
+
+func (l *LokiSource) GetMode() string {
+	return l.Config.Mode
+}
+
+func (l *LokiSource) GetName() string {
+	return "loki"
+}
+
+// OneShotAcquisition reads a set of file and returns when done
+func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
+	l.logger.Debug("Loki one shot acquisition")
+	l.Client.SetTomb(t)
+	readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
+	defer cancel()
+	err := l.Client.Ready(readyCtx)
+	if err != nil {
+		return fmt.Errorf("loki is not ready: %w", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	c := l.Client.QueryRange(ctx, false)
+
+	for {
+		select {
+		case <-t.Dying():
+			l.logger.Debug("Loki one shot acquisition stopped")
+			cancel()
+			return nil
+		case resp, ok := <-c:
+			if !ok {
+				l.logger.Info("Loki acquisition done, chan closed")
+				cancel()
+				return nil
+			}
+			for _, stream := range resp.Data.Result {
+				for _, entry := range stream.Entries {
+					l.readOneEntry(entry, l.Config.Labels, out)
+				}
+			}
+		}
+	}
+}
+
+func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) {
+	ll := types.Line{}
+	ll.Raw = entry.Line
+	ll.Time = entry.Timestamp
+	ll.Src = l.Config.URL
+	ll.Labels = labels
+	ll.Process = true
+	ll.Module = l.GetName()
+
+	linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
+	expectMode := types.LIVE
+	if l.Config.UseTimeMachine {
+		expectMode = types.TIMEMACHINE
+	}
+	out <- types.Event{
+		Line:       ll,
+		Process:    true,
+		Type:       types.LOG,
+		ExpectMode: expectMode,
+	}
+}
+
+func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
+	l.Client.SetTomb(t)
+	readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
+	defer cancel()
+	err := l.Client.Ready(readyCtx)
+	if err != nil {
+		return fmt.Errorf("loki is not ready: %w", err)
+	}
+	ll := l.logger.WithField("websocket_url", l.lokiWebsocket)
+	t.Go(func() error {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		respChan := l.Client.QueryRange(ctx, true)
+		if err != nil {
+			ll.Errorf("could not start loki tail: %s", err)
+			return fmt.Errorf("while starting loki tail: %w", err)
+		}
+		for {
+			select {
+			case resp, ok := <-respChan:
+				if !ok {
+					ll.Warnf("loki channel closed")
+					return err
+				}
+				for _, stream := range resp.Data.Result {
+					for _, entry := range stream.Entries {
+						l.readOneEntry(entry, l.Config.Labels, out)
+					}
+				}
+			case <-t.Dying():
+				return nil
+			}
+		}
+	})
+	return nil
+}
+
+func (l *LokiSource) CanRun() error {
+	return nil
+}
+
+func (l *LokiSource) GetUuid() string {
+	return l.Config.UniqueId
+}
+
+func (l *LokiSource) Dump() interface{} {
+	return l
+}
+
+// SupportedModes returns the supported modes by the acquisition module
+func (l *LokiSource) SupportedModes() []string {
+	return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
+}

+ 512 - 0
pkg/acquisition/modules/loki/loki_test.go

@@ -0,0 +1,512 @@
+package loki_test
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"runtime"
+	"strings"
+	"testing"
+	"time"
+
+	"context"
+
+	"github.com/crowdsecurity/go-cs-lib/cstest"
+
+	"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
+	"github.com/crowdsecurity/crowdsec/pkg/types"
+	log "github.com/sirupsen/logrus"
+	tomb "gopkg.in/tomb.v2"
+	"gotest.tools/v3/assert"
+)
+
+func TestConfiguration(t *testing.T) {
+
+	log.Infof("Test 'TestConfigure'")
+
+	tests := []struct {
+		config       string
+		expectedErr  string
+		password     string
+		waitForReady time.Duration
+		delayFor     time.Duration
+		testName     string
+	}{
+		{
+			config:      `foobar: asd`,
+			expectedErr: "line 1: field foobar not found in type loki.LokiConfiguration",
+			testName:    "Unknown field",
+		},
+		{
+			config: `
+mode: tail
+source: loki`,
+			expectedErr: "loki query is mandatory",
+			testName:    "Missing url",
+		},
+		{
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+`,
+			expectedErr: "loki query is mandatory",
+			testName:    "Missing query",
+		},
+		{
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+query: >
+        {server="demo"}
+`,
+			expectedErr: "",
+			testName:    "Correct config",
+		},
+		{
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+wait_for_ready: 5s
+query: >
+        {server="demo"}
+`,
+			expectedErr:  "",
+			testName:     "Correct config with wait_for_ready",
+			waitForReady: 5 * time.Second,
+		},
+		{
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+delay_for: 1s
+query: >
+        {server="demo"}
+`,
+			expectedErr: "",
+			testName:    "Correct config with delay_for",
+			delayFor:    1 * time.Second,
+		},
+		{
+
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+auth:
+  username: foo
+  password: bar
+query: >
+        {server="demo"}
+`,
+			expectedErr: "",
+			password:    "bar",
+			testName:    "Correct config with password",
+		},
+		{
+
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+delay_for: 10s
+query: >
+        {server="demo"}
+`,
+			expectedErr: "delay_for should be a value between 1s and 5s",
+			testName:    "Invalid DelayFor",
+		},
+	}
+	subLogger := log.WithFields(log.Fields{
+		"type": "loki",
+	})
+	for _, test := range tests {
+		t.Run(test.testName, func(t *testing.T) {
+			lokiSource := loki.LokiSource{}
+			err := lokiSource.Configure([]byte(test.config), subLogger)
+			cstest.AssertErrorContains(t, err, test.expectedErr)
+			if test.password != "" {
+				p := lokiSource.Config.Auth.Password
+				if test.password != p {
+					t.Fatalf("Password mismatch : %s != %s", test.password, p)
+				}
+			}
+			if test.waitForReady != 0 {
+				if lokiSource.Config.WaitForReady != test.waitForReady {
+					t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+				}
+			}
+			if test.delayFor != 0 {
+				if lokiSource.Config.DelayFor != test.delayFor {
+					t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
+				}
+			}
+		})
+	}
+}
+
+func TestConfigureDSN(t *testing.T) {
+	log.Infof("Test 'TestConfigureDSN'")
+	tests := []struct {
+		name         string
+		dsn          string
+		expectedErr  string
+		since        time.Time
+		password     string
+		scheme       string
+		waitForReady time.Duration
+		delayFor     time.Duration
+	}{
+		{
+			name:        "Wrong scheme",
+			dsn:         "wrong://",
+			expectedErr: "invalid DSN wrong:// for loki source, must start with loki://",
+		},
+		{
+			name:        "Correct DSN",
+			dsn:         `loki://localhost:3100/?query={server="demo"}`,
+			expectedErr: "",
+		},
+		{
+			name:        "Empty host",
+			dsn:         "loki://",
+			expectedErr: "empty loki host",
+		},
+		{
+			name:        "Invalid DSN",
+			dsn:         "loki",
+			expectedErr: "invalid DSN loki for loki source, must start with loki://",
+		},
+		{
+			name:        "Invalid Delay",
+			dsn:         `loki://localhost:3100/?query={server="demo"}&delay_for=10s`,
+			expectedErr: "delay_for should be a value between 1s and 5s",
+		},
+		{
+			name:  "Bad since param",
+			dsn:   `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`,
+			since: time.Now().Add(-3 * time.Hour),
+		},
+		{
+			name:     "Basic Auth",
+			dsn:      `loki://login:password@localhost:3102/?query={server="demo"}`,
+			password: "password",
+		},
+		{
+			name:         "Correct DSN",
+			dsn:          `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s`,
+			expectedErr:  "",
+			waitForReady: 5 * time.Second,
+			delayFor:     1 * time.Second,
+		},
+		{
+			name:   "SSL DSN",
+			dsn:    `loki://localhost:3100/?ssl=true`,
+			scheme: "https",
+		},
+	}
+
+	for _, test := range tests {
+		subLogger := log.WithFields(log.Fields{
+			"type": "loki",
+			"name": test.name,
+		})
+		t.Logf("Test : %s", test.name)
+		lokiSource := &loki.LokiSource{}
+		err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "")
+		cstest.AssertErrorContains(t, err, test.expectedErr)
+
+		noDuration, _ := time.ParseDuration("0s")
+		if lokiSource.Config.Since != noDuration && lokiSource.Config.Since.Round(time.Second) != time.Since(test.since).Round(time.Second) {
+			t.Fatalf("Invalid since %v", lokiSource.Config.Since)
+		}
+
+		if test.password != "" {
+			p := lokiSource.Config.Auth.Password
+			if test.password != p {
+				t.Fatalf("Password mismatch : %s != %s", test.password, p)
+			}
+		}
+		if test.scheme != "" {
+			url, _ := url.Parse(lokiSource.Config.URL)
+			if test.scheme != url.Scheme {
+				t.Fatalf("Schema mismatch : %s != %s", test.scheme, url.Scheme)
+			}
+		}
+		if test.waitForReady != 0 {
+			if lokiSource.Config.WaitForReady != test.waitForReady {
+				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+			}
+		}
+		if test.delayFor != 0 {
+			if lokiSource.Config.DelayFor != test.delayFor {
+				t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor)
+			}
+		}
+	}
+}
+
+func feedLoki(logger *log.Entry, n int, title string) error {
+	streams := LogStreams{
+		Streams: []LogStream{
+			{
+				Stream: map[string]string{
+					"server": "demo",
+					"domain": "cw.example.com",
+					"key":    title,
+				},
+				Values: make([]LogValue, n),
+			},
+		},
+	}
+	for i := 0; i < n; i++ {
+		streams.Streams[0].Values[i] = LogValue{
+			Time: time.Now(),
+			Line: fmt.Sprintf("Log line #%d %v", i, title),
+		}
+	}
+	buff, err := json.Marshal(streams)
+	if err != nil {
+		return err
+	}
+	resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff))
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode != http.StatusNoContent {
+		b, _ := io.ReadAll(resp.Body)
+		logger.Error(string(b))
+		return fmt.Errorf("Bad post status %d", resp.StatusCode)
+	}
+	logger.Info(n, " Events sent")
+	return nil
+}
+
+func TestOneShotAcquisition(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on windows")
+	}
+	log.SetOutput(os.Stdout)
+	log.SetLevel(log.InfoLevel)
+	log.Info("Test 'TestStreamingAcquisition'")
+	title := time.Now().String() // Loki will be messy, with a lot of stuff, lets use a unique key
+	tests := []struct {
+		config string
+	}{
+		{
+			config: fmt.Sprintf(`
+mode: cat
+source: loki
+url: http://127.0.0.1:3100
+query: '{server="demo",key="%s"}'
+since: 1h
+`, title),
+		},
+	}
+
+	for _, ts := range tests {
+		logger := log.New()
+		subLogger := logger.WithFields(log.Fields{
+			"type": "loki",
+		})
+		lokiSource := loki.LokiSource{}
+		err := lokiSource.Configure([]byte(ts.config), subLogger)
+		if err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		}
+
+		err = feedLoki(subLogger, 20, title)
+		if err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		}
+
+		out := make(chan types.Event)
+		read := 0
+		go func() {
+			for {
+				<-out
+				read++
+			}
+		}()
+		lokiTomb := tomb.Tomb{}
+		err = lokiSource.OneShotAcquisition(out, &lokiTomb)
+		if err != nil {
+			t.Fatalf("Unexpected error : %s", err)
+		}
+		assert.Equal(t, 20, read)
+
+	}
+}
+
+func TestStreamingAcquisition(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on windows")
+	}
+	log.SetOutput(os.Stdout)
+	log.SetLevel(log.InfoLevel)
+	log.Info("Test 'TestStreamingAcquisition'")
+	title := time.Now().String()
+	tests := []struct {
+		name          string
+		config        string
+		expectedErr   string
+		streamErr     string
+		expectedLines int
+	}{
+		{
+			name: "Bad port",
+			config: `
+mode: tail
+source: loki
+url: http://127.0.0.1:3101
+query: >
+  {server="demo"}
+`, // No Loki server here
+			expectedErr:   "",
+			streamErr:     `loki is not ready: context deadline exceeded`,
+			expectedLines: 0,
+		},
+		{
+			name: "ok",
+			config: `
+mode: tail
+source: loki
+url: http://127.0.0.1:3100
+query: >
+  {server="demo"}
+`,
+			expectedErr:   "",
+			streamErr:     "",
+			expectedLines: 20,
+		},
+	}
+	for _, ts := range tests {
+		t.Run(ts.name, func(t *testing.T) {
+			logger := log.New()
+			subLogger := logger.WithFields(log.Fields{
+				"type": "loki",
+				"name": ts.name,
+			})
+
+			out := make(chan types.Event)
+			lokiTomb := tomb.Tomb{}
+			lokiSource := loki.LokiSource{}
+			err := lokiSource.Configure([]byte(ts.config), subLogger)
+			if err != nil {
+				t.Fatalf("Unexpected error : %s", err)
+			}
+			err = lokiSource.StreamingAcquisition(out, &lokiTomb)
+			cstest.AssertErrorContains(t, err, ts.streamErr)
+
+			if ts.streamErr != "" {
+				return
+			}
+
+			time.Sleep(time.Second * 2) //We need to give time to start reading from the WS
+			readTomb := tomb.Tomb{}
+			readCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			count := 0
+
+			readTomb.Go(func() error {
+				defer cancel()
+				for {
+					select {
+					case <-readCtx.Done():
+						return readCtx.Err()
+					case evt := <-out:
+						count++
+						if !strings.HasSuffix(evt.Line.Raw, title) {
+							return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw)
+						}
+						if count == ts.expectedLines {
+							return nil
+						}
+					}
+				}
+			})
+
+			err = feedLoki(subLogger, ts.expectedLines, title)
+			if err != nil {
+				t.Fatalf("Unexpected error : %s", err)
+			}
+
+			err = readTomb.Wait()
+			cancel()
+			if err != nil {
+				t.Fatalf("Unexpected error : %s", err)
+			}
+			assert.Equal(t, count, ts.expectedLines)
+		})
+	}
+
+}
+
+func TestStopStreaming(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on windows")
+	}
+	config := `
+mode: tail
+source: loki
+url: http://127.0.0.1:3100
+query: >
+  {server="demo"}
+`
+	logger := log.New()
+	subLogger := logger.WithFields(log.Fields{
+		"type": "loki",
+	})
+	title := time.Now().String()
+	lokiSource := loki.LokiSource{}
+	err := lokiSource.Configure([]byte(config), subLogger)
+	if err != nil {
+		t.Fatalf("Unexpected error : %s", err)
+	}
+	out := make(chan types.Event)
+
+	lokiTomb := &tomb.Tomb{}
+	err = lokiSource.StreamingAcquisition(out, lokiTomb)
+	if err != nil {
+		t.Fatalf("Unexpected error : %s", err)
+	}
+	time.Sleep(time.Second * 2)
+	err = feedLoki(subLogger, 1, title)
+	if err != nil {
+		t.Fatalf("Unexpected error : %s", err)
+	}
+
+	lokiTomb.Kill(nil)
+	err = lokiTomb.Wait()
+	if err != nil {
+		t.Fatalf("Unexpected error : %s", err)
+	}
+}
+
+type LogStreams struct {
+	Streams []LogStream `json:"streams"`
+}
+
+type LogStream struct {
+	Stream map[string]string `json:"stream"`
+	Values []LogValue        `json:"values"`
+}
+
+type LogValue struct {
+	Time time.Time
+	Line string
+}
+
+func (l *LogValue) MarshalJSON() ([]byte, error) {
+	line, err := json.Marshal(l.Line)
+	if err != nil {
+		return nil, err
+	}
+	return []byte(fmt.Sprintf(`["%d",%s]`, l.Time.UnixNano(), string(line))), nil
+}

+ 29 - 0
pkg/acquisition/modules/loki/timestamp.go

@@ -0,0 +1,29 @@
+package loki
+
+import (
+	"fmt"
+	"time"
+)
+
+type timestamp time.Time
+
+func (t *timestamp) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var tt time.Time
+	err := unmarshal(&tt)
+	if err == nil {
+		*t = timestamp(tt)
+		return nil
+	}
+	var d time.Duration
+	err = unmarshal(&d)
+	if err == nil {
+		*t = timestamp(time.Now().Add(-d))
+		fmt.Println("t", time.Time(*t).Format(time.RFC3339))
+		return nil
+	}
+	return err
+}
+
+func (t *timestamp) IsZero() bool {
+	return time.Time(*t).IsZero()
+}

+ 47 - 0
pkg/acquisition/modules/loki/timestamp_test.go

@@ -0,0 +1,47 @@
+package loki
+
+import (
+	"testing"
+	"time"
+
+	"gopkg.in/yaml.v2"
+)
+
+func TestTimestampFail(t *testing.T) {
+	var tt timestamp
+	err := yaml.Unmarshal([]byte("plop"), tt)
+	if err == nil {
+		t.Fail()
+	}
+}
+
+func TestTimestampTime(t *testing.T) {
+	var tt timestamp
+	const ts string = "2022-06-14T12:56:39+02:00"
+	err := yaml.Unmarshal([]byte(ts), &tt)
+	if err != nil {
+		t.Error(err)
+		t.Fail()
+	}
+	if ts != time.Time(tt).Format(time.RFC3339) {
+		t.Fail()
+	}
+}
+
+func TestTimestampDuration(t *testing.T) {
+	var tt timestamp
+	err := yaml.Unmarshal([]byte("3h"), &tt)
+	if err != nil {
+		t.Error(err)
+		t.Fail()
+	}
+	d, err := time.ParseDuration("3h")
+	if err != nil {
+		t.Error(err)
+		t.Fail()
+	}
+	z := time.Now().Add(-d)
+	if z.Round(time.Second) != time.Time(tt).Round(time.Second) {
+		t.Fail()
+	}
+}

+ 10 - 0
pkg/setup/detect_test.go

@@ -983,6 +983,16 @@ func TestDetectDatasourceValidation(t *testing.T) {
 				      source: kafka`,
 			expected:    setup.Setup{Setup: []setup.ServiceSetup{}},
 			expectedErr: "invalid datasource for foobar: cannot create a kafka reader with an empty list of broker addresses",
+		}, {
+			name: "source loki: required fields",
+			config: `
+				version: 1.0
+				detect:
+				  foobar:
+				    datasource:
+				      source: loki`,
+			expected:    setup.Setup{Setup: []setup.ServiceSetup{}},
+			expectedErr: "invalid datasource for foobar: loki query is mandatory",
 		},
 	}
 

+ 5 - 0
test/localstack/docker-compose.yml

@@ -77,3 +77,8 @@ services:
       interval: 10s
       retries: 5
       timeout: 10s
+
+  loki:
+    image: grafana/loki:2.8.0
+    ports:
+      - "3100:3100"