소스 검색

[#207] feat(api): add ksql_db feature, expection handler to ksql client

Ilnur Farukhshin 4 년 전
부모
커밋
61289dde61

+ 28 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -2,14 +2,17 @@ package com.provectus.kafka.ui.client;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
@@ -17,23 +20,31 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 @Log4j2
 public final class KsqlClient {
-  private final WebClient webClient;
-  private final ObjectMapper mapper;
+    private final WebClient webClient;
+    private final ObjectMapper mapper;
 
-  public Mono<KsqlCommandResponse> execute(KsqlStatementStrategy ksqlStatement) {
-    return webClient.post()
-      .uri(ksqlStatement.getUri())
-      .accept(new MediaType("application","vnd.ksql.v1+json"))
-      .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
-      .retrieve()
-      .bodyToMono(byte[].class)
-      .map(this::toJson)
-      .map(ksqlStatement::serializeResponse)
-      .doOnError(log::error);
-  }
+    public Mono<KsqlCommandResponse> execute(KsqlStatementStrategy ksqlStatement) {
+        return webClient.post()
+                .uri(ksqlStatement.getUri())
+                .accept(new MediaType("application", "vnd.ksql.v1+json"))
+                .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
+                .retrieve()
+                .onStatus(HttpStatus::isError, this::getErrorMessage)
+                .bodyToMono(byte[].class)
+                .map(this::toJson)
+                .map(ksqlStatement::serializeResponse);
+    }
 
-  @SneakyThrows
-  private JsonNode toJson(byte[] content) {
-    return this.mapper.readTree(content);
-  }
+    private Mono<Throwable> getErrorMessage(ClientResponse response) {
+        return response
+                .bodyToMono(byte[].class)
+                .map(this::toJson)
+                .map(jsonNode -> jsonNode.get("message").asText())
+                .flatMap(error -> Mono.error(new UnprocessableEntityException(error)));
+    }
+
+    @SneakyThrows
+    private JsonNode toJson(byte[] content) {
+        return this.mapper.readTree(content);
+    }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java

@@ -11,8 +11,8 @@ public enum Feature {
       .filter(Predicate.not(List::isEmpty))
       .isPresent()
   ),
+  KSQL_DB(cluster -> cluster.getKsqldbServer() != null),
   SCHEMA_REGISTRY(cluster -> cluster.getSchemaRegistry() != null);
-  // TODO: add feature for FE app
 
   private final Predicate<KafkaCluster> isEnabled;
 

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -29,7 +29,7 @@ public class KsqlService {
             .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
                     .map(statement -> statement.host(host))
             )
-            .flatMap(statement -> ksqlClient.execute(statement));
+            .flatMap(ksqlClient::execute);
   }
 
   private Mono<KsqlStatementStrategy> getStatementStrategyForKsqlCommand(Mono<KsqlCommand> ksqlCommand) {
@@ -39,7 +39,6 @@ public class KsqlService {
                     .map(s -> s.ksqlCommand(command))
                     .findFirst())
             .flatMap(Mono::justOrEmpty)
-            // TODO: handle not parsed statements?
             .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
   }
 }

+ 8 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.strategy.ksqlStatement;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.model.KsqlCommand;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import com.provectus.kafka.ui.model.Table;
@@ -48,7 +49,7 @@ public abstract class KsqlStatementStrategy {
             Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items);
             return commandResponse.data(table);
         }
-        throw new InternalError("Invalid data format");
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
 
     protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) {
@@ -58,8 +59,7 @@ public abstract class KsqlStatementStrategy {
             JsonNode item = first.path(path);
             return commandResponse.message(getMessageFromObject(item));
         }
-        // TODO: handle
-        throw new InternalError("Invalid data format");
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
 
     protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
@@ -74,7 +74,7 @@ public abstract class KsqlStatementStrategy {
         JsonNode headerRow = response.get(0);
         if (headerRow.isObject() && headerRow.size() > 0) {
             String schema = headerRow.get("header").get("schema").asText();
-            return Arrays.stream(schema.split(",")).map(s -> s.trim()).collect(Collectors.toList());
+            return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList());
         }
         return new ArrayList<>();
     }
@@ -84,7 +84,7 @@ public abstract class KsqlStatementStrategy {
                 .filter(row -> row.has("row") && row.get("row").has("columns"))
                 .map(row -> row.get("row").get("columns"))
                 .map(cellNode -> getStreamForJsonArray(cellNode)
-                                .map(cell -> cell.asText())
+                                .map(JsonNode::asText)
                                 .collect(Collectors.toList())
                 )
                 .collect(Collectors.toList());
@@ -115,7 +115,7 @@ public abstract class KsqlStatementStrategy {
         if (node.isObject() && node.has("message")) {
             return node.get("message").asText();
         }
-        throw new InternalError("can't get message from empty object or array");
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
 
     private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
@@ -131,8 +131,7 @@ public abstract class KsqlStatementStrategy {
         if (node.isArray() && node.size() > 0) {
             return StreamSupport.stream(node.spliterator(), false);
         }
-        // TODO: handle
-        throw new InternalError("not JsonArray or empty");
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
 
     private List<String> getJsonObjectKeys(JsonNode node) {
@@ -141,8 +140,7 @@ public abstract class KsqlStatementStrategy {
                     Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
             ).collect(Collectors.toList());
         }
-        // TODO: handle
-        throw new InternalError("Invalid data format");
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
 
     private List<String> getJsonObjectValues(JsonNode node) {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategy.java

@@ -10,8 +10,8 @@ import java.util.Optional;
 @Component
 public class ListStrategy extends KsqlStatementStrategy {
     private final String requestPath = "/ksql";
-    private String responseValueKey = "";
     private final List<String> statements = List.of("functions", "topics", "streams", "tables");
+    private String responseValueKey = "";
 
     @Override
     public KsqlCommandResponse serializeResponse(JsonNode response) {