From bf138b9231de1649c20cddcbd5a7e7636528aece Mon Sep 17 00:00:00 2001 From: Mathieu Lecarme Date: Wed, 15 Jun 2022 10:32:59 +0200 Subject: [PATCH] Loky query is mandatory --- pkg/acquisition/modules/loki/loki.go | 3 + pkg/acquisition/modules/loki/loki_test.go | 114 +++++++++++++++++++--- 2 files changed, 105 insertions(+), 12 deletions(-) diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index db5b199b0..63ca6208c 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -99,6 +99,9 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { } func (l *LokiSource) prepareConfig() error { + if l.Config.Query == "" { + return errors.New("Loki query is mandatory") + } l.dialer = &websocket.Dialer{} l.header = http.Header{} if l.Config.TenantID != "" { diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 683a3c310..5bc273d8e 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -49,6 +49,16 @@ url: stuff://localhost:3100 mode: tail source: loki url: http://localhost:3100/ +`, + expectedErr: "Loki query is mandatory", + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +query: > + {server="demo"} `, expectedErr: "", }, @@ -58,6 +68,8 @@ url: http://localhost:3100/ mode: tail source: loki url: http://foo:bar@localhost:3100/ +query: > + {server="demo"} `, expectedErr: "", password: "bar", @@ -99,7 +111,7 @@ func TestConfigureDSN(t *testing.T) { }, { name: "Correct DSN", - dsn: "loki://localhost:3100/", + dsn: `loki://localhost:3100/?query={server="demo"}`, expectedErr: "", }, { @@ -114,12 +126,12 @@ func TestConfigureDSN(t *testing.T) { }, { name: "Bad since param", - dsn: "loki://127.0.0.1:3100/?since=3h", + dsn: `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`, since: time.Now().Add(-3 * time.Hour), }, { name: "Basic Auth", - dsn: "loki://login:password@localhost:3100", + dsn: `loki://login:password@localhost:3100/?query={server="demo"}`, password: "password", }, } @@ -152,12 +164,89 @@ func TestConfigureDSN(t *testing.T) { } } +func _TestOneShotAcquisition(t *testing.T) { + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + title := time.Now().String() + tests := []struct { + config string + }{ + { + config: ` +mode: cat +source: loki +url: http://127.0.0.1:3100 +query: > + {server="demo"} +`, + }, + } + + for _, ts := range tests { + logger := log.New() + logger.SetLevel(log.InfoLevel) + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + }) + lokiSource := LokiSource{} + err := lokiSource.Configure([]byte(ts.config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + streams := LogStreams{ + Streams: []LogStream{ + { + Stream: map[string]string{ + "server": "demo", + "domain": "cw.example.com", + }, + Values: make([]LogValue, 20), + }, + }, + } + for i := 0; i < 20; i++ { + streams.Streams[0].Values[i] = LogValue{ + Time: time.Now(), + Line: fmt.Sprintf("Log line #%d %v", i, title), + } + } + buff, err := json.Marshal(streams) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + if resp.StatusCode != 204 { + b, _ := ioutil.ReadAll(resp.Body) + log.Error(string(b)) + t.Fatalf("Bad post status %d", resp.StatusCode) + } + subLogger.Info("20 Events sent") + + out := make(chan types.Event) + lokiTomb := tomb.Tomb{} + err = lokiSource.OneShotAcquisition(out, &lokiTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + err = lokiTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + } +} + func TestStreamingAcquisition(t *testing.T) { log.SetOutput(os.Stdout) log.SetLevel(log.InfoLevel) log.Info("Test 'TestStreamingAcquisition'") title := time.Now().String() tests := []struct { + name string config string expectedErr string streamErr string @@ -167,10 +256,13 @@ func TestStreamingAcquisition(t *testing.T) { logLevel log.Level }{ { + name: "Bad port", config: ` mode: tail source: loki url: http://127.0.0.1:3101 +query: > + {server="demo"} `, // No Loki server here expectedErr: "", streamErr: `Get "http://127.0.0.1:3101/ready": dial tcp 127.0.0.1:3101: connect: connection refused`, @@ -180,6 +272,7 @@ url: http://127.0.0.1:3101 logLevel: log.InfoLevel, }, { + name: "ok", config: ` mode: tail source: loki @@ -196,17 +289,14 @@ query: > }, } for _, ts := range tests { - var logger *log.Logger - var subLogger *log.Entry + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + "name": ts.name, + }) + if ts.expectedOutput != "" { logger.SetLevel(ts.logLevel) - subLogger = logger.WithFields(log.Fields{ - "type": "loki", - }) - } else { - subLogger = log.WithFields(log.Fields{ - "type": "loki", - }) } out := make(chan types.Event) lokiTomb := tomb.Tomb{}