Loky query is mandatory
This commit is contained in:
parent
48431f20ed
commit
bf138b9231
2 changed files with 105 additions and 12 deletions
|
@ -99,6 +99,9 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
|
|||
}
|
||||
|
||||
func (l *LokiSource) prepareConfig() error {
|
||||
if l.Config.Query == "" {
|
||||
return errors.New("Loki query is mandatory")
|
||||
}
|
||||
l.dialer = &websocket.Dialer{}
|
||||
l.header = http.Header{}
|
||||
if l.Config.TenantID != "" {
|
||||
|
|
|
@ -49,6 +49,16 @@ url: stuff://localhost:3100
|
|||
mode: tail
|
||||
source: loki
|
||||
url: http://localhost:3100/
|
||||
`,
|
||||
expectedErr: "Loki query is mandatory",
|
||||
},
|
||||
{
|
||||
config: `
|
||||
mode: tail
|
||||
source: loki
|
||||
url: http://localhost:3100/
|
||||
query: >
|
||||
{server="demo"}
|
||||
`,
|
||||
expectedErr: "",
|
||||
},
|
||||
|
@ -58,6 +68,8 @@ url: http://localhost:3100/
|
|||
mode: tail
|
||||
source: loki
|
||||
url: http://foo:bar@localhost:3100/
|
||||
query: >
|
||||
{server="demo"}
|
||||
`,
|
||||
expectedErr: "",
|
||||
password: "bar",
|
||||
|
@ -99,7 +111,7 @@ func TestConfigureDSN(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "Correct DSN",
|
||||
dsn: "loki://localhost:3100/",
|
||||
dsn: `loki://localhost:3100/?query={server="demo"}`,
|
||||
expectedErr: "",
|
||||
},
|
||||
{
|
||||
|
@ -114,12 +126,12 @@ func TestConfigureDSN(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "Bad since param",
|
||||
dsn: "loki://127.0.0.1:3100/?since=3h",
|
||||
dsn: `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`,
|
||||
since: time.Now().Add(-3 * time.Hour),
|
||||
},
|
||||
{
|
||||
name: "Basic Auth",
|
||||
dsn: "loki://login:password@localhost:3100",
|
||||
dsn: `loki://login:password@localhost:3100/?query={server="demo"}`,
|
||||
password: "password",
|
||||
},
|
||||
}
|
||||
|
@ -152,12 +164,89 @@ func TestConfigureDSN(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func _TestOneShotAcquisition(t *testing.T) {
|
||||
log.SetOutput(os.Stdout)
|
||||
log.SetLevel(log.InfoLevel)
|
||||
log.Info("Test 'TestStreamingAcquisition'")
|
||||
title := time.Now().String()
|
||||
tests := []struct {
|
||||
config string
|
||||
}{
|
||||
{
|
||||
config: `
|
||||
mode: cat
|
||||
source: loki
|
||||
url: http://127.0.0.1:3100
|
||||
query: >
|
||||
{server="demo"}
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, ts := range tests {
|
||||
logger := log.New()
|
||||
logger.SetLevel(log.InfoLevel)
|
||||
subLogger := logger.WithFields(log.Fields{
|
||||
"type": "loki",
|
||||
})
|
||||
lokiSource := LokiSource{}
|
||||
err := lokiSource.Configure([]byte(ts.config), subLogger)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error : %s", err)
|
||||
}
|
||||
|
||||
streams := LogStreams{
|
||||
Streams: []LogStream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"server": "demo",
|
||||
"domain": "cw.example.com",
|
||||
},
|
||||
Values: make([]LogValue, 20),
|
||||
},
|
||||
},
|
||||
}
|
||||
for i := 0; i < 20; i++ {
|
||||
streams.Streams[0].Values[i] = LogValue{
|
||||
Time: time.Now(),
|
||||
Line: fmt.Sprintf("Log line #%d %v", i, title),
|
||||
}
|
||||
}
|
||||
buff, err := json.Marshal(streams)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error : %s", err)
|
||||
}
|
||||
resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error : %s", err)
|
||||
}
|
||||
if resp.StatusCode != 204 {
|
||||
b, _ := ioutil.ReadAll(resp.Body)
|
||||
log.Error(string(b))
|
||||
t.Fatalf("Bad post status %d", resp.StatusCode)
|
||||
}
|
||||
subLogger.Info("20 Events sent")
|
||||
|
||||
out := make(chan types.Event)
|
||||
lokiTomb := tomb.Tomb{}
|
||||
err = lokiSource.OneShotAcquisition(out, &lokiTomb)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error : %s", err)
|
||||
}
|
||||
err = lokiTomb.Wait()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error : %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamingAcquisition(t *testing.T) {
|
||||
log.SetOutput(os.Stdout)
|
||||
log.SetLevel(log.InfoLevel)
|
||||
log.Info("Test 'TestStreamingAcquisition'")
|
||||
title := time.Now().String()
|
||||
tests := []struct {
|
||||
name string
|
||||
config string
|
||||
expectedErr string
|
||||
streamErr string
|
||||
|
@ -167,10 +256,13 @@ func TestStreamingAcquisition(t *testing.T) {
|
|||
logLevel log.Level
|
||||
}{
|
||||
{
|
||||
name: "Bad port",
|
||||
config: `
|
||||
mode: tail
|
||||
source: loki
|
||||
url: http://127.0.0.1:3101
|
||||
query: >
|
||||
{server="demo"}
|
||||
`, // No Loki server here
|
||||
expectedErr: "",
|
||||
streamErr: `Get "http://127.0.0.1:3101/ready": dial tcp 127.0.0.1:3101: connect: connection refused`,
|
||||
|
@ -180,6 +272,7 @@ url: http://127.0.0.1:3101
|
|||
logLevel: log.InfoLevel,
|
||||
},
|
||||
{
|
||||
name: "ok",
|
||||
config: `
|
||||
mode: tail
|
||||
source: loki
|
||||
|
@ -196,17 +289,14 @@ query: >
|
|||
},
|
||||
}
|
||||
for _, ts := range tests {
|
||||
var logger *log.Logger
|
||||
var subLogger *log.Entry
|
||||
logger := log.New()
|
||||
subLogger := logger.WithFields(log.Fields{
|
||||
"type": "loki",
|
||||
"name": ts.name,
|
||||
})
|
||||
|
||||
if ts.expectedOutput != "" {
|
||||
logger.SetLevel(ts.logLevel)
|
||||
subLogger = logger.WithFields(log.Fields{
|
||||
"type": "loki",
|
||||
})
|
||||
} else {
|
||||
subLogger = log.WithFields(log.Fields{
|
||||
"type": "loki",
|
||||
})
|
||||
}
|
||||
out := make(chan types.Event)
|
||||
lokiTomb := tomb.Tomb{}
|
||||
|
|
Loading…
Reference in a new issue