Browse Source

[#207] tests(api): added unit tests for KsqlService, KsqlCommandStrategies

Ilnur Farukhshin 3 years ago
parent
commit
d59b4e69f9
15 changed files with 827 additions and 11 deletions
  1. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java
  2. 4 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java
  3. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java
  4. 9 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java
  5. 0 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java
  6. 117 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java
  7. 82 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategyTest.java
  8. 74 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategyTest.java
  9. 73 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategyTest.java
  10. 72 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategyTest.java
  11. 110 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategyTest.java
  12. 78 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategyTest.java
  13. 136 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategyTest.java
  14. 70 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategyTest.java
  15. 1 0
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -19,7 +19,7 @@ import reactor.core.publisher.Mono;
 @Service
 @Service
 @RequiredArgsConstructor
 @RequiredArgsConstructor
 @Log4j2
 @Log4j2
-public final class KsqlClient {
+public class KsqlClient {
     private final WebClient webClient;
     private final WebClient webClient;
     private final ObjectMapper mapper;
     private final ObjectMapper mapper;
 
 

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -25,7 +25,10 @@ public class KsqlService {
     return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
     return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
             .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
             .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
             .map(KafkaCluster::getKsqldbServer)
             .map(KafkaCluster::getKsqldbServer)
-            .switchIfEmpty(Mono.error(KsqlDbNotFoundException::new))
+            .onErrorResume(e -> {
+              Throwable throwable = e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException();
+              return Mono.error(throwable);
+            })
             .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
             .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
                     .map(statement -> statement.host(host))
                     .map(statement -> statement.host(host))
             )
             )

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java

@@ -12,7 +12,6 @@ public class DescribeStrategy extends KsqlStatementStrategy {
 
 
     @Override
     @Override
     public KsqlCommandResponse serializeResponse(JsonNode response) {
     public KsqlCommandResponse serializeResponse(JsonNode response) {
-        System.out.println(response);
         return serializeTableResponse(response, responseValueKey);
         return serializeTableResponse(response, responseValueKey);
     }
     }
 
 

+ 9 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java

@@ -63,16 +63,19 @@ public abstract class KsqlStatementStrategy {
     }
     }
 
 
     protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
     protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
-        KsqlCommandResponse commandResponse = new KsqlCommandResponse();
-        Table table = (new Table())
-                .headers(getQueryResponseHeader(response))
-                .rows(getQueryResponseRows(response));
-        return commandResponse.data(table);
+        if (response.isArray() && response.size() > 0) {
+            KsqlCommandResponse commandResponse = new KsqlCommandResponse();
+            Table table = (new Table())
+                    .headers(getQueryResponseHeader(response))
+                    .rows(getQueryResponseRows(response));
+            return commandResponse.data(table);
+        }
+        throw new UnprocessableEntityException("KSQL DB response mapping error");
     }
     }
 
 
     private List<String> getQueryResponseHeader(JsonNode response) {
     private List<String> getQueryResponseHeader(JsonNode response) {
         JsonNode headerRow = response.get(0);
         JsonNode headerRow = response.get(0);
-        if (headerRow.isObject() && headerRow.size() > 0) {
+        if (headerRow.isObject() && headerRow.has("header")) {
             String schema = headerRow.get("header").get("schema").asText();
             String schema = headerRow.get("header").get("schema").asText();
             return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList());
             return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList());
         }
         }

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java

@@ -10,7 +10,6 @@ public class SelectStrategy extends KsqlStatementStrategy {
 
 
     @Override
     @Override
     public KsqlCommandResponse serializeResponse(JsonNode response) {
     public KsqlCommandResponse serializeResponse(JsonNode response) {
-        System.out.println(response);
         return serializeQueryResponse(response);
         return serializeQueryResponse(response);
     }
     }
 
 
@@ -19,7 +18,6 @@ public class SelectStrategy extends KsqlStatementStrategy {
         return requestPath;
         return requestPath;
     }
     }
 
 
-
     @Override
     @Override
     protected String getTestRegExp() {
     protected String getTestRegExp() {
         return "select (.*) from (.*);";
         return "select (.*) from (.*);";

+ 117 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java

@@ -0,0 +1,117 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.client.KsqlClient;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
+import com.provectus.kafka.ui.strategy.ksqlStatement.ShowStrategy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class KsqlServiceTest {
+    private KsqlService ksqlService;
+    private KsqlStatementStrategy ksqlStatementStrategy;
+
+    @Mock
+    private ClustersStorage clustersStorage;
+    @Mock
+    private KsqlClient ksqlClient;
+
+
+    @BeforeEach
+    public void setUp() {
+        this.ksqlStatementStrategy = new ShowStrategy();
+        this.ksqlService = new KsqlService(
+                this.ksqlClient,
+                this.clustersStorage,
+                List.of(ksqlStatementStrategy)
+        );
+    }
+
+    @Test
+    public void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() {
+        String clusterName = "test";
+        KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+        when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null));
+
+        StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+                .verifyError(ClusterNotFoundException.class);
+    }
+
+    @Test
+    public void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() {
+        String clusterName = "test";
+        KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+        KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+        when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster));
+        when(kafkaCluster.getKsqldbServer()).thenReturn(null);
+
+        StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+                .verifyError(KsqlDbNotFoundException.class);
+    }
+
+    @Test
+    public void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() {
+        String clusterName = "test";
+        KsqlCommand command = (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
+        KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+        when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster));
+        when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088");
+
+        StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+                .verifyError(UnprocessableEntityException.class);
+
+        StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
+                .verifyErrorMessage("Invalid sql");
+    }
+
+    @Test
+    public void shouldSetHostToStrategy() {
+        String clusterName = "test";
+        String host = "localhost:8088";
+        KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+        KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+
+        when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster));
+        when(kafkaCluster.getKsqldbServer()).thenReturn(host);
+        when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
+
+        ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo(host + "/ksql");
+    }
+
+    @Test
+    public void shouldCallClientAndReturnResponse() {
+        String clusterName = "test";
+        KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+        KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
+        KsqlCommandResponse response = new KsqlCommandResponse().message("success");
+
+        when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster));
+        when(kafkaCluster.getKsqldbServer()).thenReturn("host");
+        when(ksqlClient.execute(any())).thenReturn(Mono.just(response));
+
+        KsqlCommandResponse receivedResponse = ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
+        verify(ksqlClient, times(1)).execute(ksqlStatementStrategy);
+        assertThat(receivedResponse).isEqualTo(response);
+
+    }
+}

+ 82 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategyTest.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class CreateStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new CreateStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');"));
+        assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream" +
+                " AS SELECT users.id AS userid FROM users EMIT CHANGES;"
+        ));
+        assertTrue(ksqlStatementStrategy.test(
+                "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');"
+        ));
+        assertTrue(ksqlStatementStrategy.test(
+                "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')" +
+                        "  AS SELECT gender, COUNT(*) AS numbers" +
+                        "  FROM pageviews EMIT CHANGES;"
+        ));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("show streams;"));
+        assertFalse(ksqlStatementStrategy.test("show tables;"));
+        assertFalse(ksqlStatementStrategy.test("CREATE TABLE test;"));
+        assertFalse(ksqlStatementStrategy.test("CREATE STREAM test;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        String message = "updated successful";
+        JsonNode node = getResponseWithMessage(message);
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+        JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithMessage(String message) {
+        JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+        JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+        return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    }
+}

+ 74 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategyTest.java

@@ -0,0 +1,74 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class DescribeStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new DescribeStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("DESCRIBE users;"));
+        assertTrue(ksqlStatementStrategy.test("DESCRIBE EXTENDED users;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("list streams;"));
+        assertFalse(ksqlStatementStrategy.test("show tables;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        JsonNode node = getResponseWithObjectNode();
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode sourceDescriptionNode = mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
+        JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithObjectNode() {
+        JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
+        JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage);
+        return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+    }
+}

+ 73 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategyTest.java

@@ -0,0 +1,73 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class DropStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new DropStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("drop table table1;"));
+        assertTrue(ksqlStatementStrategy.test("drop stream stream2;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("show streams;"));
+        assertFalse(ksqlStatementStrategy.test("show tables;"));
+        assertFalse(ksqlStatementStrategy.test("create table test;"));
+        assertFalse(ksqlStatementStrategy.test("create stream test;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        String message = "updated successful";
+        JsonNode node = getResponseWithMessage(message);
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+        JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithMessage(String message) {
+        JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+        JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+        return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    }
+}

+ 72 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategyTest.java

@@ -0,0 +1,72 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class ExplainStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new ExplainStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("explain users_query_id;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("show queries;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        JsonNode node = getResponseWithObjectNode();
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode sourceDescriptionNode = mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
+        JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithObjectNode() {
+        JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
+        JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage);
+        return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+    }
+}

+ 110 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategyTest.java

@@ -0,0 +1,110 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class ListStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new ListStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("LIST STREAMS;"));
+        assertTrue(ksqlStatementStrategy.test("LIST TABLES;"));
+        assertTrue(ksqlStatementStrategy.test("LIST TOPICS;"));
+        assertTrue(ksqlStatementStrategy.test("LIST FUNCTIONS;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("SHOW STREAMS;"));
+        assertFalse(ksqlStatementStrategy.test("SHOW TABLES;"));
+        assertFalse(ksqlStatementStrategy.test("SHOW TOPICS;"));
+        assertFalse(ksqlStatementStrategy.test("SHOW FUNCTIONS;"));
+    }
+
+    @Test
+    public void shouldSerializeStreamsResponse() {
+        JsonNode node = getResponseWithData("streams");
+        ksqlStatementStrategy.test("list streams;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeTablesResponse() {
+        JsonNode node = getResponseWithData("tables");
+        ksqlStatementStrategy.test("list tables;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeTopicsResponse() {
+        JsonNode node = getResponseWithData("topics");
+        ksqlStatementStrategy.test("list topics;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeFunctionsResponse() {
+        JsonNode node = getResponseWithData("functions");
+        ksqlStatementStrategy.test("list functions;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode node = getResponseWithData("streams");
+        ksqlStatementStrategy.test("list tables;");
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithData(String key) {
+        JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value");
+        JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem);
+        JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData);
+        return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+    }
+}

+ 78 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategyTest.java

@@ -0,0 +1,78 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.Table;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class SelectStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new SelectStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/query");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("select * from users;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("show streams;"));
+        assertFalse(ksqlStatementStrategy.test("select *;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        JsonNode node = getResponseWithData();
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2")));
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode node = mapper.createObjectNode();
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithData() {
+        JsonNode headerNode = mapper.createObjectNode().set(
+                "header", mapper.createObjectNode().put("schema", "header1, header2")
+        );
+        JsonNode row = mapper.createObjectNode().set(
+                "row", mapper.createObjectNode().set(
+                        "columns", mapper.createArrayNode().add("value1").add("value2")
+                )
+        );
+        return mapper.createArrayNode().add(headerNode).add(row);
+    }
+}

+ 136 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategyTest.java

@@ -0,0 +1,136 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import com.provectus.kafka.ui.model.KsqlResponseTable;
+import com.provectus.kafka.ui.model.Table;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class ShowStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new ShowStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("SHOW STREAMS;"));
+        assertTrue(ksqlStatementStrategy.test("SHOW TABLES;"));
+        assertTrue(ksqlStatementStrategy.test("SHOW TOPICS;"));
+        assertTrue(ksqlStatementStrategy.test("SHOW QUERIES;"));
+        assertTrue(ksqlStatementStrategy.test("SHOW PROPERTIES;"));
+        assertTrue(ksqlStatementStrategy.test("SHOW FUNCTIONS;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("LIST STREAMS;"));
+        assertFalse(ksqlStatementStrategy.test("LIST TABLES;"));
+        assertFalse(ksqlStatementStrategy.test("LIST TOPICS;"));
+        assertFalse(ksqlStatementStrategy.test("LIST QUERIES;"));
+        assertFalse(ksqlStatementStrategy.test("LIST PROPERTIES;"));
+        assertFalse(ksqlStatementStrategy.test("LIST FUNCTIONS;"));
+    }
+
+    @Test
+    public void shouldSerializeStreamsResponse() {
+        JsonNode node = getResponseWithData("streams");
+        ksqlStatementStrategy.test("show streams;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeTablesResponse() {
+        JsonNode node = getResponseWithData("tables");
+        ksqlStatementStrategy.test("show tables;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeTopicsResponse() {
+        JsonNode node = getResponseWithData("topics");
+        ksqlStatementStrategy.test("show topics;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializePropertiesResponse() {
+        JsonNode node = getResponseWithData("properties");
+        ksqlStatementStrategy.test("show properties;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeFunctionsResponse() {
+        JsonNode node = getResponseWithData("functions");
+        ksqlStatementStrategy.test("show functions;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeQueriesResponse() {
+        JsonNode node = getResponseWithData("queries");
+        ksqlStatementStrategy.test("show queries;");
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        Table table = serializedResponse.getData();
+        assertThat(table.getHeaders()).isEqualTo(List.of("header"));
+        assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode node = getResponseWithData("streams");
+        ksqlStatementStrategy.test("show tables;");
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithData(String key) {
+        JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value");
+        JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem);
+        JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData);
+        return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
+    }
+}

+ 70 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategyTest.java

@@ -0,0 +1,70 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KsqlCommandResponse;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith(MockitoExtension.class)
+class TerminateStrategyTest {
+    private KsqlStatementStrategy ksqlStatementStrategy;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @BeforeEach
+    public void setUp() {
+        ksqlStatementStrategy = new TerminateStrategy();
+    }
+
+    @Test
+    public void shouldReturnUri() {
+        ksqlStatementStrategy.host("ksqldb-server:8088");
+        assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
+    }
+
+    @Test
+    public void shouldReturnTrueInTest() {
+        assertTrue(ksqlStatementStrategy.test("terminate query_id;"));
+    }
+
+    @Test
+    public void shouldReturnFalseInTest() {
+        assertFalse(ksqlStatementStrategy.test("show streams;"));
+        assertFalse(ksqlStatementStrategy.test("create table test;"));
+    }
+
+    @Test
+    public void shouldSerializeResponse() {
+        String message = "query terminated.";
+        JsonNode node = getResponseWithMessage(message);
+        KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
+        assertThat(serializedResponse.getMessage()).isEqualTo(message);
+
+    }
+
+    @Test
+    public void shouldSerializeWithException() {
+        JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
+        JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+        Exception exception = assertThrows(
+                UnprocessableEntityException.class,
+                () -> ksqlStatementStrategy.serializeResponse(node)
+        );
+
+        assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
+    }
+
+    @SneakyThrows
+    private JsonNode getResponseWithMessage(String message) {
+        JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
+        JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
+        return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
+    }
+}

+ 1 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1206,6 +1206,7 @@ components:
             enum:
             enum:
               - SCHEMA_REGISTRY
               - SCHEMA_REGISTRY
               - KAFKA_CONNECT
               - KAFKA_CONNECT
+              - KSQL_DB
       required:
       required:
         - id
         - id
         - name
         - name