diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java index b4df865c1a..29d19a89f2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -19,7 +19,7 @@ import reactor.core.publisher.Mono; @Service @RequiredArgsConstructor @Log4j2 -public final class KsqlClient { +public class KsqlClient { private final WebClient webClient; private final ObjectMapper mapper; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java index e9306be22c..c9eb9b9298 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -25,7 +25,10 @@ public class KsqlService { return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) .map(KafkaCluster::getKsqldbServer) - .switchIfEmpty(Mono.error(KsqlDbNotFoundException::new)) + .onErrorResume(e -> { + Throwable throwable = e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException(); + return Mono.error(throwable); + }) .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand) .map(statement -> statement.host(host)) ) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java index a6b02636a3..84299e8fcf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java @@ -12,7 +12,6 @@ public class DescribeStrategy extends KsqlStatementStrategy { @Override public KsqlCommandResponse serializeResponse(JsonNode response) { - System.out.println(response); return serializeTableResponse(response, responseValueKey); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java index f936fa9e7b..2315971cf9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java @@ -63,16 +63,19 @@ public abstract class KsqlStatementStrategy { } protected KsqlCommandResponse serializeQueryResponse(JsonNode response) { - KsqlCommandResponse commandResponse = new KsqlCommandResponse(); - Table table = (new Table()) - .headers(getQueryResponseHeader(response)) - .rows(getQueryResponseRows(response)); - return commandResponse.data(table); + if (response.isArray() && response.size() > 0) { + KsqlCommandResponse commandResponse = new KsqlCommandResponse(); + Table table = (new Table()) + .headers(getQueryResponseHeader(response)) + .rows(getQueryResponseRows(response)); + return commandResponse.data(table); + } + throw new UnprocessableEntityException("KSQL DB response mapping error"); } private List getQueryResponseHeader(JsonNode response) { JsonNode headerRow = response.get(0); - if (headerRow.isObject() && headerRow.size() > 0) { + if (headerRow.isObject() && headerRow.has("header")) { String schema = headerRow.get("header").get("schema").asText(); return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java index 309578a826..3882bff434 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java @@ -10,7 +10,6 @@ public class SelectStrategy extends KsqlStatementStrategy { @Override public KsqlCommandResponse serializeResponse(JsonNode response) { - System.out.println(response); return serializeQueryResponse(response); } @@ -19,7 +18,6 @@ public class SelectStrategy extends KsqlStatementStrategy { return requestPath; } - @Override protected String getTestRegExp() { return "select (.*) from (.*);"; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java new file mode 100644 index 0000000000..303963367a --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java @@ -0,0 +1,117 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.client.KsqlClient; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; +import com.provectus.kafka.ui.strategy.ksqlStatement.ShowStrategy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class KsqlServiceTest { + private KsqlService ksqlService; + private KsqlStatementStrategy ksqlStatementStrategy; + + @Mock + private ClustersStorage clustersStorage; + @Mock + private KsqlClient ksqlClient; + + + @BeforeEach + public void setUp() { + this.ksqlStatementStrategy = new ShowStrategy(); + this.ksqlService = new KsqlService( + this.ksqlClient, + this.clustersStorage, + List.of(ksqlStatementStrategy) + ); + } + + @Test + public void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null)); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(ClusterNotFoundException.class); + } + + @Test + public void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn(null); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(KsqlDbNotFoundException.class); + } + + @Test + public void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088"); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(UnprocessableEntityException.class); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyErrorMessage("Invalid sql"); + } + + @Test + public void shouldSetHostToStrategy() { + String clusterName = "test"; + String host = "localhost:8088"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn(host); + when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse())); + + ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo(host + "/ksql"); + } + + @Test + public void shouldCallClientAndReturnResponse() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + KsqlCommandResponse response = new KsqlCommandResponse().message("success"); + + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn("host"); + when(ksqlClient.execute(any())).thenReturn(Mono.just(response)); + + KsqlCommandResponse receivedResponse = ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); + verify(ksqlClient, times(1)).execute(ksqlStatementStrategy); + assertThat(receivedResponse).isEqualTo(response); + + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategyTest.java new file mode 100644 index 0000000000..938b5ae284 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategyTest.java @@ -0,0 +1,82 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class CreateStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new CreateStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');")); + assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream" + + " AS SELECT users.id AS userid FROM users EMIT CHANGES;" + )); + assertTrue(ksqlStatementStrategy.test( + "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');" + )); + assertTrue(ksqlStatementStrategy.test( + "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')" + + " AS SELECT gender, COUNT(*) AS numbers" + + " FROM pageviews EMIT CHANGES;" + )); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("show streams;")); + assertFalse(ksqlStatementStrategy.test("show tables;")); + assertFalse(ksqlStatementStrategy.test("CREATE TABLE test;")); + assertFalse(ksqlStatementStrategy.test("CREATE STREAM test;")); + } + + @Test + public void shouldSerializeResponse() { + String message = "updated successful"; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + public void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategyTest.java new file mode 100644 index 0000000000..a36fce35e2 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategyTest.java @@ -0,0 +1,74 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class DescribeStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new DescribeStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("DESCRIBE users;")); + assertTrue(ksqlStatementStrategy.test("DESCRIBE EXTENDED users;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("list streams;")); + assertFalse(ksqlStatementStrategy.test("show tables;")); + } + + @Test + public void shouldSerializeResponse() { + JsonNode node = getResponseWithObjectNode(); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); + } + + @Test + public void shouldSerializeWithException() { + JsonNode sourceDescriptionNode = mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithObjectNode() { + JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); + JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategyTest.java new file mode 100644 index 0000000000..c1fb4e549f --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategyTest.java @@ -0,0 +1,73 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class DropStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new DropStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("drop table table1;")); + assertTrue(ksqlStatementStrategy.test("drop stream stream2;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("show streams;")); + assertFalse(ksqlStatementStrategy.test("show tables;")); + assertFalse(ksqlStatementStrategy.test("create table test;")); + assertFalse(ksqlStatementStrategy.test("create stream test;")); + } + + @Test + public void shouldSerializeResponse() { + String message = "updated successful"; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + public void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategyTest.java new file mode 100644 index 0000000000..eb889576b6 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategyTest.java @@ -0,0 +1,72 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class ExplainStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new ExplainStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("explain users_query_id;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("show queries;")); + } + + @Test + public void shouldSerializeResponse() { + JsonNode node = getResponseWithObjectNode(); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); + } + + @Test + public void shouldSerializeWithException() { + JsonNode sourceDescriptionNode = mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithObjectNode() { + JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); + JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategyTest.java new file mode 100644 index 0000000000..280151a92c --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategyTest.java @@ -0,0 +1,110 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class ListStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new ListStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("LIST STREAMS;")); + assertTrue(ksqlStatementStrategy.test("LIST TABLES;")); + assertTrue(ksqlStatementStrategy.test("LIST TOPICS;")); + assertTrue(ksqlStatementStrategy.test("LIST FUNCTIONS;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("SHOW STREAMS;")); + assertFalse(ksqlStatementStrategy.test("SHOW TABLES;")); + assertFalse(ksqlStatementStrategy.test("SHOW TOPICS;")); + assertFalse(ksqlStatementStrategy.test("SHOW FUNCTIONS;")); + } + + @Test + public void shouldSerializeStreamsResponse() { + JsonNode node = getResponseWithData("streams"); + ksqlStatementStrategy.test("list streams;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeTablesResponse() { + JsonNode node = getResponseWithData("tables"); + ksqlStatementStrategy.test("list tables;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeTopicsResponse() { + JsonNode node = getResponseWithData("topics"); + ksqlStatementStrategy.test("list topics;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeFunctionsResponse() { + JsonNode node = getResponseWithData("functions"); + ksqlStatementStrategy.test("list functions;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeWithException() { + JsonNode node = getResponseWithData("streams"); + ksqlStatementStrategy.test("list tables;"); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithData(String key) { + JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value"); + JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem); + JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategyTest.java new file mode 100644 index 0000000000..7d62fa468f --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategyTest.java @@ -0,0 +1,78 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class SelectStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new SelectStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/query"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("select * from users;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("show streams;")); + assertFalse(ksqlStatementStrategy.test("select *;")); + } + + @Test + public void shouldSerializeResponse() { + JsonNode node = getResponseWithData(); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2"))); + } + + @Test + public void shouldSerializeWithException() { + JsonNode node = mapper.createObjectNode(); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithData() { + JsonNode headerNode = mapper.createObjectNode().set( + "header", mapper.createObjectNode().put("schema", "header1, header2") + ); + JsonNode row = mapper.createObjectNode().set( + "row", mapper.createObjectNode().set( + "columns", mapper.createArrayNode().add("value1").add("value2") + ) + ); + return mapper.createArrayNode().add(headerNode).add(row); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategyTest.java new file mode 100644 index 0000000000..bf090752b2 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategyTest.java @@ -0,0 +1,136 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.Table; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class ShowStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new ShowStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("SHOW STREAMS;")); + assertTrue(ksqlStatementStrategy.test("SHOW TABLES;")); + assertTrue(ksqlStatementStrategy.test("SHOW TOPICS;")); + assertTrue(ksqlStatementStrategy.test("SHOW QUERIES;")); + assertTrue(ksqlStatementStrategy.test("SHOW PROPERTIES;")); + assertTrue(ksqlStatementStrategy.test("SHOW FUNCTIONS;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("LIST STREAMS;")); + assertFalse(ksqlStatementStrategy.test("LIST TABLES;")); + assertFalse(ksqlStatementStrategy.test("LIST TOPICS;")); + assertFalse(ksqlStatementStrategy.test("LIST QUERIES;")); + assertFalse(ksqlStatementStrategy.test("LIST PROPERTIES;")); + assertFalse(ksqlStatementStrategy.test("LIST FUNCTIONS;")); + } + + @Test + public void shouldSerializeStreamsResponse() { + JsonNode node = getResponseWithData("streams"); + ksqlStatementStrategy.test("show streams;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeTablesResponse() { + JsonNode node = getResponseWithData("tables"); + ksqlStatementStrategy.test("show tables;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeTopicsResponse() { + JsonNode node = getResponseWithData("topics"); + ksqlStatementStrategy.test("show topics;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializePropertiesResponse() { + JsonNode node = getResponseWithData("properties"); + ksqlStatementStrategy.test("show properties;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeFunctionsResponse() { + JsonNode node = getResponseWithData("functions"); + ksqlStatementStrategy.test("show functions;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeQueriesResponse() { + JsonNode node = getResponseWithData("queries"); + ksqlStatementStrategy.test("show queries;"); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + public void shouldSerializeWithException() { + JsonNode node = getResponseWithData("streams"); + ksqlStatementStrategy.test("show tables;"); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithData(String key) { + JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value"); + JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem); + JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategyTest.java new file mode 100644 index 0000000000..8b1dffcfbb --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategyTest.java @@ -0,0 +1,70 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +@ExtendWith(MockitoExtension.class) +class TerminateStrategyTest { + private KsqlStatementStrategy ksqlStatementStrategy; + private ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + public void setUp() { + ksqlStatementStrategy = new TerminateStrategy(); + } + + @Test + public void shouldReturnUri() { + ksqlStatementStrategy.host("ksqldb-server:8088"); + assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + public void shouldReturnTrueInTest() { + assertTrue(ksqlStatementStrategy.test("terminate query_id;")); + } + + @Test + public void shouldReturnFalseInTest() { + assertFalse(ksqlStatementStrategy.test("show streams;")); + assertFalse(ksqlStatementStrategy.test("create table test;")); + } + + @Test + public void shouldSerializeResponse() { + String message = "query terminated."; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + public void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> ksqlStatementStrategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 31e7ccf4fc..3ca5097a86 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1206,6 +1206,7 @@ components: enum: - SCHEMA_REGISTRY - KAFKA_CONNECT + - KSQL_DB required: - id - name