wip
This commit is contained in:
parent
30510781c6
commit
8e6b47ad85
6 changed files with 19 additions and 19 deletions
|
@ -1,6 +1,7 @@
|
||||||
package com.provectus.kafka.ui.config;
|
package com.provectus.kafka.ui.config;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.model.MetricsScrapeProperties;
|
import static com.provectus.kafka.ui.model.MetricsScrapeProperties.JMX_METRICS_TYPE;
|
||||||
|
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -167,7 +168,7 @@ public class ClustersProperties {
|
||||||
private void setMetricsDefaults() {
|
private void setMetricsDefaults() {
|
||||||
for (Cluster cluster : clusters) {
|
for (Cluster cluster : clusters) {
|
||||||
if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
|
if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
|
||||||
cluster.getMetrics().setType(MetricsScrapeProperties.JMX_METRICS_TYPE);
|
cluster.getMetrics().setType(JMX_METRICS_TYPE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ package com.provectus.kafka.ui.controller;
|
||||||
import com.provectus.kafka.ui.api.PrometheusExposeApi;
|
import com.provectus.kafka.ui.api.PrometheusExposeApi;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.service.StatisticsCache;
|
import com.provectus.kafka.ui.service.StatisticsCache;
|
||||||
import com.provectus.kafka.ui.util.PrometheusEndpointUtil;
|
import com.provectus.kafka.ui.service.metrics.PrometheusEndpointExpose;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
|
@ -20,7 +20,7 @@ public class PrometheusExposeController extends AbstractController implements Pr
|
||||||
@Override
|
@Override
|
||||||
public Mono<ResponseEntity<String>> getAllMetrics(ServerWebExchange exchange) {
|
public Mono<ResponseEntity<String>> getAllMetrics(ServerWebExchange exchange) {
|
||||||
return Mono.just(
|
return Mono.just(
|
||||||
PrometheusEndpointUtil.exposeAllMetrics(
|
PrometheusEndpointExpose.exposeAllMetrics(
|
||||||
clustersStorage.getKafkaClusters()
|
clustersStorage.getKafkaClusters()
|
||||||
.stream()
|
.stream()
|
||||||
.collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics()))
|
.collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics()))
|
||||||
|
@ -31,7 +31,7 @@ public class PrometheusExposeController extends AbstractController implements Pr
|
||||||
@Override
|
@Override
|
||||||
public Mono<ResponseEntity<String>> getAllClusterMetrics(String clusterName, ServerWebExchange exchange) {
|
public Mono<ResponseEntity<String>> getAllClusterMetrics(String clusterName, ServerWebExchange exchange) {
|
||||||
return Mono.just(
|
return Mono.just(
|
||||||
PrometheusEndpointUtil.exposeClusterMetrics(
|
PrometheusEndpointExpose.exposeClusterMetrics(
|
||||||
statisticsCache.get(getCluster(clusterName)).getMetrics()
|
statisticsCache.get(getCluster(clusterName)).getMetrics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
@ -42,7 +42,7 @@ public class PrometheusExposeController extends AbstractController implements Pr
|
||||||
Long brokerId,
|
Long brokerId,
|
||||||
ServerWebExchange exchange) {
|
ServerWebExchange exchange) {
|
||||||
return Mono.just(
|
return Mono.just(
|
||||||
PrometheusEndpointUtil.exposeBrokerMetrics(
|
PrometheusEndpointExpose.exposeBrokerMetrics(
|
||||||
statisticsCache.get(getCluster(clusterName)).getMetrics(), brokerId.intValue()
|
statisticsCache.get(getCluster(clusterName)).getMetrics(), brokerId.intValue()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.provectus.kafka.ui.util;
|
package com.provectus.kafka.ui.service.metrics;
|
||||||
|
|
||||||
import static io.prometheus.client.Collector.MetricFamilySamples;
|
import static io.prometheus.client.Collector.MetricFamilySamples;
|
||||||
|
|
||||||
|
@ -17,9 +17,9 @@ import lombok.SneakyThrows;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
|
|
||||||
public final class PrometheusEndpointUtil {
|
public final class PrometheusEndpointExpose {
|
||||||
|
|
||||||
private PrometheusEndpointUtil() {
|
private PrometheusEndpointExpose() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResponseEntity<String> exposeAllMetrics(Map<String, Metrics> clustersMetrics) {
|
public static ResponseEntity<String> exposeAllMetrics(Map<String, Metrics> clustersMetrics) {
|
||||||
|
@ -48,7 +48,7 @@ public final class PrometheusEndpointUtil {
|
||||||
.map(mfs -> addLbl(mfs, "cluster", e.getKey())))
|
.map(mfs -> addLbl(mfs, "cluster", e.getKey())))
|
||||||
// merging MFS with same name, keeping order
|
// merging MFS with same name, keeping order
|
||||||
.collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs,
|
.collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs,
|
||||||
PrometheusEndpointUtil::concatSamples, LinkedHashMap::new))
|
PrometheusEndpointExpose::concatSamples, LinkedHashMap::new))
|
||||||
.values()
|
.values()
|
||||||
.stream();
|
.stream();
|
||||||
}
|
}
|
|
@ -38,7 +38,7 @@ public class MetricsScrapping {
|
||||||
var metrics = cluster.getMetrics();
|
var metrics = cluster.getMetrics();
|
||||||
if (cluster.getMetrics() != null) {
|
if (cluster.getMetrics() != null) {
|
||||||
var scrapeProperties = createScrapeProps(cluster);
|
var scrapeProperties = createScrapeProps(cluster);
|
||||||
if (metrics.getType() == null || metrics.getType().equalsIgnoreCase(JMX_METRICS_TYPE)) {
|
if (metrics.getType().equalsIgnoreCase(JMX_METRICS_TYPE) && metrics.getPort() != null) {
|
||||||
jmxMetricsScraper = new JmxMetricsScraper(scrapeProperties, jmxMetricsRetriever);
|
jmxMetricsScraper = new JmxMetricsScraper(scrapeProperties, jmxMetricsRetriever);
|
||||||
} else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
|
} else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
|
||||||
prometheusScraper = new PrometheusScraper(scrapeProperties);
|
prometheusScraper = new PrometheusScraper(scrapeProperties);
|
||||||
|
|
|
@ -44,12 +44,12 @@ public class JmxMetricsRetriever implements Closeable {
|
||||||
JmxSslSocketFactory.clearFactoriesCache();
|
JmxSslSocketFactory.clearFactoriesCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<List<RawMetric>> retrieveFromNode(MetricsScrapeProperties metricsConfig, Node node) {
|
public Mono<List<RawMetric>> retrieveFromNode(MetricsScrapeProperties scrapeProperties, Node node) {
|
||||||
if (isSslJmxEndpoint(metricsConfig) && !SSL_JMX_SUPPORTED) {
|
if (isSslJmxEndpoint(scrapeProperties) && !SSL_JMX_SUPPORTED) {
|
||||||
log.warn("Cluster has jmx ssl configured, but it is not supported by app");
|
log.warn("Cluster has jmx ssl configured, but it is not supported by app");
|
||||||
return Mono.just(List.of());
|
return Mono.just(List.of());
|
||||||
}
|
}
|
||||||
return Mono.fromSupplier(() -> retrieveSync(metricsConfig, node))
|
return Mono.fromSupplier(() -> retrieveSync(scrapeProperties, node))
|
||||||
.subscribeOn(Schedulers.boundedElastic());
|
.subscribeOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,13 +4,12 @@ import static com.provectus.kafka.ui.service.metrics.scrape.prometheus.Prometheu
|
||||||
import static io.prometheus.client.Collector.MetricFamilySamples;
|
import static io.prometheus.client.Collector.MetricFamilySamples;
|
||||||
import static io.prometheus.client.Collector.MetricFamilySamples.Sample;
|
import static io.prometheus.client.Collector.MetricFamilySamples.Sample;
|
||||||
import static io.prometheus.client.Collector.Type;
|
import static io.prometheus.client.Collector.Type;
|
||||||
import static java.lang.Double.NaN;
|
|
||||||
import static java.lang.Double.POSITIVE_INFINITY;
|
import static java.lang.Double.POSITIVE_INFINITY;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.provectus.kafka.ui.util.PrometheusEndpointUtil;
|
import com.provectus.kafka.ui.service.metrics.PrometheusEndpointExpose;
|
||||||
import io.prometheus.client.Collector;
|
import io.prometheus.client.Collector;
|
||||||
import io.prometheus.client.CollectorRegistry;
|
import io.prometheus.client.CollectorRegistry;
|
||||||
import io.prometheus.client.Counter;
|
import io.prometheus.client.Counter;
|
||||||
|
@ -27,7 +26,7 @@ class PrometheusEndpointParserTest {
|
||||||
@Test
|
@Test
|
||||||
void parsesAllGeneratedMetricTypes() {
|
void parsesAllGeneratedMetricTypes() {
|
||||||
List<MetricFamilySamples> original = generateMfs();
|
List<MetricFamilySamples> original = generateMfs();
|
||||||
String exposed = PrometheusEndpointUtil.constructResponse(original.stream()).getBody();
|
String exposed = PrometheusEndpointExpose.constructResponse(original.stream()).getBody();
|
||||||
List<MetricFamilySamples> parsed = parse(exposed.lines());
|
List<MetricFamilySamples> parsed = parse(exposed.lines());
|
||||||
assertThat(parsed).containsExactlyElementsOf(original);
|
assertThat(parsed).containsExactlyElementsOf(original);
|
||||||
}
|
}
|
||||||
|
@ -37,8 +36,8 @@ class PrometheusEndpointParserTest {
|
||||||
String expose = """
|
String expose = """
|
||||||
# HELP http_requests_total The total number of HTTP requests.
|
# HELP http_requests_total The total number of HTTP requests.
|
||||||
# TYPE http_requests_total counter
|
# TYPE http_requests_total counter
|
||||||
http_requests_total{method="post",code="200"} 1027 1395066363000
|
http_requests_total{method="post",code="200",} 1027 1395066363000
|
||||||
http_requests_total{method="post",code="400"} 3 1395066363000
|
http_requests_total{method="post",code="400",} 3 1395066363000
|
||||||
# Minimalistic line:
|
# Minimalistic line:
|
||||||
metric_without_timestamp_and_labels 12.47
|
metric_without_timestamp_and_labels 12.47
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue