From a9a22b40256bc3d51def1e07a30107d7065df8f1 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 18 Jul 2023 17:02:53 +0400 Subject: [PATCH] wip --- .../kafka/ui/config/ClustersProperties.java | 1 + .../PrometheusExposeController.java | 13 +- .../kafka/ui/model/KafkaCluster.java | 3 + .../kafka/ui/service/KafkaClusterFactory.java | 60 +++- .../ui/util/KafkaServicesValidation.java | 16 +- .../main/resources/swagger/kafka-ui-api.yaml | 2 + .../swagger/prometheus-query-api.yaml | 309 +++++++----------- 7 files changed, 207 insertions(+), 197 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index d0ca9df5d5..adbf5839eb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -75,6 +75,7 @@ public class ClustersProperties { String keystoreLocation; String keystorePassword; + Boolean prometheusExpose; MetricsStorage store; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java index c01de33973..6fcf634170 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java @@ -23,6 +23,7 @@ public class PrometheusExposeController extends AbstractController implements Pr PrometheusExpose.exposeAllMetrics( clustersStorage.getKafkaClusters() .stream() + .filter(KafkaCluster::isExposeMetricsViaPrometheusEndpoint) .collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics())) ) ); @@ -30,9 +31,13 @@ public class PrometheusExposeController extends AbstractController implements Pr @Override public Mono> getAllClusterMetrics(String clusterName, ServerWebExchange exchange) { + var cluster = getCluster(clusterName); + if (!cluster.isExposeMetricsViaPrometheusEndpoint()) { + return Mono.empty(); + } return Mono.just( PrometheusExpose.exposeClusterMetrics( - statisticsCache.get(getCluster(clusterName)).getMetrics() + statisticsCache.get(cluster).getMetrics() ) ); } @@ -41,9 +46,13 @@ public class PrometheusExposeController extends AbstractController implements Pr public Mono> getBrokerMetrics(String clusterName, Long brokerId, ServerWebExchange exchange) { + var cluster = getCluster(clusterName); + if (!cluster.isExposeMetricsViaPrometheusEndpoint()) { + return Mono.empty(); + } return Mono.just( PrometheusExpose.exposeBrokerMetrics( - statisticsCache.get(getCluster(clusterName)).getMetrics(), brokerId.intValue() + statisticsCache.get(cluster).getMetrics(), brokerId.intValue() ) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 73c8c5e164..fc797e3811 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -14,6 +14,7 @@ import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import prometheus.query.api.PrometheusClientApi; @Data @Builder(toBuilder = true) @@ -26,10 +27,12 @@ public class KafkaCluster { private final String bootstrapServers; private final Properties properties; private final boolean readOnly; + private final boolean exposeMetricsViaPrometheusEndpoint; private final DataMasking masking; private final PollingSettings pollingSettings; private final ReactiveFailover schemaRegistryClient; private final Map> connectsClients; private final ReactiveFailover ksqlClient; private final MetricsScrapping metricsScrapping; + private final ReactiveFailover prometheusStorageClient; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java index c7276c3ce6..f628fb1f40 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java @@ -1,5 +1,12 @@ package com.provectus.kafka.ui.service; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateClusterConnection; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateConnect; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateKsql; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validatePrometheusStore; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateSchemaRegistry; +import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateTruststore; + import com.provectus.kafka.ui.client.RetryingKafkaConnectClient; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.config.WebclientProperties; @@ -25,8 +32,10 @@ import java.util.Properties; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import org.springframework.util.unit.DataSize; import org.springframework.web.reactive.function.client.WebClient; +import prometheus.query.api.PrometheusClientApi; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -56,6 +65,7 @@ public class KafkaClusterFactory { builder.bootstrapServers(clusterProperties.getBootstrapServers()); builder.properties(convertProperties(clusterProperties.getProperties())); builder.readOnly(clusterProperties.isReadOnly()); + builder.exposeMetricsViaPrometheusEndpoint(exposeMetricsViaPrometheusEndpoint(clusterProperties)); builder.masking(DataMasking.create(clusterProperties.getMasking())); builder.pollingSettings(PollingSettings.create(clusterProperties, properties)); builder.metricsScrapping(MetricsScrapping.create(clusterProperties, jmxMetricsRetriever)); @@ -69,13 +79,16 @@ public class KafkaClusterFactory { if (ksqlConfigured(clusterProperties)) { builder.ksqlClient(ksqlClient(clusterProperties)); } + if (prometheusStorageConfigured(clusterProperties)) { + builder.prometheusStorageClient(prometheusStorageClient(clusterProperties)); + } builder.originalProperties(clusterProperties); return builder.build(); } public Mono validate(ClustersProperties.Cluster clusterProperties) { if (clusterProperties.getSsl() != null) { - Optional errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl()); + Optional errMsg = validateTruststore(clusterProperties.getSsl()); if (errMsg.isPresent()) { return Mono.just(new ClusterConfigValidationDTO() .kafka(new ApplicationPropertyValidationDTO() @@ -85,40 +98,51 @@ public class KafkaClusterFactory { } return Mono.zip( - KafkaServicesValidation.validateClusterConnection( + validateClusterConnection( clusterProperties.getBootstrapServers(), convertProperties(clusterProperties.getProperties()), clusterProperties.getSsl() ), schemaRegistryConfigured(clusterProperties) - ? KafkaServicesValidation.validateSchemaRegistry( - () -> schemaRegistryClient(clusterProperties)).map(Optional::of) + ? validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of) : Mono.>just(Optional.empty()), ksqlConfigured(clusterProperties) - ? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of) + ? validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of) : Mono.>just(Optional.empty()), connectClientsConfigured(clusterProperties) ? Flux.fromIterable(clusterProperties.getKafkaConnect()) .flatMap(c -> - KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c)) + validateConnect(() -> connectClient(clusterProperties, c)) .map(r -> Tuples.of(c.getName(), r))) .collectMap(Tuple2::getT1, Tuple2::getT2) .map(Optional::of) : - Mono.>>just(Optional.empty()) + Mono.>>just(Optional.empty()), + + prometheusStorageConfigured(clusterProperties) + ? validatePrometheusStore(() -> prometheusStorageClient(clusterProperties)).map(Optional::of) + : Mono.>just(Optional.empty()) + ).map(tuple -> { var validation = new ClusterConfigValidationDTO(); validation.kafka(tuple.getT1()); tuple.getT2().ifPresent(validation::schemaRegistry); tuple.getT3().ifPresent(validation::ksqldb); tuple.getT4().ifPresent(validation::kafkaConnects); + tuple.getT5().ifPresent(validation::prometheusStorage); return validation; }); } + private boolean exposeMetricsViaPrometheusEndpoint(ClustersProperties.Cluster clusterProperties) { + return Optional.ofNullable(clusterProperties.getMetrics()) + .map(m -> Boolean.TRUE.equals(m.getPrometheusExpose())) + .orElse(true); + } + private Properties convertProperties(Map propertiesMap) { Properties properties = new Properties(); if (propertiesMap != null) { @@ -153,6 +177,28 @@ public class KafkaClusterFactory { ); } + private ReactiveFailover prometheusStorageClient(ClustersProperties.Cluster cluster) { + WebClient webClient = new WebClientConfigurator() + .configureSsl(cluster.getSsl(), cluster.getSchemaRegistrySsl()) + .configureBufferSize(webClientMaxBuffSize) + .build(); + return ReactiveFailover.create( + parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()), + url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient, null, null).setBasePath(url)), + ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, + "No live schemaRegistry instances available", + ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS + ); + } + + private boolean prometheusStorageConfigured(ClustersProperties.Cluster cluster) { + return Optional.ofNullable(cluster.getMetrics()) + .flatMap(m -> Optional.ofNullable(m.getStore())) + .flatMap(s -> Optional.of(s.getPrometheus())) + .map(p -> StringUtils.hasText(p.getUrl())) + .orElse(false); + } + private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) { return clusterProperties.getSchemaRegistry() != null; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java index 4b8af81f85..516ad2266b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.util.ResourceUtils; +import prometheus.query.api.PrometheusClientApi; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,7 +47,7 @@ public final class KafkaServicesValidation { public static Optional validateTruststore(TruststoreConfig truststoreConfig) { if (truststoreConfig.getTruststoreLocation() != null && truststoreConfig.getTruststorePassword() != null) { try (FileInputStream fileInputStream = new FileInputStream( - (ResourceUtils.getFile(truststoreConfig.getTruststoreLocation())))) { + (ResourceUtils.getFile(truststoreConfig.getTruststoreLocation())))) { KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); trustStore.load(fileInputStream, truststoreConfig.getTruststorePassword().toCharArray()); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( @@ -141,5 +142,18 @@ public final class KafkaServicesValidation { .onErrorResume(KafkaServicesValidation::invalid); } + public static Mono validatePrometheusStore( + Supplier> clientSupplier) { + ReactiveFailover client; + try { + client = clientSupplier.get(); + } catch (Exception e) { + log.error("Error creating Prometheus client", e); + return invalid("Error creating Prometheus client: " + e.getMessage()); + } + return client.mono(c -> c.query("1", null, null)) //TODO: check params + .then(valid()) + .onErrorResume(KafkaServicesValidation::invalid); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 9a4a901953..6a32482b14 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3677,6 +3677,8 @@ components: $ref: '#/components/schemas/ApplicationPropertyValidation' ksqldb: $ref: '#/components/schemas/ApplicationPropertyValidation' + prometheusStorage: + $ref: '#/components/schemas/ApplicationPropertyValidation' ApplicationConfig: type: object diff --git a/kafka-ui-contract/src/main/resources/swagger/prometheus-query-api.yaml b/kafka-ui-contract/src/main/resources/swagger/prometheus-query-api.yaml index 33512b94f4..ed2efcea68 100644 --- a/kafka-ui-contract/src/main/resources/swagger/prometheus-query-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/prometheus-query-api.yaml @@ -2,10 +2,6 @@ openapi: 3.0.1 info: title: | Prometheus query HTTP API - Copied from https://raw.githubusercontent.com/HelloKunal/OpenAPI-Specification-of-Go-API/main/swagger.yaml - description: | - The current stable HTTP API is reachable under /api/v1 on a Prometheus server. Any non-breaking additions will be added under that endpoint. - termsOfService: urn:tos version: 0.1.0 contact: { } @@ -53,7 +49,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/responseLabelValues' + $ref: '#/components/schemas/LabelValuesResponse' /api/v1/labels: get: @@ -89,7 +85,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/responseLabelNames' + $ref: '#/components/schemas/LabelNamesResponse' /api/v1/metadata: get: @@ -104,7 +100,7 @@ paths: description: Maximum number of metrics to return. required: true schema: - type: number + type: integer - name: metric in: query description: A metric name to filter metadata for. All metric metadata is retrieved if left empty. @@ -116,14 +112,14 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/responseMetadata' + $ref: '#/components/schemas/MetadataResponse' 201: description: | Success content: application/json: schema: - $ref: '#/components/schemas/responseMetadata' + $ref: '#/components/schemas/MetadataResponse' /api/v1/query: get: @@ -162,7 +158,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/queryData' + $ref: '#/components/schemas/QueryResponse' /api/v1/query_range: @@ -170,7 +166,7 @@ paths: tags: - PrometheusClient summary: Evaluates query over range of time. - description: Evaluates an expression query over a range of time + description: Evaluates an expression query over a range of time operationId: queryRange parameters: - name: query @@ -212,34 +208,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/responseQuery_range" - example: - status: success - data: - resultType: matrix - result: - - metric: - __name__: up - job: prometheus - instance: localhost:9090 - values: - - - 1.435781430781E9 - - "1" - - - 1.435781445781E9 - - "1" - - - 1.435781460781E9 - - "1" - - metric: - __name__: up - job: node - instance: localhost:9091 - values: - - - 1.435781430781E9 - - "0" - - - 1.435781445781E9 - - "0" - - - 1.435781460781E9 - - "1" + $ref: "#/components/schemas/QueryResponse" /api/v1/series: @@ -278,175 +247,141 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/responseSeries' - example: - status: success - data: - - __name__: up - job: prometheus - instance: localhost:9090 - - __name__: up - job: node - instance: localhost:9091 - - __name__: process_start_time_seconds - job: prometheus - instance: localhost:9090 + $ref: '#/components/schemas/SeriesResponse' components: schemas: - Label: + BaseResponse: type: object + required: [ status ] properties: - Name: + status: type: string - Value: + enum: [ "success", "error" ] + error: type: string - description: Label is a key/value pair of strings. - Labels: - type: array - description: |- - Labels is a sorted set of labels. Order has to be guaranteed upon - instantiation. - items: - $ref: '#/components/schemas/Label' - MetricType: - type: string - description: MetricType represents metric type values. - - metadata: - type: object - properties: - Help: + errorType: type: string - Type: - $ref: '#/components/schemas/MetricType' - Unit: - type: string - - queryData: - type: object - properties: - Result: - type: object - properties: - metric: - type: object - properties: - __name__: - type: string - job: - type: string - instance: - type: string - value: - type: array - items: - oneOf: - - type: string - format: "unix_timestamp" - - type: string - format: "sample_value" - ResultType: - type: string - enum: - - matrix - - vector - - scalar - - string - - - responseSeries: - type: array - description: a list of objects that contain the label name/value pairs which - identify each series - items: - type: object - properties: - __name__: - type: string - job: - type: string - instance: + warnings: + type: array + items: type: string - responseSnapshot: + QueryResponse: type: object + allOf: + - $ref: "#/components/schemas/BaseResponse" properties: - name: - type: string + data: + $ref: '#/components/schemas/QueryResponseData' - responseQuery_exemplars: - type: object - properties: - seriesLabels: - type: object - properties: - __name__: - type: string - job: - type: string - instance: - type: string - service: - type: string - exemplars: - type: object - properties: - labels: - type: object - properties: - traceID: - type: string - values: - type: string - timestamp: - type: string - format: "unix_timestamp" - - responseQuery_range: + QueryResponseData: type: object + required: [ "resultType" ] properties: resultType: type: string - result: - type: object - properties: - metric: - type: object - properties: - __name__: - type: string - job: - type: string - instance: - type: string - values: - type: array - items: - oneOf: - - type: string - format: "unix_timestamp" - - type: string - format: "sample_value" + discriminator: + propertyName: resultType + mapping: + matrix: '#/components/schemas/MatrixQueryResponse' + vector: '#/components/schemas/InstantVectorQueryResponse' + scalar: '#/components/schemas/ScalarQueryResponse' + string: '#/components/schemas/StringQueryResponse' + anyOf: + - $ref: '#/components/schemas/MatrixQueryResponse' + - $ref: '#/components/schemas/InstantVectorQueryResponse' + - $ref: '#/components/schemas/ScalarQueryResponse' + - $ref: '#/components/schemas/StringQueryResponse' - responseMetadata: + MatrixQueryResponse: type: object + allOf: + - $ref: "#/components/schemas/QueryResponseData" properties: - metric name: - type: string - additionalProperties: - $ref: '#/components/schemas/metadata' - description: a (key, object) map. `metric name`is an example key + result: + type: array + items: { } - responseLabelValues: - type: array - description: a list of string label values - items: - type: string + InstantVectorQueryResponse: + type: object + allOf: + - $ref: "#/components/schemas/QueryResponseData" + properties: + result: + type: array + items: { } - responseLabelNames: - type: array - description: a list of string label names - items: - type: string + ScalarQueryResponse: + type: object + allOf: + - $ref: "#/components/schemas/QueryResponseData" + properties: + result: + type: array + items: { } + + StringQueryResponse: + type: object + allOf: + - $ref: "#/components/schemas/QueryResponseData" + properties: + result: + type: array + items: { } + + SeriesResponse: + type: object + allOf: + - $ref: "#/components/schemas/BaseResponse" + properties: + data: + type: array + description: a list of objects that contain the label name/value pairs which + identify each series + items: + type: object + properties: + __name__: + type: string + job: + type: string + instance: + type: string + + MetadataResponse: + type: object + allOf: + - $ref: "#/components/schemas/BaseResponse" + properties: + data: + type: object + additionalProperties: + type: array + items: + type: object + additionalProperties: true + + LabelValuesResponse: + type: object + allOf: + - $ref: "#/components/schemas/BaseResponse" + properties: + data: + type: array + description: a list of string label values + items: + type: string + + LabelNamesResponse: + type: object + allOf: + - $ref: "#/components/schemas/BaseResponse" + properties: + data: + type: array + description: a list of string label names + items: + type: string