Bladeren bron

[BE] Impl basic auth for prometheus (#3201)

* fix(3143): passes prometheus basic auth credentials

Co-authored-by: theurichde <tim.heurich@idealo.de>

* fix(3143): applies review comments

Co-authored-by: theurichde <tim.heurich@idealo.de>

Co-authored-by: theurichde <tim.heurich@idealo.de>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Wanis Fahmy 2 jaren geleden
bovenliggende
commit
f55376b40b

+ 15 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service.metrics;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MetricsConfig;
 import java.util.Arrays;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
@@ -27,20 +28,26 @@ class PrometheusMetricsRetriever implements MetricsRetriever {
   @Override
   public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
     log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), c.getMetricsConfig().getPort());
-    var metricsConfig = c.getMetricsConfig();
-    Integer port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
-    return retrieve(node.host(), port, metricsConfig.isSsl());
+    return retrieve(node.host(), c.getMetricsConfig());
   }
 
   @VisibleForTesting
-  Flux<RawMetric> retrieve(String host, int port, boolean ssl) {
-    WebClient.ResponseSpec responseSpec = webClient.get()
+  Flux<RawMetric> retrieve(String host, MetricsConfig metricsConfig) {
+    int port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
+
+    var request = webClient.get()
         .uri(UriComponentsBuilder.newInstance()
-            .scheme(ssl ? "https" : "http")
+            .scheme(metricsConfig.isSsl() ? "https" : "http")
             .host(host)
             .port(port)
-            .path(METRICS_ENDPOINT_PATH).build().toUri())
-        .retrieve();
+            .path(METRICS_ENDPOINT_PATH).build().toUri());
+
+    if (metricsConfig.getUsername() != null && metricsConfig.getPassword() != null) {
+      request.headers(
+          httpHeaders -> httpHeaders.setBasicAuth(metricsConfig.getUsername(), metricsConfig.getPassword()));
+    }
+
+    WebClient.ResponseSpec responseSpec = request.retrieve();
 
     return responseSpec.bodyToMono(String.class)
         .doOnError(e -> log.error("Error while getting metrics from {}", host, e))

+ 44 - 11
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java

@@ -1,7 +1,9 @@
 package com.provectus.kafka.ui.service.metrics;
 
+import com.provectus.kafka.ui.model.MetricsConfig;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.List;
 import java.util.Map;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
@@ -30,8 +32,33 @@ class PrometheusMetricsRetrieverTest {
   @Test
   void callsMetricsEndpointAndConvertsResponceToRawMetric() {
     var url = mockWebServer.url("/metrics");
+    mockWebServer.enqueue(prepareResponse());
+
+    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), null, null);
+
+    StepVerifier.create(retriever.retrieve(url.host(), metricsConfig))
+        .expectNextSequence(expectedRawMetrics())
+        // third metric should not be present, since it has "NaN" value
+        .verifyComplete();
+  }
+
+  @Test
+  void callsSecureMetricsEndpointAndConvertsResponceToRawMetric() {
+    var url = mockWebServer.url("/metrics");
+    mockWebServer.enqueue(prepareResponse());
+
+
+    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
+
+    StepVerifier.create(retriever.retrieve(url.host(), metricsConfig))
+        .expectNextSequence(expectedRawMetrics())
+        // third metric should not be present, since it has "NaN" value
+        .verifyComplete();
+  }
+
+  MockResponse prepareResponse() {
     // body copied from real jmx exporter
-    MockResponse response = new MockResponse().setBody(
+    return new MockResponse().setBody(
         "# HELP kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate Attribute exposed for management \n"
             + "# TYPE kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate untyped\n"
             + "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate{name=\"RequestHandlerAvgIdlePercent\",} 0.898\n"
@@ -40,7 +67,19 @@ class PrometheusMetricsRetrieverTest {
             + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN\",networkProcessor=\"1\",} 101.1\n"
             + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN2\",networkProcessor=\"5\",} NaN"
     );
-    mockWebServer.enqueue(response);
+  }
+
+  MetricsConfig prepareMetricsConfig(Integer port, String username, String password) {
+    return MetricsConfig.builder()
+        .ssl(false)
+        .port(port)
+        .type(MetricsConfig.PROMETHEUS_METRICS_TYPE)
+        .username(username)
+        .password(password)
+        .build();
+  }
+
+  List<RawMetric> expectedRawMetrics() {
 
     var firstMetric = RawMetric.create(
         "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
@@ -48,17 +87,11 @@ class PrometheusMetricsRetrieverTest {
         new BigDecimal("0.898")
     );
 
-    var second = RawMetric.create(
+    var secondMetric = RawMetric.create(
         "kafka_server_socket_server_metrics_request_size_avg",
         Map.of("listener", "PLAIN", "networkProcessor", "1"),
         new BigDecimal("101.1")
     );
-
-    StepVerifier.create(retriever.retrieve(url.host(), url.port(), false))
-        .expectNext(firstMetric)
-        .expectNext(second)
-        // third metric should not be present, since it has "NaN" value
-        .verifyComplete();
+    return List.of(firstMetric, secondMetric);
   }
-
-}
+}