fix: Update

This commit is contained in:
Louis PERDEREAU 2023-06-24 18:38:46 +02:00 committed by lperdereau
parent 8f3a20b7a4
commit 3dadf6898a
6 changed files with 50 additions and 38 deletions
go.modgo.sum
pkg/acquisition/modules/loki
test/localstack

2
go.mod
View file

@ -128,8 +128,6 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl/v2 v2.13.0 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect

3
go.sum
View file

@ -414,8 +414,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e h1:XmA6L9IPRdUr28a+SK/oMchGgQy159wvzXA5tJ7l+40=
github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e/go.mod h1:AFIo+02s+12CEg8Gzz9kzhCbmbq6JcKNrhHffCGA9z4=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

View file

@ -198,7 +198,7 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
for k, v := range lc.config.Headers {
requestHeader.Add(k, v)
}
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version)
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
lc.Logger.Infof("Connecting to %s", u)
conn, resp, err := dialer.Dial(u, requestHeader)
defer resp.Body.Close()
@ -242,7 +242,7 @@ func (lc *LokiClient) QueryRange(ctx context.Context) chan *LokiQueryRangeRespon
requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password)))
}
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version)
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
lc.Logger.Infof("Connecting to %s", url)
lc.t.Go(func() error {
return lc.queryRange(url, ctx, c)

View file

@ -12,16 +12,15 @@ import (
"strings"
"time"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
tomb "gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
)
const (
@ -55,7 +54,7 @@ type LokiConfiguration struct {
type LokiSource struct {
Config LokiConfiguration
client *lokiclient.LokiClient
Client *lokiclient.LokiClient
logger *log.Entry
lokiWebsocket string
@ -69,10 +68,8 @@ func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
}
func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
l.Config = LokiConfiguration{}
l.logger = logger
err := yaml.UnmarshalStrict(config, &l.Config)
func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
if err != nil {
return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
}
@ -104,6 +101,17 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
l.Config.Since = 0
}
return nil
}
func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
l.Config = LokiConfiguration{}
l.logger = logger
err := l.UnmarshalConfig(config)
if err != nil {
return err
}
l.logger.Infof("Since value: %s", l.Config.Since.String())
clientConfig := lokiclient.Config{
@ -114,12 +122,12 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
Since: l.Config.Since,
}
l.client = lokiclient.NewLokiClient(clientConfig)
l.client.Logger = logger.WithField("component", "lokiclient")
l.Client = lokiclient.NewLokiClient(clientConfig)
l.Client.Logger = logger.WithField("component", "lokiclient")
return nil
}
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
l.logger = logger
l.Config = LokiConfiguration{}
l.Config.Mode = configuration.CAT_MODE
@ -136,10 +144,6 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
return errors.New("Empty loki host")
}
scheme := "http"
// FIXME how can use http with container, in a private network?
if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" {
scheme = "http"
}
l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
params := u.Query()
@ -197,8 +201,8 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
Password: l.Config.Password,
}
l.client = lokiclient.NewLokiClient(clientConfig)
l.client.Logger = logger.WithField("component", "lokiclient")
l.Client = lokiclient.NewLokiClient(clientConfig)
l.Client.Logger = logger.WithField("component", "lokiclient")
return nil
}
@ -216,13 +220,13 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
l.logger.Debug("Loki one shot acquisition")
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
defer cancel()
err := l.client.Ready(readyCtx)
err := l.Client.Ready(readyCtx)
if err != nil {
return errors.Wrap(err, "loki is not ready")
}
ctx, cancel := context.WithCancel(context.Background())
c := l.client.QueryRange(ctx)
c := l.Client.QueryRange(ctx)
for {
select {
@ -259,14 +263,14 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: leaky.TIMEMACHINE,
ExpectMode: types.TIMEMACHINE,
}
}
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
defer cancel()
err := l.client.Ready(readyCtx)
err := l.Client.Ready(readyCtx)
if err != nil {
return errors.Wrap(err, "loki is not ready")
}
@ -274,17 +278,17 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
t.Go(func() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
respChan, err := l.client.Tail(ctx)
respChan, err := l.Client.Tail(ctx)
if err != nil {
ll.Errorf("could not start loki tail: %s", err)
return errors.Wrap(err, "could not start loki tail")
}
for {
select {
case resp := <-respChan:
if resp == nil {
ll.Warnf("got nil response from loki tail")
continue
case resp, ok := <-respChan:
if !ok {
ll.Warnf("loki channel closed")
return err
}
if len(resp.DroppedEntries) > 0 {
ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
@ -306,6 +310,10 @@ func (l *LokiSource) CanRun() error {
return nil
}
func (l *LokiSource) GetUuid() string {
return ""
}
func (l *LokiSource) Dump() interface{} {
return l
}

View file

@ -13,8 +13,9 @@ import (
"context"
"github.com/crowdsecurity/go-cs-lib/pkg/cstest"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki"
"github.com/crowdsecurity/crowdsec/pkg/cstest"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
tomb "gopkg.in/tomb.v2"
@ -171,7 +172,7 @@ func TestConfigureDSN(t *testing.T) {
"name": test.name,
})
lokiSource := &loki.LokiSource{}
err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "")
cstest.AssertErrorContains(t, err, test.expectedErr)
/*if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) {
t.Fatalf("Invalid since %v", lokiSource.Config.Since)
@ -441,5 +442,5 @@ func (l *LogValue) MarshalJSON() ([]byte, error) {
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf(`[%d,%s]`, l.Time.UnixNano(), string(line))), nil
return []byte(fmt.Sprintf(`["%d",%s]`, l.Time.UnixNano(), string(line))), nil
}

View file

@ -77,3 +77,9 @@ services:
interval: 10s
retries: 5
timeout: 10s
loki:
image: grafana/loki:2.8.0
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml