iliax hace 1 año
padre
commit
e9793ecf67

+ 80 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java

@@ -0,0 +1,80 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.GraphsApi;
+import com.provectus.kafka.ui.model.GraphDataRequestDTO;
+import com.provectus.kafka.ui.model.GraphDescriptionDTO;
+import com.provectus.kafka.ui.model.GraphDescriptionsDTO;
+import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO;
+import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDataDTO;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
+import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.graphs.GraphsService;
+import com.provectus.kafka.ui.service.audit.AuditService;
+import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import prometheus.query.model.QueryResponse;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+public class GraphsController extends AbstractController implements GraphsApi {
+
+  private static final PrometheusApiMapper MAPPER = Mappers.getMapper(PrometheusApiMapper.class);
+
+  @Mapper
+  interface PrometheusApiMapper {
+    PrometheusApiQueryResponseDTO fromClientResponse(QueryResponse resp);
+  }
+
+  private final AccessControlService accessControlService;
+  private final AuditService auditService;
+  private final GraphsService graphsService;
+
+  @Override
+  public Mono<ResponseEntity<PrometheusApiQueryResponseDTO>> getGraphData(String clusterName,
+                                                                          Mono<GraphDataRequestDTO> graphDataRequestDTO,
+                                                                          ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .operationName("getGraphData")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(
+            graphDataRequestDTO.flatMap(req ->
+                    graphsService.getGraphData(
+                        getCluster(clusterName),
+                        req.getId(),
+                        Optional.ofNullable(req.getFrom()).map(OffsetDateTime::toInstant).orElse(null),
+                        Optional.ofNullable(req.getTo()).map(OffsetDateTime::toInstant).orElse(null),
+                        req.getParameters()
+                    ).map(MAPPER::fromClientResponse))
+                .map(ResponseEntity::ok)
+        ).doOnEach(sig -> auditService.audit(context, sig));
+  }
+
+  @Override
+  public Mono<ResponseEntity<GraphDescriptionsDTO>> getGraphsList(String clusterName,
+                                                                  ServerWebExchange exchange) {
+    var graphs = graphsService.getAllGraphs().toList();
+    var cluster = getCluster(clusterName);
+    if (cluster.getPrometheusStorageClient() == null) {
+      graphs = List.of();
+    }
+    return Mono.just(
+        ResponseEntity.ok(
+            new GraphDescriptionsDTO().graphs(graphs)
+        )
+    );
+  }
+}

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -179,7 +179,7 @@ public class KafkaClusterFactory {
 
 
   private ReactiveFailover<PrometheusClientApi> prometheusStorageClient(ClustersProperties.Cluster cluster) {
   private ReactiveFailover<PrometheusClientApi> prometheusStorageClient(ClustersProperties.Cluster cluster) {
     WebClient webClient = new WebClientConfigurator()
     WebClient webClient = new WebClientConfigurator()
-        .configureSsl(cluster.getSsl(), cluster.getSchemaRegistrySsl())
+        .configureSsl(cluster.getSsl(), null)
         .configureBufferSize(webClientMaxBuffSize)
         .configureBufferSize(webClientMaxBuffSize)
         .build();
         .build();
     return ReactiveFailover.create(
     return ReactiveFailover.create(

+ 100 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsService.java

@@ -0,0 +1,100 @@
+package com.provectus.kafka.ui.service.graphs;
+
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.GraphDescriptionDTO;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.text.StrSubstitutor;
+import org.springframework.stereotype.Component;
+import prometheus.query.api.PrometheusClientApi;
+import prometheus.query.model.QueryResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+@RequiredArgsConstructor
+public class GraphsService {
+
+  private static final int TARGET_MATRIX_DATA_POINTS = 200;
+
+  private final GraphsStorage graphsStorage;
+
+  public Mono<QueryResponse> getGraphData(KafkaCluster cluster,
+                                          String id,
+                                          @Nullable Instant from,
+                                          @Nullable Instant to,
+                                          @Nullable Map<String, String> params) {
+
+    var graph = graphsStorage.getDescription(id)
+        .orElseThrow(() -> new NotFoundException("No graph found with id = " + id));
+
+    var promClient = cluster.getPrometheusStorageClient();
+    if (promClient == null) {
+      throw new ValidationException("Prometheus not configured for cluster");
+    }
+    return cluster.getPrometheusStorageClient()
+        .mono(client -> {
+          String preparedQuery = prepareQuery(cluster.getName(), graph.getPrometheusQuery(), params);
+          if (graph.getDefaultPeriod() != null) {
+            return queryRange(client, preparedQuery, Duration.parse(graph.getDefaultPeriod()), from, to);
+          }
+          return queryInstant(client, preparedQuery);
+        });
+  }
+
+  private Mono<QueryResponse> queryRange(PrometheusClientApi c,
+                                         String preparedQuery,
+                                         Duration defaultPeriod,
+                                         @Nullable Instant from,
+                                         @Nullable Instant to) {
+    if (from == null) {
+      from = Instant.now().minus(defaultPeriod);
+    }
+    if (to == null) {
+      to = Instant.now();
+    }
+    Preconditions.checkArgument(to.isAfter(from));
+    return c.queryRange(
+        preparedQuery,
+        String.valueOf(from.getEpochSecond()),
+        String.valueOf(to.getEpochSecond()),
+        calculateStepSize(from, to),
+        null
+    );
+  }
+
+  private String calculateStepSize(Instant from, Instant to) {
+    long intervalInSecs = to.getEpochSecond() - from.getEpochSecond();
+    if (intervalInSecs <= TARGET_MATRIX_DATA_POINTS) {
+      return intervalInSecs + "s";
+    }
+    int step = ((int) (((double) intervalInSecs) / 200));
+    System.out.println("Chosen step size"); //TODo
+    return step + "s";
+  }
+
+  private Mono<QueryResponse> queryInstant(PrometheusClientApi c, String preparedQuery) {
+    return c.query(preparedQuery, null, null);
+  }
+
+
+  private String prepareQuery(String clusterName, String queryTemplate, @Nullable Map<String, String> params) {
+    Map<String, String> replacements = new HashMap<>();
+    replacements.putAll(Optional.ofNullable(params).orElse(Map.of()));
+    replacements.put("cluster", clusterName);
+    return new StrSubstitutor(replacements).replace(queryTemplate);
+  }
+
+  public Stream<GraphDescriptionDTO> getAllGraphs() {
+    return graphsStorage.getAll();
+  }
+
+}

+ 53 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphsStorage.java

@@ -0,0 +1,53 @@
+package com.provectus.kafka.ui.service.graphs;
+
+import com.provectus.kafka.ui.model.GraphDescriptionDTO;
+import com.provectus.kafka.ui.model.GraphParameterDTO;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.springframework.stereotype.Component;
+
+@Component
+class GraphsStorage {
+  
+  private final Map<String, GraphDescriptionDTO> graphsById;
+
+  GraphsStorage() {
+    this.graphsById = PREDEFINED_GRAPHS.stream()
+        .collect(Collectors.toMap(GraphDescriptionDTO::getId, d -> d));
+  }
+
+  Optional<GraphDescriptionDTO> getDescription(String id) {
+    return Optional.ofNullable(graphsById.get(id));
+  }
+
+  Stream<GraphDescriptionDTO> getAll() {
+    return graphsById.values().stream();
+  }
+
+  private static final List<GraphDescriptionDTO> PREDEFINED_GRAPHS = List.of(
+
+      new GraphDescriptionDTO("broker_bytes_disk_ts")
+          .defaultPeriod(Duration.ofDays(7).toString())
+          .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"),
+
+      new GraphDescriptionDTO("broker_bytes_disk")
+          .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}"),
+
+      new GraphDescriptionDTO("kafka_topic_partition_current_offset")
+          .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"),
+
+      new GraphDescriptionDTO("kafka_topic_partition_current_offset_ts")
+          .defaultPeriod(Duration.ofDays(7).toString())
+          .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\"}"),
+
+      new GraphDescriptionDTO("kafka_topic_partition_current_offset_per_topic_ts")
+          .defaultPeriod(Duration.ofDays(7).toString())
+          .prometheusQuery("topic_bytes_disk{cluster=\"${cluster}\", topic = \"${topic}\"}")
+          .addParametersItem(new GraphParameterDTO().name("topic"))
+  );
+
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java

@@ -74,6 +74,7 @@ public class MetricsScrapping {
   private void sendMetricsToSink(Metrics metrics) {
   private void sendMetricsToSink(Metrics metrics) {
     sink.send(prepareMetricsForSending(metrics))
     sink.send(prepareMetricsForSending(metrics))
         .doOnError(th -> log.warn("Error sending metrics to metrics sink", th))
         .doOnError(th -> log.warn("Error sending metrics to metrics sink", th))
+        .doOnTerminate(() -> log.debug("Metrics sent to sink")) //todo
         .subscribe();
         .subscribe();
   }
   }
 
 

+ 206 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1,4 +1,4 @@
-openapi: 3.0.0
+openapi: 3.0.1
 info:
 info:
   description: Api Documentation
   description: Api Documentation
   version: 0.1.0
   version: 0.1.0
@@ -32,6 +32,52 @@ paths:
                   $ref: '#/components/schemas/Cluster'
                   $ref: '#/components/schemas/Cluster'
 
 
 
 
+  /api/clusters/{clusterName}/graphs/descriptions:
+    get:
+      tags:
+        - Graphs
+      summary: getGraphsList
+      operationId: getGraphsList
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: |
+            Success
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GraphDescriptions'
+
+  /api/clusters/{clusterName}/graphs/prometheus:
+    post:
+      tags:
+        - Graphs
+      summary: getGraphData
+      operationId: getGraphData
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/GraphDataRequest'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/PrometheusApiQueryResponse'
+
   /api/clusters/{clusterName}/cache:
   /api/clusters/{clusterName}/cache:
     post:
     post:
       tags:
       tags:
@@ -3653,6 +3699,165 @@ components:
           additionalProperties:
           additionalProperties:
             $ref: '#/components/schemas/ClusterConfigValidation'
             $ref: '#/components/schemas/ClusterConfigValidation'
 
 
+    GraphDataRequest:
+      type: object
+      properties:
+        id:
+          type: string
+        parameters:
+          type: object
+          additionalProperties:
+            type: string
+        from:
+          type: string
+          format: date-time
+        to:
+          type: string
+          format: date-time
+
+    GraphDescriptions:
+      type: object
+      properties:
+        graphs:
+          type: array
+          items:
+            $ref: '#/components/schemas/GraphDescription'
+
+    GraphDescription:
+      type: object
+      required: ["id"]
+      properties:
+        id:
+          type: string
+          description: Id that should be used to query data on API level
+        metadata:
+          description: Additional info that can be used on UI for rendering adjustments
+          type: object
+          additionalProperties: true
+        defaultPeriod:
+          type: string
+          description: ISO_8601 duration string. If not set - instant (not range) query will be executed
+        parameters:
+          type: array
+          items:
+            $ref: '#/components/schemas/GraphParameter'
+        prometheusQuery:
+          type: string
+          description: Mustache query template with param holders
+
+    GraphParameter:
+      type: object
+      properties:
+        name:
+          type: string
+
+    PrometheusApiBaseResponse:
+      type: object
+      required: [ status ]
+      properties:
+        status:
+          type: string
+          enum: [ "success", "error" ]
+        error:
+          type: string
+        errorType:
+          type: string
+        warnings:
+          type: array
+          items:
+            type: string
+
+    PrometheusApiQueryResponse:
+      type: object
+      allOf:
+        - $ref: "#/components/schemas/PrometheusApiBaseResponse"
+      properties:
+        data:
+          $ref: '#/components/schemas/PrometheusApiQueryResponseData'
+
+    PrometheusApiQueryResponseData:
+      type: object
+      required: [ "resultType" ]
+      properties:
+        resultType:
+          type: string
+        result:
+          type: array
+          items: { }
+#      discriminator:
+#        propertyName: resultType
+#        mapping:
+#          matrix: '#/components/schemas/MatrixQueryResponse'
+#          vector: '#/components/schemas/InstantVectorQueryResponse'
+#          scalar: '#/components/schemas/ScalarQueryResponse'
+#          string: '#/components/schemas/StringQueryResponse'
+#      anyOf:
+#        - $ref: '#/components/schemas/MatrixQueryResponse'
+#        - $ref: '#/components/schemas/InstantVectorQueryResponse'
+#        - $ref: '#/components/schemas/ScalarQueryResponse'
+#        - $ref: '#/components/schemas/StringQueryResponse'
+#
+#    MatrixQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [
+#              {
+#                "metric": { "<label_name>": "<label_value>", ... },
+#                "values": [ [ <unix_time>, "<sample_value>" ], ... ],
+#                "histograms": [ [ <unix_time>, <histogram> ], ... ]
+#              }, ...
+#            ]
+#          items: { }
+#
+#    InstantVectorQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [
+#              {
+#                "metric": { "<label_name>": "<label_value>", ... },
+#                "value": [ <unix_time>, "<sample_value>" ],
+#                "histogram": [ <unix_time>, <histogram> ]
+#             }, ...
+#            ]
+#          items: { }
+#
+#    ScalarQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [ <unix_time>, "<scalar_value>" ]
+#          items: { }
+#
+#    StringQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [ <unix_time>, "<string_value>" ]
+#          items: { }
+
+
     ApplicationPropertyValidation:
     ApplicationPropertyValidation:
       type: object
       type: object
       required: [error]
       required: [error]

+ 71 - 67
kafka-ui-contract/src/main/resources/swagger/prometheus-query-api.yaml

@@ -281,78 +281,82 @@ components:
       properties:
       properties:
         resultType:
         resultType:
           type: string
           type: string
-      discriminator:
-        propertyName: resultType
-        mapping:
-          matrix: '#/components/schemas/MatrixQueryResponse'
-          vector: '#/components/schemas/InstantVectorQueryResponse'
-          scalar: '#/components/schemas/ScalarQueryResponse'
-          string: '#/components/schemas/StringQueryResponse'
-      anyOf:
-        - $ref: '#/components/schemas/MatrixQueryResponse'
-        - $ref: '#/components/schemas/InstantVectorQueryResponse'
-        - $ref: '#/components/schemas/ScalarQueryResponse'
-        - $ref: '#/components/schemas/StringQueryResponse'
-
-    MatrixQueryResponse:
-      type: object
-      allOf:
-        - $ref: "#/components/schemas/QueryResponseData"
-      properties:
         result:
         result:
           type: array
           type: array
-          description: |
-            Format:
-            [
-              {
-                "metric": { "<label_name>": "<label_value>", ... },
-                "values": [ [ <unix_time>, "<sample_value>" ], ... ],
-                "histograms": [ [ <unix_time>, <histogram> ], ... ]
-              }, ...
-            ]
           items: { }
           items: { }
 
 
-    InstantVectorQueryResponse:
-      type: object
-      allOf:
-        - $ref: "#/components/schemas/QueryResponseData"
-      properties:
-        result:
-          type: array
-          description: |
-            Format:
-            [
-              {
-                "metric": { "<label_name>": "<label_value>", ... },
-                "value": [ <unix_time>, "<sample_value>" ],
-                "histogram": [ <unix_time>, <histogram> ]
-             }, ...
-            ]
-          items: { }
+#      discriminator:
+#        propertyName: resultType
+#        mapping:
+#          matrix: '#/components/schemas/MatrixQueryResponse'
+#          vector: '#/components/schemas/InstantVectorQueryResponse'
+#          scalar: '#/components/schemas/ScalarQueryResponse'
+#          string: '#/components/schemas/StringQueryResponse'
+#      anyOf:
+#        - $ref: '#/components/schemas/MatrixQueryResponse'
+#        - $ref: '#/components/schemas/InstantVectorQueryResponse'
+#        - $ref: '#/components/schemas/ScalarQueryResponse'
+#        - $ref: '#/components/schemas/StringQueryResponse'
 
 
-    ScalarQueryResponse:
-      type: object
-      allOf:
-        - $ref: "#/components/schemas/QueryResponseData"
-      properties:
-        result:
-          type: array
-          description: |
-            Format:
-            [ <unix_time>, "<scalar_value>" ]
-          items: { }
-
-    StringQueryResponse:
-      type: object
-      allOf:
-        - $ref: "#/components/schemas/QueryResponseData"
-      properties:
-        result:
-          type: array
-          description: |
-            Format:
-            [ <unix_time>, "<string_value>" ]
-          items: { }
+#    MatrixQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [
+#              {
+#                "metric": { "<label_name>": "<label_value>", ... },
+#                "values": [ [ <unix_time>, "<sample_value>" ], ... ],
+#                "histograms": [ [ <unix_time>, <histogram> ], ... ]
+#              }, ...
+#            ]
+#          items: { }
+#
+#    InstantVectorQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [
+#              {
+#                "metric": { "<label_name>": "<label_value>", ... },
+#                "value": [ <unix_time>, "<sample_value>" ],
+#                "histogram": [ <unix_time>, <histogram> ]
+#             }, ...
+#            ]
+#          items: { }
+#
+#    ScalarQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [ <unix_time>, "<scalar_value>" ]
+#          items: { }
+#
+#    StringQueryResponse:
+#      type: object
+#      allOf:
+#        - $ref: "#/components/schemas/QueryResponseData"
+#      properties:
+#        result:
+#          type: array
+#          description: |
+#            Format:
+#            [ <unix_time>, "<string_value>" ]
+#          items: { }
 
 
     SeriesResponse:
     SeriesResponse:
       type: object
       type: object