This commit is contained in:
iliax 2023-07-20 17:21:51 +04:00
parent e9793ecf67
commit 28c9871ff2
7 changed files with 149 additions and 67 deletions

View file

@ -4,18 +4,17 @@ import com.provectus.kafka.ui.api.GraphsApi;
import com.provectus.kafka.ui.model.GraphDataRequestDTO; import com.provectus.kafka.ui.model.GraphDataRequestDTO;
import com.provectus.kafka.ui.model.GraphDescriptionDTO; import com.provectus.kafka.ui.model.GraphDescriptionDTO;
import com.provectus.kafka.ui.model.GraphDescriptionsDTO; import com.provectus.kafka.ui.model.GraphDescriptionsDTO;
import com.provectus.kafka.ui.model.GraphParameterDTO;
import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO; import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO;
import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDataDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.graphs.GraphsService; import com.provectus.kafka.ui.service.graphs.GraphsService;
import com.provectus.kafka.ui.service.audit.AuditService; import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.graphs.GraphsStorage;
import com.provectus.kafka.ui.service.rbac.AccessControlService; import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.time.Duration; import java.time.Duration;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers; import org.mapstruct.factory.Mappers;
@ -66,15 +65,22 @@ public class GraphsController extends AbstractController implements GraphsApi {
@Override @Override
public Mono<ResponseEntity<GraphDescriptionsDTO>> getGraphsList(String clusterName, public Mono<ResponseEntity<GraphDescriptionsDTO>> getGraphsList(String clusterName,
ServerWebExchange exchange) { ServerWebExchange exchange) {
var graphs = graphsService.getAllGraphs().toList(); var graphs = graphsService.getAllGraphs();
var cluster = getCluster(clusterName); var cluster = getCluster(clusterName);
if (cluster.getPrometheusStorageClient() == null) { if (cluster.getPrometheusStorageClient() == null) {
graphs = List.of(); graphs = Stream.empty();
} }
return Mono.just( return Mono.just(
ResponseEntity.ok( ResponseEntity.ok(
new GraphDescriptionsDTO().graphs(graphs) new GraphDescriptionsDTO().graphs(graphs.map(this::map).toList())
) )
); );
} }
private GraphDescriptionDTO map(GraphsStorage.GraphDescription graph) {
return new GraphDescriptionDTO(graph.id())
.defaultPeriod(Optional.ofNullable(graph.defaultInterval()).map(Duration::toString).orElse(null))
.type(graph.isRange() ? GraphDescriptionDTO.TypeEnum.RANGE : GraphDescriptionDTO.TypeEnum.INSTANT)
.parameters(graph.params().stream().map(GraphParameterDTO::new).toList());
}
} }

View file

@ -184,7 +184,7 @@ public class KafkaClusterFactory {
.build(); .build();
return ReactiveFailover.create( return ReactiveFailover.create(
parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()), parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()),
url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient, null, null).setBasePath(url)), url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient).setBasePath(url)),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No live schemaRegistry instances available", "No live schemaRegistry instances available",
ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS

View file

@ -3,17 +3,16 @@ package com.provectus.kafka.ui.service.graphs;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.GraphDescriptionDTO;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.graphs.GraphsStorage.GraphDescription;
import com.provectus.kafka.ui.service.metrics.prometheus.PromQueryTemplate;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import prometheus.query.api.PrometheusClientApi; import prometheus.query.api.PrometheusClientApi;
import prometheus.query.model.QueryResponse; import prometheus.query.model.QueryResponse;
@ -42,9 +41,9 @@ public class GraphsService {
} }
return cluster.getPrometheusStorageClient() return cluster.getPrometheusStorageClient()
.mono(client -> { .mono(client -> {
String preparedQuery = prepareQuery(cluster.getName(), graph.getPrometheusQuery(), params); String preparedQuery = prepareQuery(graph, cluster.getName(), params);
if (graph.getDefaultPeriod() != null) { if (graph.isRange()) {
return queryRange(client, preparedQuery, Duration.parse(graph.getDefaultPeriod()), from, to); return queryRange(client, preparedQuery, graph.defaultInterval(), from, to);
} }
return queryInstant(client, preparedQuery); return queryInstant(client, preparedQuery);
}); });
@ -77,7 +76,7 @@ public class GraphsService {
return intervalInSecs + "s"; return intervalInSecs + "s";
} }
int step = ((int) (((double) intervalInSecs) / 200)); int step = ((int) (((double) intervalInSecs) / 200));
System.out.println("Chosen step size"); //TODo System.out.println("Chosen step size " + step); //TODo
return step + "s"; return step + "s";
} }
@ -85,15 +84,11 @@ public class GraphsService {
return c.query(preparedQuery, null, null); return c.query(preparedQuery, null, null);
} }
public static String prepareQuery(GraphDescription d, String clusterName, @Nullable Map<String, String> params) {
private String prepareQuery(String clusterName, String queryTemplate, @Nullable Map<String, String> params) { return new PromQueryTemplate(d).getQuery(clusterName, Optional.ofNullable(params).orElse(Map.of()));
Map<String, String> replacements = new HashMap<>();
replacements.putAll(Optional.ofNullable(params).orElse(Map.of()));
replacements.put("cluster", clusterName);
return new StrSubstitutor(replacements).replace(queryTemplate);
} }
public Stream<GraphDescriptionDTO> getAllGraphs() { public Stream<GraphDescription> getAllGraphs() {
return graphsStorage.getAll(); return graphsStorage.getAll();
} }

View file

@ -1,53 +1,89 @@
package com.provectus.kafka.ui.service.graphs; package com.provectus.kafka.ui.service.graphs;
import com.provectus.kafka.ui.model.GraphDescriptionDTO; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.GraphParameterDTO; import com.provectus.kafka.ui.service.metrics.prometheus.PromQueryTemplate;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Builder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
class GraphsStorage { public class GraphsStorage {
private final Map<String, GraphDescriptionDTO> graphsById; private static final Duration DEFAULT_RANGE_DURATION = Duration.ofDays(7);
GraphsStorage() { @Builder
this.graphsById = PREDEFINED_GRAPHS.stream() public record GraphDescription(String id,
.collect(Collectors.toMap(GraphDescriptionDTO::getId, d -> d)); @Nullable Duration defaultInterval,
String prometheusQuery,
Set<String> params) {
public boolean isRange() {
return defaultInterval != null;
}
} }
Optional<GraphDescriptionDTO> getDescription(String id) { private final Map<String, GraphDescription> graphsById;
GraphsStorage() {
validateGraphDescr(PREDEFINED_GRAPHS);
this.graphsById = PREDEFINED_GRAPHS.stream()
.collect(Collectors.toMap(GraphDescription::id, d -> d));
}
Optional<GraphDescription> getDescription(String id) {
return Optional.ofNullable(graphsById.get(id)); return Optional.ofNullable(graphsById.get(id));
} }
Stream<GraphDescriptionDTO> getAll() { Stream<GraphDescription> getAll() {
return graphsById.values().stream(); return graphsById.values().stream();
} }
private static final List<GraphDescriptionDTO> PREDEFINED_GRAPHS = List.of( private void validateGraphDescr(List<GraphDescription> descriptions) {
Map<String, String> errors = new HashMap<>();
for (GraphDescription description : descriptions) {
new PromQueryTemplate(description)
.validateSyntax()
.ifPresent(err -> errors.put(description.id(), err));
}
if (!errors.isEmpty()) {
throw new ValidationException("Error validating queries for following graphs: " + errors);
}
}
new GraphDescriptionDTO("broker_bytes_disk_ts") private static final List<GraphDescription> PREDEFINED_GRAPHS = List.of(
.defaultPeriod(Duration.ofDays(7).toString())
.prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"),
new GraphDescriptionDTO("broker_bytes_disk") GraphDescription.builder()
.prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"), .id("broker_bytes_disk_ts")
.defaultInterval(DEFAULT_RANGE_DURATION)
.prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}")
.params(Set.of())
.build(),
new GraphDescriptionDTO("kafka_topic_partition_current_offset") GraphDescription.builder()
.prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"), .id("broker_bytes_disk")
.prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}")
.params(Set.of())
.build(),
new GraphDescriptionDTO("kafka_topic_partition_current_offset_ts") GraphDescription.builder()
.defaultPeriod(Duration.ofDays(7).toString()) .id("kafka_topic_partition_current_offset")
.prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"), .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\"}")
.params(Set.of())
.build(),
new GraphDescriptionDTO("kafka_topic_partition_current_offset_per_topic_ts") GraphDescription.builder()
.defaultPeriod(Duration.ofDays(7).toString()) .id("kafka_topic_partition_current_offset_per_topic_ts")
.prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\", topic = \"${topic}\"}") .defaultInterval(DEFAULT_RANGE_DURATION)
.addParametersItem(new GraphParameterDTO().name("topic")) .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\",topic = \"${topic}\"}")
.params(Set.of("topic"))
.build()
); );
} }

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service.metrics.prometheus; package com.provectus.kafka.ui.service.metrics.prometheus;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import java.util.Optional;
import org.antlr.v4.runtime.BaseErrorListener; import org.antlr.v4.runtime.BaseErrorListener;
import org.antlr.v4.runtime.CharStreams; import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream; import org.antlr.v4.runtime.CommonTokenStream;
@ -9,24 +10,23 @@ import org.antlr.v4.runtime.Recognizer;
import promql.PromQLLexer; import promql.PromQLLexer;
import promql.PromQLParser; import promql.PromQLParser;
public class PromQlGrammar { class PromQueryLangGrammar {
public static void main(String[] args) { // returns error msg, or empty if query is valid
String promql = "sum( " + static Optional<String> validateExpression(String query) {
" kafka_controller_kafkacontroller_activecontrollercount{cluster_name=\"3299fef4\",metrics=\"kafka\"}) OR " + try {
" kafka_controller_kafkacontroller_activecontrollercount{cluster_name=\"3299fef4\",job=\"topic-scanner\"}"; parseExpression(query);
System.out.println(parseMetricSelector(promql)); return Optional.empty();
} catch (ValidationException v) {
return Optional.of(v.getMessage());
}
} }
public static PromQLParser.InstantSelectorContext parseMetricSelector(String selector) { static PromQLParser.ExpressionContext parseExpression(String query) {
return parse(selector).instantSelector();
}
public static PromQLParser.ExpressionContext parseExpression(String query) {
return parse(query).expression(); return parse(query).expression();
} }
private static PromQLParser parse(String str) { private static PromQLParser parse(String str) throws ValidationException {
PromQLLexer lexer = new PromQLLexer(CharStreams.fromString(str)); PromQLLexer lexer = new PromQLLexer(CharStreams.fromString(str));
lexer.addErrorListener(new BaseErrorListener() { lexer.addErrorListener(new BaseErrorListener() {
@Override @Override

View file

@ -0,0 +1,48 @@
package com.provectus.kafka.ui.service.metrics.prometheus;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.service.graphs.GraphsStorage;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.text.StrSubstitutor;
public class PromQueryTemplate {
private final String queryTemplate;
private final Set<String> paramsNames;
public PromQueryTemplate(GraphsStorage.GraphDescription d) {
this(d.prometheusQuery(), d.params());
}
public PromQueryTemplate(String templateQueryString, Set<String> paramsNames) {
this.queryTemplate = templateQueryString;
this.paramsNames = paramsNames;
}
public String getQuery(String clusterName, Map<String, String> paramValues) {
var missingParams = Sets.difference(paramsNames, paramValues.keySet());
if (!missingParams.isEmpty()) {
throw new ValidationException("Not all params set for query, missing: " + missingParams);
}
Map<String, String> replacements = new HashMap<>(paramValues);
replacements.put("cluster", clusterName);
return replaceParams(replacements);
}
public Optional<String> validateSyntax() {
Map<String, String> fakeReplacements = new HashMap<>();
paramsNames.forEach(paramName -> fakeReplacements.put(paramName, "1"));
fakeReplacements.put("cluster", "1");
String prepared = replaceParams(fakeReplacements);
return PromQueryLangGrammar.validateExpression(prepared);
}
private String replaceParams(Map<String, String> replacements) {
return new StrSubstitutor(replacements).replace(queryTemplate);
}
}

View file

@ -3730,23 +3730,20 @@ components:
id: id:
type: string type: string
description: Id that should be used to query data on API level description: Id that should be used to query data on API level
metadata: type:
description: Additional info that can be used on UI for rendering adjustments type: string
type: object enum: ["range", "instant"]
additionalProperties: true
defaultPeriod: defaultPeriod:
type: string type: string
description: ISO_8601 duration string. If not set - instant (not range) query will be executed description: ISO_8601 duration string (for "range" graphs only)
parameters: parameters:
type: array type: array
items: items:
$ref: '#/components/schemas/GraphParameter' $ref: '#/components/schemas/GraphParameter'
prometheusQuery:
type: string
description: Mustache query template with param holders
GraphParameter: GraphParameter:
type: object type: object
required: ["name"]
properties: properties:
name: name:
type: string type: string