Bläddra i källkod

Wait for ready is a parameter.

Mathieu Lecarme 3 år sedan
förälder
incheckning
195adbadeb
2 ändrade filer med 50 tillägg och 9 borttagningar
  1. 13 1
      pkg/acquisition/modules/loki/loki.go
  2. 37 8
      pkg/acquisition/modules/loki/loki_test.go

+ 13 - 1
pkg/acquisition/modules/loki/loki.go

@@ -49,7 +49,8 @@ type LokiConfiguration struct {
 	DelayFor                          time.Duration     `yaml:"delay_for"`
 	Since                             timestamp         `yaml:"since"`
 	TenantID                          string            `yaml:"tenant_id"`
-	Headers                           map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
+	Headers                           map[string]string `yaml:"headers"`        // HTTP headers for talking to Loki
+	WaitForReady                      time.Duration     `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
 	configuration.DataSourceCommonCfg `yaml:",inline"`
 }
 
@@ -78,6 +79,9 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
 	if err != nil {
 		return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
 	}
+	if l.Config.WaitForReady == 0 {
+		l.Config.WaitForReady = 10 * time.Second
+	}
 	u, err := url.Parse(l.Config.URL)
 	if err != nil {
 		return err
@@ -187,6 +191,14 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
 	if q := params.Get("query"); q != "" {
 		l.Config.Query = q
 	}
+	if w := params.Get("wait_for_ready"); w != "" {
+		l.Config.WaitForReady, err = time.ParseDuration(w)
+		if err != nil {
+			return err
+		}
+	} else {
+		l.Config.WaitForReady = 10 * time.Second
+	}
 	if d := params.Get("delay_for"); d != "" {
 		delayFor, err := time.ParseDuration(d)
 		if err != nil {

+ 37 - 8
pkg/acquisition/modules/loki/loki_test.go

@@ -22,9 +22,10 @@ func TestConfiguration(t *testing.T) {
 	log.Infof("Test 'TestConfigure'")
 
 	tests := []struct {
-		config      string
-		expectedErr string
-		password    string
+		config       string
+		expectedErr  string
+		password     string
+		waitForReady time.Duration
 	}{
 		{
 			config:      `foobar: asd`,
@@ -57,6 +58,17 @@ url: http://localhost:3100/
 mode: tail
 source: loki
 url: http://localhost:3100/
+query: >
+        {server="demo"}
+`,
+			expectedErr: "",
+		},
+		{
+			config: `
+mode: tail
+source: loki
+url: http://localhost:3100/
+wait_for_ready: 5s
 query: >
         {server="demo"}
 `,
@@ -92,17 +104,23 @@ query: >
 				t.Fatalf("Bad password %s != %s", test.password, p)
 			}
 		}
+		if test.waitForReady != 0 {
+			if lokiSource.Config.WaitForReady != test.waitForReady {
+				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+			}
+		}
 	}
 }
 
 func TestConfigureDSN(t *testing.T) {
 	log.Infof("Test 'TestConfigureDSN'")
 	tests := []struct {
-		name        string
-		dsn         string
-		expectedErr string
-		since       time.Time
-		password    string
+		name         string
+		dsn          string
+		expectedErr  string
+		since        time.Time
+		password     string
+		waitForReady time.Duration
 	}{
 		{
 			name:        "Wrong scheme",
@@ -134,6 +152,12 @@ func TestConfigureDSN(t *testing.T) {
 			dsn:      `loki://login:password@localhost:3100/?query={server="demo"}`,
 			password: "password",
 		},
+		{
+			name:         "Correct DSN",
+			dsn:          `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s`,
+			expectedErr:  "",
+			waitForReady: 5 * time.Second,
+		},
 	}
 
 	for _, test := range tests {
@@ -161,6 +185,11 @@ func TestConfigureDSN(t *testing.T) {
 				t.Fatalf("Bad auth header : %s", a)
 			}
 		}
+		if test.waitForReady != 0 {
+			if lokiSource.Config.WaitForReady != test.waitForReady {
+				t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady)
+			}
+		}
 	}
 }