[Loki] Set headers/basic auth if set for queryRange (#2815)
This commit is contained in:
parent
df159b0167
commit
fa56d35a48
2 changed files with 49 additions and 27 deletions
|
@ -25,6 +25,7 @@ type LokiClient struct {
|
|||
t *tomb.Tomb
|
||||
fail_start time.Time
|
||||
currentTickerInterval time.Duration
|
||||
requestHeaders map[string]string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
|
@ -116,7 +117,7 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu
|
|||
case <-lc.t.Dying():
|
||||
return lc.t.Err()
|
||||
case <-ticker.C:
|
||||
resp, err := http.Get(uri)
|
||||
resp, err := lc.Get(uri)
|
||||
if err != nil {
|
||||
if ok := lc.shouldRetry(); !ok {
|
||||
return errors.Wrapf(err, "error querying range")
|
||||
|
@ -127,6 +128,7 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu
|
|||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
lc.Logger.Warnf("bad HTTP response code for query range: %d", resp.StatusCode)
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if ok := lc.shouldRetry(); !ok {
|
||||
|
@ -215,7 +217,7 @@ func (lc *LokiClient) Ready(ctx context.Context) error {
|
|||
return lc.t.Err()
|
||||
case <-tick.C:
|
||||
lc.Logger.Debug("Checking if Loki is ready")
|
||||
resp, err := http.Get(url)
|
||||
resp, err := lc.Get(url)
|
||||
if err != nil {
|
||||
lc.Logger.Warnf("Error checking if Loki is ready: %s", err)
|
||||
continue
|
||||
|
@ -251,10 +253,9 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
|
|||
}
|
||||
|
||||
requestHeader := http.Header{}
|
||||
for k, v := range lc.config.Headers {
|
||||
for k, v := range lc.requestHeaders {
|
||||
requestHeader.Add(k, v)
|
||||
}
|
||||
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
|
||||
lc.Logger.Infof("Connecting to %s", u)
|
||||
conn, _, err := dialer.Dial(u, requestHeader)
|
||||
|
||||
|
@ -293,16 +294,6 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ
|
|||
|
||||
lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since))
|
||||
|
||||
requestHeader := http.Header{}
|
||||
for k, v := range lc.config.Headers {
|
||||
requestHeader.Add(k, v)
|
||||
}
|
||||
|
||||
if lc.config.Username != "" || lc.config.Password != "" {
|
||||
requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password)))
|
||||
}
|
||||
|
||||
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
|
||||
lc.Logger.Infof("Connecting to %s", url)
|
||||
lc.t.Go(func() error {
|
||||
return lc.queryRange(url, ctx, c, infinite)
|
||||
|
@ -310,6 +301,26 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ
|
|||
return c
|
||||
}
|
||||
|
||||
func NewLokiClient(config Config) *LokiClient {
|
||||
return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config}
|
||||
// Create a wrapper for http.Get to be able to set headers and auth
|
||||
func (lc *LokiClient) Get(url string) (*http.Response, error) {
|
||||
request, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range lc.requestHeaders {
|
||||
request.Header.Add(k, v)
|
||||
}
|
||||
return http.DefaultClient.Do(request)
|
||||
}
|
||||
|
||||
func NewLokiClient(config Config) *LokiClient {
|
||||
headers := make(map[string]string)
|
||||
for k, v := range config.Headers {
|
||||
headers[k] = v
|
||||
}
|
||||
if config.Username != "" || config.Password != "" {
|
||||
headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(config.Username+":"+config.Password))
|
||||
}
|
||||
headers["User-Agent"] = "Crowdsec " + cwversion.VersionStr()
|
||||
return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config, requestHeaders: headers}
|
||||
}
|
||||
|
|
|
@ -276,10 +276,17 @@ func feedLoki(logger *log.Entry, n int, title string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff))
|
||||
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:3100/loki/api/v1/push", bytes.NewBuffer(buff))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-Scope-OrgID", "1234")
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
logger.Error(string(b))
|
||||
|
@ -306,6 +313,8 @@ mode: cat
|
|||
source: loki
|
||||
url: http://127.0.0.1:3100
|
||||
query: '{server="demo",key="%s"}'
|
||||
headers:
|
||||
x-scope-orgid: "1234"
|
||||
since: 1h
|
||||
`, title),
|
||||
},
|
||||
|
@ -362,26 +371,26 @@ func TestStreamingAcquisition(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "Bad port",
|
||||
config: `
|
||||
mode: tail
|
||||
config: `mode: tail
|
||||
source: loki
|
||||
url: http://127.0.0.1:3101
|
||||
url: "http://127.0.0.1:3101"
|
||||
headers:
|
||||
x-scope-orgid: "1234"
|
||||
query: >
|
||||
{server="demo"}
|
||||
`, // No Loki server here
|
||||
{server="demo"}`, // No Loki server here
|
||||
expectedErr: "",
|
||||
streamErr: `loki is not ready: context deadline exceeded`,
|
||||
expectedLines: 0,
|
||||
},
|
||||
{
|
||||
name: "ok",
|
||||
config: `
|
||||
mode: tail
|
||||
config: `mode: tail
|
||||
source: loki
|
||||
url: http://127.0.0.1:3100
|
||||
url: "http://127.0.0.1:3100"
|
||||
headers:
|
||||
x-scope-orgid: "1234"
|
||||
query: >
|
||||
{server="demo"}
|
||||
`,
|
||||
{server="demo"}`,
|
||||
expectedErr: "",
|
||||
streamErr: "",
|
||||
expectedLines: 20,
|
||||
|
@ -456,6 +465,8 @@ func TestStopStreaming(t *testing.T) {
|
|||
mode: tail
|
||||
source: loki
|
||||
url: http://127.0.0.1:3100
|
||||
headers:
|
||||
x-scope-orgid: "1234"
|
||||
query: >
|
||||
{server="demo"}
|
||||
`
|
||||
|
|
Loading…
Reference in a new issue