Browse Source

[#207] feat(api, contract): return ksql command result as table

Ilnur Farukhshin 4 years ago
parent
commit
3686e75c8f

+ 12 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -1,8 +1,11 @@
 package com.provectus.kafka.ui.client;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
 import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
@@ -15,15 +18,22 @@ import reactor.core.publisher.Mono;
 @Log4j2
 public final class KsqlClient {
   private final WebClient webClient;
+  private final ObjectMapper mapper;
 
-  public Mono<Object> execute(KsqlStatementStrategy ksqlStatement) {
+  public Mono<KsqlResponseTable> execute(KsqlStatementStrategy ksqlStatement) {
     return webClient.post()
       .uri(ksqlStatement.getUri())
       .accept(new MediaType("application","vnd.ksql.v1+json"))
       .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
       .retrieve()
-      .bodyToMono(String.class)
+      .bodyToMono(byte[].class)
+      .map(this::toJson)
       .map(ksqlStatement::serializeResponse)
       .doOnError(log::error);
   }
+
+  @SneakyThrows
+  private JsonNode toJson(byte[] content) {
+    return this.mapper.readTree(content);
+  }
 }

+ 5 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.KsqlApi;
 import com.provectus.kafka.ui.model.KsqlCommand;
 
+import com.provectus.kafka.ui.model.KsqlResponseTable;
 import com.provectus.kafka.ui.service.KsqlService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -18,9 +19,9 @@ public class KsqlController implements KsqlApi {
   private final KsqlService ksqlService;
 
   @Override
-  public Mono<ResponseEntity<Object>> executeKsqlCommand(String clusterName,
-                                                               Mono<KsqlCommand> ksqlCommand,
-                                                               ServerWebExchange exchange) {
-    return Mono.just(ResponseEntity.ok(ksqlService.executeKsqlCommand(clusterName, ksqlCommand)));
+  public Mono<ResponseEntity<KsqlResponseTable>> executeKsqlCommand(String clusterName,
+                                                                    Mono<KsqlCommand> ksqlCommand,
+                                                                    ServerWebExchange exchange) {
+    return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok);
   }
 }

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java

@@ -12,6 +12,7 @@ public enum Feature {
       .isPresent()
   ),
   SCHEMA_REGISTRY(cluster -> cluster.getSchemaRegistry() != null);
+  // TODO: add feature for FE app
 
   private final Predicate<KafkaCluster> isEnabled;
 

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -6,6 +6,7 @@ import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
 import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Service;
@@ -20,7 +21,7 @@ public class KsqlService {
   private final ClustersStorage clustersStorage;
   private final List<KsqlStatementStrategy> commandParamsStrategies;
 
-  public Mono<Object> executeKsqlCommand(String clusterName, Mono<KsqlCommand> ksqlCommand) {
+  public Mono<KsqlResponseTable> executeKsqlCommand(String clusterName, Mono<KsqlCommand> ksqlCommand) {
     return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
             .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
             .map(KafkaCluster::getKsqldbServer)

+ 58 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java

@@ -1,16 +1,28 @@
 package com.provectus.kafka.ui.strategy.ksqlStatement;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 public abstract class KsqlStatementStrategy {
     protected String host = null;
     protected KsqlCommand ksqlCommand = null;
 
     public String getUri() {
-        if (this.host != null) { return this.host + this.getRequestPath(); }
+        if (this.host != null) {
+            return this.host + this.getRequestPath();
+        }
         return null;
     }
 
+    public boolean test(String sql) {
+        return sql.trim().toLowerCase().matches(getTestRegExp());
+    }
+
     public KsqlStatementStrategy host(String host) {
         this.host = host;
         return this;
@@ -25,9 +37,52 @@ public abstract class KsqlStatementStrategy {
         return this;
     }
 
-    public abstract Object serializeResponse(String response);
+    protected KsqlResponseTable getKsqlTable(JsonNode node) {
+        KsqlResponseTable table = new KsqlResponseTable();
+        table.headers(new ArrayList<>()).rows(new ArrayList<>());
+        if (node.size() > 0) {
+            List<String> keys = getTableHeaders(node.get(0));
+            List<List<String>> rows = getTableRows(node, keys);
+            table.headers(keys).rows(rows);
+        }
+        return table;
+    }
 
-    public abstract boolean test(String sql);
+    protected KsqlResponseTable serializeTableResponse(JsonNode response, String path) {
+        if (response.isArray() && response.size() > 0) {
+            JsonNode first = response.get(0);
+            JsonNode items = first.path(path);
+            return this.getKsqlTable(items);
+        }
+        throw new InternalError("Invalid data format");
+    }
+
+    private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
+        if (node.isArray() && node.size() > 0) {
+            return StreamSupport.stream(node.spliterator(), false)
+                    .map(row -> keys.stream()
+                            .map(header -> row.get(header).asText())
+                            .collect(Collectors.toList())
+                    )
+                    .collect(Collectors.toList());
+        }
+        // TODO: handle
+        throw new InternalError("Invalid data format");
+    }
+
+    private List<String> getTableHeaders(JsonNode node) {
+        if (node.isObject()) {
+            return StreamSupport.stream(
+                    Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
+            ).collect(Collectors.toList());
+        }
+        // TODO: handle
+        throw new InternalError("Invalid data format");
+    }
+
+    public abstract KsqlResponseTable serializeResponse(JsonNode response);
 
     protected abstract String getRequestPath();
+
+    protected abstract String getTestRegExp();
 }

+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java

@@ -1,29 +0,0 @@
-package com.provectus.kafka.ui.strategy.ksqlStatement;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonParser;
-import lombok.SneakyThrows;
-import org.springframework.stereotype.Component;
-
-
-@Component
-public class ListStreamsStrategy extends KsqlStatementStrategy {
-    private final String requestPath = "/ksql";
-
-    @SneakyThrows
-    @Override
-    public Object serializeResponse(String response) {
-        JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray();
-        return jsonArray.get(0).getAsJsonObject().get("streams").getAsJsonArray().toString();
-    }
-
-    @Override
-    public boolean test(String sql) {
-        return sql.trim().toLowerCase().matches("(list|show) streams;");
-    }
-
-    @Override
-    protected String getRequestPath() {
-        return requestPath;
-    }
-}

+ 0 - 26
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java

@@ -1,26 +0,0 @@
-package com.provectus.kafka.ui.strategy.ksqlStatement;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonParser;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ListTopicsStrategy extends KsqlStatementStrategy {
-    private final String requestPath = "/ksql";
-
-    @Override
-    public Object serializeResponse(String response) {
-        JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray();
-        return jsonArray.get(0).getAsJsonObject().get("topics").getAsJsonArray().toString();
-    }
-
-    @Override
-    public boolean test(String sql) {
-        return sql.trim().toLowerCase().matches("(list|show) topics;");
-    }
-
-    @Override
-    protected String getRequestPath() {
-        return requestPath;
-    }
-}

+ 27 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java

@@ -0,0 +1,27 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ShowPropertiesStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+    private final String responseValueKey = "properties";
+
+    @Override
+    public KsqlResponseTable serializeResponse(JsonNode response) {
+        return serializeTableResponse(response, responseValueKey);
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+
+    @Override
+    protected String getTestRegExp() {
+        return "show properties;";
+    }
+}

+ 27 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java

@@ -0,0 +1,27 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ShowQueriesStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+    private final String responseValueKey = "query";
+
+    @Override
+    public KsqlResponseTable serializeResponse(JsonNode response) {
+        return serializeTableResponse(response, responseValueKey);
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+
+    @Override
+    protected String getTestRegExp() {
+        return "show queries;";
+    }
+}

+ 27 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java

@@ -0,0 +1,27 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ShowStreamsStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+    private final String responseValueKey = "streams";
+
+    @Override
+    public KsqlResponseTable serializeResponse(JsonNode response) {
+        return serializeTableResponse(response, responseValueKey);
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+
+    @Override
+    protected String getTestRegExp() {
+        return "(list|show) streams;";
+    }
+}

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ShowTablesStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+    private final String responseValueKey = "tables";
+
+    @Override
+    public KsqlResponseTable serializeResponse(JsonNode response) {
+        return serializeTableResponse(response, responseValueKey);
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+
+    @Override
+    protected String getTestRegExp() {
+        return "(list|show) tables;";
+    }
+}

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ShowTopicsStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+    private final String responseValueKey = "topics";
+
+    @Override
+    public KsqlResponseTable serializeResponse(JsonNode response) {
+        return serializeTableResponse(response, responseValueKey);
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+
+    @Override
+    protected String getTestRegExp() {
+        return "(list|show) topics;";
+    }
+}

+ 17 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1079,7 +1079,7 @@ paths:
           content:
             application/json:
               schema:
-                type: object
+                $ref: '#/components/schemas/KsqlResponseTable'
 
   /api/clusters/{clusterName}/connects/{connectName}/plugins:
     get:
@@ -1856,6 +1856,22 @@ components:
       required:
         - ksql
 
+    KsqlResponseTable:
+      type: object
+      properties:
+        headers:
+          type: array
+          items:
+            type: string
+        rows:
+          type: array
+          items:
+            type: array
+            items:
+              type: string
+      required:
+        - ksql
+
     FullConnectorInfo:
       type: object
       properties: