This commit is contained in:
iliax 2023-07-04 11:05:53 +04:00
parent dc95269234
commit cb159e1af5
10 changed files with 120 additions and 25 deletions

View file

@ -17,7 +17,8 @@ abstract class AbstractAuthSecurityConfig {
"/login",
"/logout",
"/oauth2/**",
"/static/**"
"/static/**",
"/api/clusters/**/prometheus/expose/**"
};
}

View file

@ -7,7 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired;
public abstract class AbstractController {
private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)

View file

@ -7,7 +7,7 @@ import com.provectus.kafka.ui.api.PrometheusExposeApi;
import com.provectus.kafka.ui.service.StatisticsCache;
import io.prometheus.client.exporter.common.TextFormat;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
@ -25,11 +25,16 @@ public class PrometheusExposeController extends AbstractController implements Pr
private final StatisticsCache statisticsCache;
@Override
public Mono<ResponseEntity<String>> getAllMetrics(String clusterName, ServerWebExchange exchange) {
public Mono<ResponseEntity<String>> getAllMetrics(ServerWebExchange exchange) {
return constructResponse(getSummarizedMetricsWithClusterLbl());
}
@Override
public Mono<ResponseEntity<String>> getAllClusterMetrics(String clusterName, ServerWebExchange exchange) {
return constructResponse(
statisticsCache.get(getCluster(clusterName))
.getMetrics()
.getSummarizedBrokersMetrics()
.getSummarizedMetrics()
);
}
@ -37,6 +42,7 @@ public class PrometheusExposeController extends AbstractController implements Pr
public Mono<ResponseEntity<String>> getBrokerMetrics(String clusterName,
Long brokerId,
ServerWebExchange exchange) {
//TODO: discuss - do we need to append broker_id lbl ?
return constructResponse(
statisticsCache.get(getCluster(clusterName))
.getMetrics()
@ -46,9 +52,42 @@ public class PrometheusExposeController extends AbstractController implements Pr
);
}
private Stream<MetricFamilySamples> getSummarizedMetricsWithClusterLbl() {
return clustersStorage.getKafkaClusters()
.stream()
.flatMap(c -> statisticsCache.get(c)
.getMetrics()
.getSummarizedMetrics()
.map(mfs -> appendClusterLbl(mfs, c.getName())));
}
private static MetricFamilySamples appendClusterLbl(MetricFamilySamples mfs, String clusterName) {
return new MetricFamilySamples(
mfs.name,
mfs.unit,
mfs.type,
mfs.help,
mfs.samples.stream()
.map(sample ->
new MetricFamilySamples.Sample(
sample.name,
prependToList(sample.labelNames, "cluster"),
prependToList(sample.labelValues, clusterName),
sample.value
)).toList()
);
}
private static <T> List<T> prependToList(List<T> lst, T toPrepend) {
var result = new ArrayList<T>(lst.size() + 1);
result.add(toPrepend);
result.addAll(lst);
return result;
}
@SneakyThrows
private Mono<ResponseEntity<String>> constructResponse(Stream<MetricFamilySamples> metrics) {
Writer writer = new StringWriter();
private static Mono<ResponseEntity<String>> constructResponse(Stream<MetricFamilySamples> metrics) {
StringWriter writer = new StringWriter();
TextFormat.writeOpenMetrics100(writer, Iterators.asEnumeration(metrics.iterator()));
HttpHeaders responseHeaders = new HttpHeaders();

View file

@ -32,10 +32,7 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import io.prometheus.client.Collector;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -62,7 +59,7 @@ public interface ClusterMapper {
@Deprecated
default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
return new ClusterMetricsDTO()
.items(convert(metrics.getSummarizedBrokersMetrics()).toList());
.items(convert(metrics.getSummarizedMetrics()).toList());
}
private Stream<MetricDTO> convert(Stream<MetricFamilySamples> metrics) {

View file

@ -47,7 +47,7 @@ public class Metrics {
}
}
public Stream<MetricFamilySamples> getSummarizedBrokersMetrics() {
public Stream<MetricFamilySamples> getSummarizedMetrics() {
return Streams.concat(
inferredMetrics.asList().stream(),
perBrokerScrapedMetrics

View file

@ -10,9 +10,8 @@ import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Node;
public class WellKnownMetrics {
public class IoRatesMetricsScanner {
// per broker
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
@ -22,7 +21,7 @@ public class WellKnownMetrics {
final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
public WellKnownMetrics(Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics) {
public IoRatesMetricsScanner(Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics) {
perBrokerMetrics.forEach((nodeId, metrics) -> {
metrics.forEach(m -> {
RawMetric.create(m).forEach(rawMetric -> {
@ -33,7 +32,7 @@ public class WellKnownMetrics {
});
}
public Metrics.IoRates ioRates() {
public Metrics.IoRates get() {
return Metrics.IoRates.builder()
.topicBytesInPerSec(bytesInFifteenMinuteRate)
.topicBytesOutPerSec(bytesOutFifteenMinuteRate)

View file

@ -18,8 +18,7 @@ public class PerBrokerScrapedMetrics {
}
Metrics.IoRates ioRates() {
//TODO: rename WKMetrics
return new WellKnownMetrics(perBrokerMetrics).ioRates();
return new IoRatesMetricsScanner(perBrokerMetrics).get();
}
}

View file

@ -101,7 +101,7 @@ public class InferredMetricsScraper {
state.logDirSpaceStats().totalBytes()
);
}
//TODO: maybe add per-directory stats also
//TODO: maybe add per-directory stats also?
}
});
}

View file

@ -6,9 +6,9 @@ import java.math.BigDecimal;
import org.apache.kafka.common.Node;
import org.junit.jupiter.api.Test;
class WellKnownMetricsTest {
class IoRatesMetricsScannerTest {
private WellKnownMetrics wellKnownMetrics;
private IoRatesMetricsScanner ioRatesMetricsScanner;
@Test
void bytesIoTopicMetricsPopulated() {
@ -21,9 +21,9 @@ class WellKnownMetricsTest {
"some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",topic=\"test-topic\",} 1.0",
"some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",topic=\"test-topic\",} 2.0"
);
assertThat(wellKnownMetrics.bytesInFifteenMinuteRate)
assertThat(ioRatesMetricsScanner.bytesInFifteenMinuteRate)
.containsEntry("test-topic", new BigDecimal("3.0"));
assertThat(wellKnownMetrics.bytesOutFifteenMinuteRate)
assertThat(ioRatesMetricsScanner.bytesOutFifteenMinuteRate)
.containsEntry("test-topic", new BigDecimal("6.0"));
}
@ -40,12 +40,12 @@ class WellKnownMetricsTest {
"some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",} 20.0"
);
assertThat(wellKnownMetrics.brokerBytesInFifteenMinuteRate)
assertThat(ioRatesMetricsScanner.brokerBytesInFifteenMinuteRate)
.hasSize(2)
.containsEntry(1, new BigDecimal("1.0"))
.containsEntry(2, new BigDecimal("10.0"));
assertThat(wellKnownMetrics.brokerBytesOutFifteenMinuteRate)
assertThat(ioRatesMetricsScanner.brokerBytesOutFifteenMinuteRate)
.hasSize(2)
.containsEntry(1, new BigDecimal("2.0"))
.containsEntry(2, new BigDecimal("20.0"));

View file

@ -157,6 +157,66 @@ paths:
schema:
$ref: '#/components/schemas/ClusterMetrics'
/api/prometheus/expose/all:
get:
tags:
- PrometheusExpose
summary: getAllMetrics
operationId: getAllMetrics
responses:
200:
description: OK
content:
application/text:
schema:
type: string
/api/clusters/{clusterName}/prometheus/expose/all:
get:
tags:
- PrometheusExpose
summary: getAllClusterMetrics
operationId: getAllClusterMetrics
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/text:
schema:
type: string
/api/clusters/{clusterName}/prometheus/expose/broker/{brokerId}:
get:
tags:
- PrometheusExpose
summary: getBrokerMetrics
operationId: getBrokerMetrics
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: brokerId
in: path
required: true
schema:
type: integer
format: int64
responses:
200:
description: OK
content:
application/text:
schema:
type: string
/api/clusters/{clusterName}/stats:
get:
tags: