diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java index 4969162002..ffa0730c5a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java @@ -4,18 +4,17 @@ import com.provectus.kafka.ui.api.GraphsApi; import com.provectus.kafka.ui.model.GraphDataRequestDTO; import com.provectus.kafka.ui.model.GraphDescriptionDTO; import com.provectus.kafka.ui.model.GraphDescriptionsDTO; +import com.provectus.kafka.ui.model.GraphParameterDTO; import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO; -import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDataDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; -import com.provectus.kafka.ui.model.rbac.permission.KsqlAction; -import com.provectus.kafka.ui.service.AdminClientService; import com.provectus.kafka.ui.service.graphs.GraphsService; import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.service.graphs.GraphsStorage; import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.time.Duration; import java.time.OffsetDateTime; -import java.util.List; import java.util.Optional; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -66,15 +65,22 @@ public class GraphsController extends AbstractController implements GraphsApi { @Override public Mono> getGraphsList(String clusterName, ServerWebExchange exchange) { - var graphs = graphsService.getAllGraphs().toList(); + var graphs = graphsService.getAllGraphs(); var cluster = getCluster(clusterName); if (cluster.getPrometheusStorageClient() == null) { - graphs = List.of(); + graphs = Stream.empty(); } return Mono.just( ResponseEntity.ok( - new GraphDescriptionsDTO().graphs(graphs) + new GraphDescriptionsDTO().graphs(graphs.map(this::map).toList()) ) ); } + + private GraphDescriptionDTO map(GraphsStorage.GraphDescription graph) { + return new GraphDescriptionDTO(graph.id()) + .defaultPeriod(Optional.ofNullable(graph.defaultInterval()).map(Duration::toString).orElse(null)) + .type(graph.isRange() ? GraphDescriptionDTO.TypeEnum.RANGE : GraphDescriptionDTO.TypeEnum.INSTANT) + .parameters(graph.params().stream().map(GraphParameterDTO::new).toList()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java index 272ad58774..2ce9e5bfe6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java @@ -184,7 +184,7 @@ public class KafkaClusterFactory { .build(); return ReactiveFailover.create( parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()), - url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient, null, null).setBasePath(url)), + url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient).setBasePath(url)), ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, "No live schemaRegistry instances available", ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsService.java index 1cab785332..811b888716 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsService.java @@ -3,17 +3,16 @@ package com.provectus.kafka.ui.service.graphs; import com.google.common.base.Preconditions; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.model.GraphDescriptionDTO; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.graphs.GraphsStorage.GraphDescription; +import com.provectus.kafka.ui.service.metrics.prometheus.PromQueryTemplate; import java.time.Duration; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.text.StrSubstitutor; import org.springframework.stereotype.Component; import prometheus.query.api.PrometheusClientApi; import prometheus.query.model.QueryResponse; @@ -42,9 +41,9 @@ public class GraphsService { } return cluster.getPrometheusStorageClient() .mono(client -> { - String preparedQuery = prepareQuery(cluster.getName(), graph.getPrometheusQuery(), params); - if (graph.getDefaultPeriod() != null) { - return queryRange(client, preparedQuery, Duration.parse(graph.getDefaultPeriod()), from, to); + String preparedQuery = prepareQuery(graph, cluster.getName(), params); + if (graph.isRange()) { + return queryRange(client, preparedQuery, graph.defaultInterval(), from, to); } return queryInstant(client, preparedQuery); }); @@ -77,7 +76,7 @@ public class GraphsService { return intervalInSecs + "s"; } int step = ((int) (((double) intervalInSecs) / 200)); - System.out.println("Chosen step size"); //TODo + System.out.println("Chosen step size " + step); //TODo return step + "s"; } @@ -85,15 +84,11 @@ public class GraphsService { return c.query(preparedQuery, null, null); } - - private String prepareQuery(String clusterName, String queryTemplate, @Nullable Map params) { - Map replacements = new HashMap<>(); - replacements.putAll(Optional.ofNullable(params).orElse(Map.of())); - replacements.put("cluster", clusterName); - return new StrSubstitutor(replacements).replace(queryTemplate); + public static String prepareQuery(GraphDescription d, String clusterName, @Nullable Map params) { + return new PromQueryTemplate(d).getQuery(clusterName, Optional.ofNullable(params).orElse(Map.of())); } - public Stream getAllGraphs() { + public Stream getAllGraphs() { return graphsStorage.getAll(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsStorage.java index 38190f98bc..206ef4be3f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsStorage.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsStorage.java @@ -1,53 +1,89 @@ package com.provectus.kafka.ui.service.graphs; -import com.provectus.kafka.ui.model.GraphDescriptionDTO; -import com.provectus.kafka.ui.model.GraphParameterDTO; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.service.metrics.prometheus.PromQueryTemplate; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; +import lombok.Builder; import org.springframework.stereotype.Component; @Component -class GraphsStorage { - - private final Map graphsById; +public class GraphsStorage { - GraphsStorage() { - this.graphsById = PREDEFINED_GRAPHS.stream() - .collect(Collectors.toMap(GraphDescriptionDTO::getId, d -> d)); + private static final Duration DEFAULT_RANGE_DURATION = Duration.ofDays(7); + + @Builder + public record GraphDescription(String id, + @Nullable Duration defaultInterval, + String prometheusQuery, + Set params) { + public boolean isRange() { + return defaultInterval != null; + } } - Optional getDescription(String id) { + private final Map graphsById; + + GraphsStorage() { + validateGraphDescr(PREDEFINED_GRAPHS); + this.graphsById = PREDEFINED_GRAPHS.stream() + .collect(Collectors.toMap(GraphDescription::id, d -> d)); + } + + Optional getDescription(String id) { return Optional.ofNullable(graphsById.get(id)); } - Stream getAll() { + Stream getAll() { return graphsById.values().stream(); } - private static final List PREDEFINED_GRAPHS = List.of( + private void validateGraphDescr(List descriptions) { + Map errors = new HashMap<>(); + for (GraphDescription description : descriptions) { + new PromQueryTemplate(description) + .validateSyntax() + .ifPresent(err -> errors.put(description.id(), err)); + } + if (!errors.isEmpty()) { + throw new ValidationException("Error validating queries for following graphs: " + errors); + } + } - new GraphDescriptionDTO("broker_bytes_disk_ts") - .defaultPeriod(Duration.ofDays(7).toString()) - .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"), + private static final List PREDEFINED_GRAPHS = List.of( - new GraphDescriptionDTO("broker_bytes_disk") - .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"), + GraphDescription.builder() + .id("broker_bytes_disk_ts") + .defaultInterval(DEFAULT_RANGE_DURATION) + .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}") + .params(Set.of()) + .build(), - new GraphDescriptionDTO("kafka_topic_partition_current_offset") - .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"), + GraphDescription.builder() + .id("broker_bytes_disk") + .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}") + .params(Set.of()) + .build(), - new GraphDescriptionDTO("kafka_topic_partition_current_offset_ts") - .defaultPeriod(Duration.ofDays(7).toString()) - .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"), + GraphDescription.builder() + .id("kafka_topic_partition_current_offset") + .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\"}") + .params(Set.of()) + .build(), - new GraphDescriptionDTO("kafka_topic_partition_current_offset_per_topic_ts") - .defaultPeriod(Duration.ofDays(7).toString()) - .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\", topic = \"${topic}\"}") - .addParametersItem(new GraphParameterDTO().name("topic")) + GraphDescription.builder() + .id("kafka_topic_partition_current_offset_per_topic_ts") + .defaultInterval(DEFAULT_RANGE_DURATION) + .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\",topic = \"${topic}\"}") + .params(Set.of("topic")) + .build() ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQlGrammar.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryLangGrammar.java similarity index 59% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQlGrammar.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryLangGrammar.java index 5b007065fb..13885f2efe 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQlGrammar.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryLangGrammar.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service.metrics.prometheus; import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Optional; import org.antlr.v4.runtime.BaseErrorListener; import org.antlr.v4.runtime.CharStreams; import org.antlr.v4.runtime.CommonTokenStream; @@ -9,24 +10,23 @@ import org.antlr.v4.runtime.Recognizer; import promql.PromQLLexer; import promql.PromQLParser; -public class PromQlGrammar { +class PromQueryLangGrammar { - public static void main(String[] args) { - String promql = "sum( " + - " kafka_controller_kafkacontroller_activecontrollercount{cluster_name=\"3299fef4\",metrics=\"kafka\"}) OR " + - " kafka_controller_kafkacontroller_activecontrollercount{cluster_name=\"3299fef4\",job=\"topic-scanner\"}"; - System.out.println(parseMetricSelector(promql)); + // returns error msg, or empty if query is valid + static Optional validateExpression(String query) { + try { + parseExpression(query); + return Optional.empty(); + } catch (ValidationException v) { + return Optional.of(v.getMessage()); + } } - public static PromQLParser.InstantSelectorContext parseMetricSelector(String selector) { - return parse(selector).instantSelector(); - } - - public static PromQLParser.ExpressionContext parseExpression(String query) { + static PromQLParser.ExpressionContext parseExpression(String query) { return parse(query).expression(); } - private static PromQLParser parse(String str) { + private static PromQLParser parse(String str) throws ValidationException { PromQLLexer lexer = new PromQLLexer(CharStreams.fromString(str)); lexer.addErrorListener(new BaseErrorListener() { @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryTemplate.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryTemplate.java new file mode 100644 index 0000000000..4225d759ae --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PromQueryTemplate.java @@ -0,0 +1,48 @@ +package com.provectus.kafka.ui.service.metrics.prometheus; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.service.graphs.GraphsStorage; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.commons.lang3.text.StrSubstitutor; + +public class PromQueryTemplate { + + private final String queryTemplate; + private final Set paramsNames; + + public PromQueryTemplate(GraphsStorage.GraphDescription d) { + this(d.prometheusQuery(), d.params()); + } + + public PromQueryTemplate(String templateQueryString, Set paramsNames) { + this.queryTemplate = templateQueryString; + this.paramsNames = paramsNames; + } + + public String getQuery(String clusterName, Map paramValues) { + var missingParams = Sets.difference(paramsNames, paramValues.keySet()); + if (!missingParams.isEmpty()) { + throw new ValidationException("Not all params set for query, missing: " + missingParams); + } + Map replacements = new HashMap<>(paramValues); + replacements.put("cluster", clusterName); + return replaceParams(replacements); + } + + public Optional validateSyntax() { + Map fakeReplacements = new HashMap<>(); + paramsNames.forEach(paramName -> fakeReplacements.put(paramName, "1")); + fakeReplacements.put("cluster", "1"); + String prepared = replaceParams(fakeReplacements); + return PromQueryLangGrammar.validateExpression(prepared); + } + + private String replaceParams(Map replacements) { + return new StrSubstitutor(replacements).replace(queryTemplate); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 3231ed7166..96345bfd87 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3730,23 +3730,20 @@ components: id: type: string description: Id that should be used to query data on API level - metadata: - description: Additional info that can be used on UI for rendering adjustments - type: object - additionalProperties: true + type: + type: string + enum: ["range", "instant"] defaultPeriod: type: string - description: ISO_8601 duration string. If not set - instant (not range) query will be executed + description: ISO_8601 duration string (for "range" graphs only) parameters: type: array items: $ref: '#/components/schemas/GraphParameter' - prometheusQuery: - type: string - description: Mustache query template with param holders GraphParameter: type: object + required: ["name"] properties: name: type: string