Jelajahi Sumber

Merge pull request #682 from provectus/issue207/ksqldb

Add KSQL DB support
Ilnur Farukhshin 3 tahun lalu
induk
melakukan
693cd475f3
27 mengubah file dengan 1276 tambahan dan 2 penghapusan
  1. 1 0
      README.md
  2. 50 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java
  3. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  4. 26 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
  5. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
  6. 13 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java
  7. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java
  8. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  9. 49 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java
  10. 166 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java
  11. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java
  12. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java
  13. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java
  14. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java
  15. 24 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java
  16. 57 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java
  17. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java
  18. 2 1
      kafka-ui-api/src/main/resources/application-local.yml
  19. 124 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java
  20. 85 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java
  21. 76 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java
  22. 75 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java
  23. 74 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java
  24. 79 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java
  25. 135 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java
  26. 72 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java
  27. 63 0
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 1 - 0
README.md

@@ -163,6 +163,7 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_NAME` | Cluster name
 |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` 	|Address where to connect 
 |`KAFKA_CLUSTERS_0_ZOOKEEPER` 	| Zookeper service address 
+|`KAFKA_CLUSTERS_0_KSQLDBSERVER` 	| KSQL DB server address 
 |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` 	|Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username

+ 50 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -0,0 +1,50 @@
+package com.provectus.kafka.ui.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+@Log4j2
+public class KsqlClient {
+  private final WebClient webClient;
+  private final ObjectMapper mapper;
+
+  public Mono<KsqlCommandResponse> execute(BaseStrategy ksqlStatement) {
+    return webClient.post()
+        .uri(ksqlStatement.getUri())
+        .accept(new MediaType("application", "vnd.ksql.v1+json"))
+        .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
+        .retrieve()
+        .onStatus(HttpStatus::isError, this::getErrorMessage)
+        .bodyToMono(byte[].class)
+        .map(this::toJson)
+        .map(ksqlStatement::serializeResponse);
+  }
+
+  private Mono<Throwable> getErrorMessage(ClientResponse response) {
+    return response
+        .bodyToMono(byte[].class)
+        .map(this::toJson)
+        .map(jsonNode -> jsonNode.get("message").asText())
+        .flatMap(error -> Mono.error(new UnprocessableEntityException(error)));
+  }
+
+  @SneakyThrows
+  private JsonNode toJson(byte[] content) {
+    return this.mapper.readTree(content);
+  }
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -21,6 +21,7 @@ public class ClustersProperties {
     String zookeeper;
     String schemaRegistry;
     SchemaRegistryAuth schemaRegistryAuth;
+    String ksqldbServer;
     String schemaNameTemplate = "%s-value";
     String keySchemaNameTemplate = "%s-key";
     String protobufFile;

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.KsqlApi;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.service.KsqlService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class KsqlController implements KsqlApi {
+  private final KsqlService ksqlService;
+
+  @Override
+  public Mono<ResponseEntity<KsqlCommandResponse>> executeKsqlCommand(String clusterName,
+                                                                      Mono<KsqlCommand> ksqlCommand,
+                                                                      ServerWebExchange exchange) {
+    return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok);
+  }
+}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -19,7 +19,8 @@ public enum ErrorCode {
   CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
   TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
   SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
-  CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND);
+  CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
+  KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND);
 
   static {
     // codes uniqueness check

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class KsqlDbNotFoundException extends CustomBaseException {
+
+  public KsqlDbNotFoundException() {
+    super("KSQL DB not found");
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.KSQLDB_NOT_FOUND;
+  }
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java

@@ -11,6 +11,7 @@ public enum Feature {
       .filter(Predicate.not(List::isEmpty))
       .isPresent()
   ),
+  KSQL_DB(cluster -> cluster.getKsqldbServer() != null),
   SCHEMA_REGISTRY(cluster -> cluster.getSchemaRegistry() != null);
 
   private final Predicate<KafkaCluster> isEnabled;

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -16,6 +16,7 @@ public class KafkaCluster {
   private final String bootstrapServers;
   private final String zookeeper;
   private final InternalSchemaRegistry schemaRegistry;
+  private final String ksqldbServer;
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;

+ 49 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -0,0 +1,49 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.client.KsqlClient;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+public class KsqlService {
+  private final KsqlClient ksqlClient;
+  private final ClustersStorage clustersStorage;
+  private final List<BaseStrategy> ksqlStatementStrategies;
+
+  public Mono<KsqlCommandResponse> executeKsqlCommand(String clusterName,
+                                                      Mono<KsqlCommand> ksqlCommand) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
+        .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
+        .map(KafkaCluster::getKsqldbServer)
+        .onErrorResume(e -> {
+          Throwable throwable =
+              e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException();
+          return Mono.error(throwable);
+        })
+        .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
+            .map(statement -> statement.host(host))
+        )
+        .flatMap(ksqlClient::execute);
+  }
+
+  private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(
+      Mono<KsqlCommand> ksqlCommand) {
+    return ksqlCommand
+        .map(command -> ksqlStatementStrategies.stream()
+            .filter(s -> s.test(command.getKsql()))
+            .map(s -> s.ksqlCommand(command))
+            .findFirst())
+        .flatMap(Mono::justOrEmpty)
+        .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
+  }
+}

+ 166 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java

@@ -0,0 +1,166 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public abstract class BaseStrategy {
+  protected static final String KSQL_REQUEST_PATH = "/ksql";
+  protected static final String QUERY_REQUEST_PATH = "/query";
+  private static final String MAPPING_EXCEPTION_ERROR = "KSQL DB response mapping error";
+  protected String host = null;
+  protected KsqlCommand ksqlCommand = null;
+
+  public String getUri() {
+    if (this.host != null) {
+      return this.host + this.getRequestPath();
+    }
+    throw new UnprocessableEntityException("Strategy doesn't have host");
+  }
+
+  public boolean test(String sql) {
+    return sql.trim().toLowerCase().matches(getTestRegExp());
+  }
+
+  public BaseStrategy host(String host) {
+    this.host = host;
+    return this;
+  }
+
+  public KsqlCommand getKsqlCommand() {
+    return ksqlCommand;
+  }
+
+  public BaseStrategy ksqlCommand(KsqlCommand ksqlCommand) {
+    this.ksqlCommand = ksqlCommand;
+    return this;
+  }
+
+  protected String getRequestPath() {
+    return BaseStrategy.KSQL_REQUEST_PATH;
+  }
+
+  protected KsqlCommandResponse serializeTableResponse(JsonNode response, String key) {
+    JsonNode item = getResponseFirstItemValue(response, key);
+    Table table = item.isArray() ? getTableFromArray(item) : getTableFromObject(item);
+    return (new KsqlCommandResponse()).data(table);
+  }
+
+  protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String key) {
+    JsonNode item = getResponseFirstItemValue(response, key);
+    return (new KsqlCommandResponse()).message(getMessageFromObject(item));
+  }
+
+  protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
+    if (response.isArray() && response.size() > 0) {
+      Table table = (new Table())
+          .headers(getQueryResponseHeader(response))
+          .rows(getQueryResponseRows(response));
+      return (new KsqlCommandResponse()).data(table);
+    }
+    throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
+  }
+
+  private JsonNode getResponseFirstItemValue(JsonNode response, String key) {
+    if (response.isArray() && response.size() > 0) {
+      JsonNode first = response.get(0);
+      if (first.has(key)) {
+        return first.path(key);
+      }
+    }
+    throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
+  }
+
+  private List<String> getQueryResponseHeader(JsonNode response) {
+    JsonNode headerRow = response.get(0);
+    if (headerRow.isObject() && headerRow.has("header")) {
+      String schema = headerRow.get("header").get("schema").asText();
+      return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList());
+    }
+    return new ArrayList<>();
+  }
+
+  private List<List<String>> getQueryResponseRows(JsonNode node) {
+    return getStreamForJsonArray(node)
+        .filter(row -> row.has("row") && row.get("row").has("columns"))
+        .map(row -> row.get("row").get("columns"))
+        .map(cellNode -> getStreamForJsonArray(cellNode)
+            .map(JsonNode::asText)
+            .collect(Collectors.toList())
+        )
+        .collect(Collectors.toList());
+  }
+
+  private Table getTableFromArray(JsonNode node) {
+    Table table = new Table();
+    table.headers(new ArrayList<>()).rows(new ArrayList<>());
+    if (node.size() > 0) {
+      List<String> keys = getJsonObjectKeys(node.get(0));
+      List<List<String>> rows = getTableRows(node, keys);
+      table.headers(keys).rows(rows);
+    }
+    return table;
+  }
+
+  private Table getTableFromObject(JsonNode node) {
+    List<String> keys = getJsonObjectKeys(node);
+    List<String> values = getJsonObjectValues(node);
+    List<List<String>> rows = IntStream
+        .range(0, keys.size())
+        .mapToObj(i -> List.of(keys.get(i), values.get(i)))
+        .collect(Collectors.toList());
+    return (new Table()).headers(List.of("key", "value")).rows(rows);
+  }
+
+  private String getMessageFromObject(JsonNode node) {
+    if (node.isObject() && node.has("message")) {
+      return node.get("message").asText();
+    }
+    throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
+  }
+
+  private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
+    return getStreamForJsonArray(node)
+        .map(row -> keys.stream()
+            .map(header -> row.get(header).asText())
+            .collect(Collectors.toList())
+        )
+        .collect(Collectors.toList());
+  }
+
+  private Stream<JsonNode> getStreamForJsonArray(JsonNode node) {
+    if (node.isArray() && node.size() > 0) {
+      return StreamSupport.stream(node.spliterator(), false);
+    }
+    throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
+  }
+
+  private List<String> getJsonObjectKeys(JsonNode node) {
+    if (node.isObject()) {
+      return StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
+      ).collect(Collectors.toList());
+    }
+    throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
+  }
+
+  private List<String> getJsonObjectValues(JsonNode node) {
+    return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText())
+        .collect(Collectors.toList());
+  }
+
+  public abstract KsqlCommandResponse serializeResponse(JsonNode response);
+
+  protected abstract String getTestRegExp();
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CreateStrategy extends BaseStrategy {
+  private static final String RESPONSE_VALUE_KEY = "commandStatus";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "create (table|stream)(.*)(with|as select(.*)from)(.*);";
+  }
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DescribeStrategy extends BaseStrategy {
+  private static final String RESPONSE_VALUE_KEY = "sourceDescription";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeTableResponse(response, RESPONSE_VALUE_KEY);
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "describe (.*);";
+  }
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DropStrategy extends BaseStrategy {
+  private static final String RESPONSE_VALUE_KEY = "commandStatus";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "drop (table|stream) (.*);";
+  }
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExplainStrategy extends BaseStrategy {
+  private static final String RESPONSE_VALUE_KEY = "queryDescription";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeTableResponse(response, RESPONSE_VALUE_KEY);
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "explain (.*);";
+  }
+}

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java

@@ -0,0 +1,24 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SelectStrategy extends BaseStrategy {
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeQueryResponse(response);
+  }
+
+  @Override
+  protected String getRequestPath() {
+    return BaseStrategy.QUERY_REQUEST_PATH;
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "select (.*) from (.*);";
+  }
+}

+ 57 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java

@@ -0,0 +1,57 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import java.util.List;
+import java.util.Optional;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ShowStrategy extends BaseStrategy {
+  private static final List<String> SHOW_STATEMENTS =
+      List.of("functions", "topics", "streams", "tables", "queries", "properties");
+  private static final List<String> LIST_STATEMENTS =
+      List.of("functions", "topics", "streams", "tables");
+  private String responseValueKey = "";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeTableResponse(response, responseValueKey);
+  }
+
+  @Override
+  public boolean test(String sql) {
+    Optional<String> statement = SHOW_STATEMENTS.stream()
+        .filter(s -> testSql(sql, getShowRegExp(s)) || testSql(sql, getListRegExp(s)))
+        .findFirst();
+    if (statement.isPresent()) {
+      setResponseValueKey(statement.get());
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "";
+  }
+
+  protected String getShowRegExp(String key) {
+    return "show " + key + ";";
+  }
+
+  protected String getListRegExp(String key) {
+    if (LIST_STATEMENTS.contains(key)) {
+      return "list " + key + ";";
+    }
+    return "";
+  }
+
+  private void setResponseValueKey(String path) {
+    responseValueKey = path;
+  }
+
+  private boolean testSql(String sql, String pattern) {
+    return sql.trim().toLowerCase().matches(pattern);
+  }
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TerminateStrategy extends BaseStrategy {
+  private static final String RESPONSE_VALUE_KEY = "commandStatus";
+
+  @Override
+  public KsqlCommandResponse serializeResponse(JsonNode response) {
+    return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
+  }
+
+  @Override
+  protected String getTestRegExp() {
+    return "terminate (.*);";
+  }
+}

+ 2 - 1
kafka-ui-api/src/main/resources/application-local.yml

@@ -4,6 +4,7 @@ kafka:
       bootstrapServers: localhost:9093
       zookeeper: localhost:2181
       schemaRegistry: http://localhost:8081
+      ksqldbServer: http://localhost:8088
       kafkaConnect:
         - name: first
           address: http://localhost:8083
@@ -25,4 +26,4 @@ spring:
   jmx:
     enabled: true
 auth:
-  enabled: false
+  enabled: false

+ 124 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java

@@ -0,0 +1,124 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.client.KsqlClient;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
+import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
+import java.util.List;
+import java.util.Optional;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+@ExtendWith(MockitoExtension.class)
+class KsqlServiceTest {
+  private KsqlService ksqlService;
+  private BaseStrategy baseStrategy;
+
+  @Mock
+  private ClustersStorage clustersStorage;
+  @Mock
+  private KsqlClient ksqlClient;
+
+
+  @BeforeEach
+  public void setUp() {
+    this.baseStrategy = new ShowStrategy();
+    this.ksqlService = new KsqlService(
+        this.ksqlClient,
+        this.clustersStorage,
+        List.of(baseStrategy)
+    );
+  }
+
+  @Test
+  void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() {
+    String clusterName = "test";
+    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null));
+
+    StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+        .verifyError(ClusterNotFoundException.class);
+  }
+
+  @Test
+  void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() {
+    String clusterName = "test";
+    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+    when(clustersStorage.getClusterByName(clusterName))
+        .thenReturn(Optional.ofNullable(kafkaCluster));
+    when(kafkaCluster.getKsqldbServer()).thenReturn(null);
+
+    StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+        .verifyError(KsqlDbNotFoundException.class);
+  }
+
+  @Test
+  void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() {
+    String clusterName = "test";
+    KsqlCommand command =
+        (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
+    KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+    when(clustersStorage.getClusterByName(clusterName))
+        .thenReturn(Optional.ofNullable(kafkaCluster));
+    when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088");
+
+    StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+        .verifyError(UnprocessableEntityException.class);
+
+    StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+        .verifyErrorMessage("Invalid sql");
+  }
+
+  @Test
+  void shouldSetHostToStrategy() {
+    String clusterName = "test";
+    String host = "localhost:8088";
+    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+
+    when(clustersStorage.getClusterByName(clusterName))
+        .thenReturn(Optional.ofNullable(kafkaCluster));
+    when(kafkaCluster.getKsqldbServer()).thenReturn(host);
+    when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
+
+    ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
+    assertThat(baseStrategy.getUri()).isEqualTo(host + "/ksql");
+  }
+
+  @Test
+  void shouldCallClientAndReturnResponse() {
+    String clusterName = "test";
+    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+    KsqlCommandResponse response = new KsqlCommandResponse().message("success");
+
+    when(clustersStorage.getClusterByName(clusterName))
+        .thenReturn(Optional.ofNullable(kafkaCluster));
+    when(kafkaCluster.getKsqldbServer()).thenReturn("host");
+    when(ksqlClient.execute(any())).thenReturn(Mono.just(response));
+
+    KsqlCommandResponse receivedResponse =
+        ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
+    verify(ksqlClient, times(1)).execute(baseStrategy);
+    assertThat(receivedResponse).isEqualTo(response);
+
+  }
+}

+ 85 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java

@@ -0,0 +1,85 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CreateStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private CreateStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new CreateStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');"));
+    assertTrue(strategy.test("CREATE STREAM stream"
+        + " AS SELECT users.id AS userid FROM users EMIT CHANGES;"
+    ));
+    assertTrue(strategy.test(
+        "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');"
+    ));
+    assertTrue(strategy.test(
+        "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')"
+            + "  AS SELECT gender, COUNT(*) AS numbers"
+            + "  FROM pageviews EMIT CHANGES;"
+    ));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("show streams;"));
+    assertFalse(strategy.test("show tables;"));
+    assertFalse(strategy.test("CREATE TABLE test;"));
+    assertFalse(strategy.test("CREATE STREAM test;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    String message = "updated successful";
+    JsonNode node = getResponseWithMessage(message);
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+    JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithMessage(String message) {
+    JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+    JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+    return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+  }
+}

+ 76 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java

@@ -0,0 +1,76 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DescribeStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private DescribeStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new DescribeStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("DESCRIBE users;"));
+    assertTrue(strategy.test("DESCRIBE EXTENDED users;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("list streams;"));
+    assertFalse(strategy.test("show tables;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    JsonNode node = getResponseWithObjectNode();
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode sourceDescriptionNode =
+        mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
+    JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithObjectNode() {
+    JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
+    JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage);
+    return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+  }
+}

+ 75 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java

@@ -0,0 +1,75 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DropStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private DropStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new DropStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("drop table table1;"));
+    assertTrue(strategy.test("drop stream stream2;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("show streams;"));
+    assertFalse(strategy.test("show tables;"));
+    assertFalse(strategy.test("create table test;"));
+    assertFalse(strategy.test("create stream test;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    String message = "updated successful";
+    JsonNode node = getResponseWithMessage(message);
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+    JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithMessage(String message) {
+    JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+    JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+    return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+  }
+}

+ 74 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java

@@ -0,0 +1,74 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ExplainStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private ExplainStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new ExplainStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("explain users_query_id;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("show queries;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    JsonNode node = getResponseWithObjectNode();
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode sourceDescriptionNode =
+        mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
+    JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithObjectNode() {
+    JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
+    JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage);
+    return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+  }
+}

+ 79 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SelectStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private SelectStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new SelectStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/query");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("select * from users;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("show streams;"));
+    assertFalse(strategy.test("select *;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    JsonNode node = getResponseWithData();
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2")));
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode node = mapper.createObjectNode();
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithData() {
+    JsonNode headerNode = mapper.createObjectNode().set(
+        "header", mapper.createObjectNode().put("schema", "header1, header2")
+    );
+    JsonNode row = mapper.createObjectNode().set(
+        "row", mapper.createObjectNode().set(
+            "columns", mapper.createArrayNode().add("value1").add("value2")
+        )
+    );
+    return mapper.createArrayNode().add(headerNode).add(row);
+  }
+}

+ 135 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java

@@ -0,0 +1,135 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ShowStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private ShowStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new ShowStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("SHOW STREAMS;"));
+    assertTrue(strategy.test("SHOW TABLES;"));
+    assertTrue(strategy.test("SHOW TOPICS;"));
+    assertTrue(strategy.test("SHOW QUERIES;"));
+    assertTrue(strategy.test("SHOW PROPERTIES;"));
+    assertTrue(strategy.test("SHOW FUNCTIONS;"));
+    assertTrue(strategy.test("LIST STREAMS;"));
+    assertTrue(strategy.test("LIST TABLES;"));
+    assertTrue(strategy.test("LIST TOPICS;"));
+    assertTrue(strategy.test("LIST FUNCTIONS;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("LIST QUERIES;"));
+    assertFalse(strategy.test("LIST PROPERTIES;"));
+  }
+
+  @Test
+  void shouldSerializeStreamsResponse() {
+    JsonNode node = getResponseWithData("streams");
+    strategy.test("show streams;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializeTablesResponse() {
+    JsonNode node = getResponseWithData("tables");
+    strategy.test("show tables;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializeTopicsResponse() {
+    JsonNode node = getResponseWithData("topics");
+    strategy.test("show topics;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializePropertiesResponse() {
+    JsonNode node = getResponseWithData("properties");
+    strategy.test("show properties;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializeFunctionsResponse() {
+    JsonNode node = getResponseWithData("functions");
+    strategy.test("show functions;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializeQueriesResponse() {
+    JsonNode node = getResponseWithData("queries");
+    strategy.test("show queries;");
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    Table table = serializedResponse.getData();
+    assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+    assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode node = getResponseWithData("streams");
+    strategy.test("show tables;");
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithData(String key) {
+    JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value");
+    JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem);
+    JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData);
+    return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+  }
+}

+ 72 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java

@@ -0,0 +1,72 @@
+package com.provectus.kafka.ui.strategy.ksql.statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class TerminateStrategyTest {
+  private final ObjectMapper mapper = new ObjectMapper();
+  private TerminateStrategy strategy;
+
+  @BeforeEach
+  void setUp() {
+    strategy = new TerminateStrategy();
+  }
+
+  @Test
+  void shouldReturnUri() {
+    strategy.host("ksqldb-server:8088");
+    assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+  }
+
+  @Test
+  void shouldReturnTrueInTest() {
+    assertTrue(strategy.test("terminate query_id;"));
+  }
+
+  @Test
+  void shouldReturnFalseInTest() {
+    assertFalse(strategy.test("show streams;"));
+    assertFalse(strategy.test("create table test;"));
+  }
+
+  @Test
+  void shouldSerializeResponse() {
+    String message = "query terminated.";
+    JsonNode node = getResponseWithMessage(message);
+    KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
+    assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+  }
+
+  @Test
+  void shouldSerializeWithException() {
+    JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+    JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    Exception exception = assertThrows(
+        UnprocessableEntityException.class,
+        () -> strategy.serializeResponse(node)
+    );
+
+    assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+  }
+
+  @SneakyThrows
+  private JsonNode getResponseWithMessage(String message) {
+    JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+    JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+    return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+  }
+}

+ 63 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1247,6 +1247,31 @@ paths:
         200:
           description: OK
 
+  /api/clusters/{clusterName}/ksql:
+    post:
+      tags:
+        - Ksql
+      summary: executeKsqlCommand
+      operationId: executeKsqlCommand
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/KsqlCommand'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/KsqlCommandResponse'
+
   /api/clusters/{clusterName}/connects/{connectName}/plugins:
     get:
       tags:
@@ -1406,6 +1431,7 @@ components:
             enum:
               - SCHEMA_REGISTRY
               - KAFKA_CONNECT
+              - KSQL_DB
       required:
         - id
         - name
@@ -2207,6 +2233,43 @@ components:
           items:
             $ref: '#/components/schemas/ConnectorPluginConfig'
 
+    KsqlCommand:
+      type: object
+      properties:
+        ksql:
+          type: string
+        streamsProperties:
+          type: object
+          additionalProperties:
+            type: string
+      required:
+        - ksql
+
+    KsqlCommandResponse:
+      type: object
+      properties:
+        data:
+          $ref: '#/components/schemas/Table'
+        message:
+          type: string
+
+    Table:
+      type: object
+      properties:
+        headers:
+          type: array
+          items:
+            type: string
+        rows:
+          type: array
+          items:
+            type: array
+            items:
+              type: string
+      required:
+        - headers
+        - rows
+
     FullConnectorInfo:
       type: object
       properties: