diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java deleted file mode 100644 index 8d051234ab..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.provectus.kafka.ui.client; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.service.ksql.KsqlApiClient; -import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Service; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -@Service -@RequiredArgsConstructor -@Slf4j -public class KsqlClient { - private final WebClient webClient; - private final ObjectMapper mapper; - - public Mono execute(BaseStrategy ksqlStatement, KafkaCluster cluster) { - return webClient.post() - .uri(ksqlStatement.getUri()) - .headers(httpHeaders -> KsqlApiClient.setBasicAuthIfEnabled(httpHeaders, cluster)) - .accept(new MediaType("application", "vnd.ksql.v1+json")) - .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand())) - .retrieve() - .onStatus(HttpStatus::isError, this::getErrorMessage) - .bodyToMono(byte[].class) - .map(this::toJson) - .map(ksqlStatement::serializeResponse); - } - - private Mono getErrorMessage(ClientResponse response) { - return response - .bodyToMono(byte[].class) - .map(this::toJson) - .map(jsonNode -> jsonNode.get("message").asText()) - .flatMap(error -> Mono.error(new UnprocessableEntityException(error))); - } - - @SneakyThrows - private JsonNode toJson(byte[] content) { - return this.mapper.readTree(content); - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index 62dc24fab2..c3d833e2b8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -1,15 +1,12 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KsqlApi; -import com.provectus.kafka.ui.model.KsqlCommandDTO; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; import com.provectus.kafka.ui.model.KsqlCommandV2DTO; import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO; import com.provectus.kafka.ui.model.KsqlResponseDTO; import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableResponseDTO; -import com.provectus.kafka.ui.service.KsqlService; import com.provectus.kafka.ui.service.ksql.KsqlServiceV2; import java.util.List; import java.util.Map; @@ -27,17 +24,8 @@ import reactor.core.publisher.Mono; @RequiredArgsConstructor @Slf4j public class KsqlController extends AbstractController implements KsqlApi { - private final KsqlService ksqlService; - private final KsqlServiceV2 ksqlServiceV2; - @Override - public Mono> executeKsqlCommand(String clusterName, - Mono - ksqlCommand, - ServerWebExchange exchange) { - return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand) - .map(ResponseEntity::ok); - } + private final KsqlServiceV2 ksqlServiceV2; @Override public Mono> executeKsql(String clusterName, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java deleted file mode 100644 index 6f74ede75e..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.provectus.kafka.ui.service; - -import com.provectus.kafka.ui.client.KsqlClient; -import com.provectus.kafka.ui.exception.ClusterNotFoundException; -import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.KsqlCommandDTO; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; -import java.util.List; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; - -@Service -@RequiredArgsConstructor -public class KsqlService { - private final KsqlClient ksqlClient; - private final List ksqlStatementStrategies; - - public Mono executeKsqlCommand(KafkaCluster cluster, - Mono ksqlCommand) { - return Mono.justOrEmpty(cluster) - .map(KafkaCluster::getKsqldbServer) - .onErrorResume(e -> { - Throwable throwable = - e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException(); - return Mono.error(throwable); - }) - .flatMap(ksqlServer -> getStatementStrategyForKsqlCommand(ksqlCommand) - .map(statement -> statement.host(ksqlServer.getUrl())) - ) - .flatMap(baseStrategy -> ksqlClient.execute(baseStrategy, cluster)); - } - - private Mono getStatementStrategyForKsqlCommand( - Mono ksqlCommand) { - return ksqlCommand - .map(command -> ksqlStatementStrategies.stream() - .filter(s -> s.test(command.getKsql())) - .map(s -> s.ksqlCommand(command)) - .findFirst()) - .flatMap(Mono::justOrEmpty) - .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql"))); - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java deleted file mode 100644 index fa057116ad..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java +++ /dev/null @@ -1,166 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandDTO; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.model.TableDTO; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -public abstract class BaseStrategy { - protected static final String KSQL_REQUEST_PATH = "/ksql"; - protected static final String QUERY_REQUEST_PATH = "/query"; - private static final String MAPPING_EXCEPTION_ERROR = "KSQL DB response mapping error"; - protected String host = null; - protected KsqlCommandDTO ksqlCommand = null; - - public String getUri() { - if (this.host != null) { - return this.host + this.getRequestPath(); - } - throw new UnprocessableEntityException("Strategy doesn't have host"); - } - - public boolean test(String sql) { - return sql.trim().toLowerCase().matches(getTestRegExp()); - } - - public BaseStrategy host(String host) { - this.host = host; - return this; - } - - public KsqlCommandDTO getKsqlCommand() { - return ksqlCommand; - } - - public BaseStrategy ksqlCommand(KsqlCommandDTO ksqlCommand) { - this.ksqlCommand = ksqlCommand; - return this; - } - - protected String getRequestPath() { - return BaseStrategy.KSQL_REQUEST_PATH; - } - - protected KsqlCommandResponseDTO serializeTableResponse(JsonNode response, String key) { - JsonNode item = getResponseFirstItemValue(response, key); - TableDTO table = item.isArray() ? getTableFromArray(item) : getTableFromObject(item); - return (new KsqlCommandResponseDTO()).data(table); - } - - protected KsqlCommandResponseDTO serializeMessageResponse(JsonNode response, String key) { - JsonNode item = getResponseFirstItemValue(response, key); - return (new KsqlCommandResponseDTO()).message(getMessageFromObject(item)); - } - - protected KsqlCommandResponseDTO serializeQueryResponse(JsonNode response) { - if (response.isArray() && response.size() > 0) { - TableDTO table = (new TableDTO()) - .headers(getQueryResponseHeader(response)) - .rows(getQueryResponseRows(response)); - return (new KsqlCommandResponseDTO()).data(table); - } - throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); - } - - private JsonNode getResponseFirstItemValue(JsonNode response, String key) { - if (response.isArray() && response.size() > 0) { - JsonNode first = response.get(0); - if (first.has(key)) { - return first.path(key); - } - } - throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); - } - - private List getQueryResponseHeader(JsonNode response) { - JsonNode headerRow = response.get(0); - if (headerRow.isObject() && headerRow.has("header")) { - String schema = headerRow.get("header").get("schema").asText(); - return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList()); - } - return new ArrayList<>(); - } - - private List> getQueryResponseRows(JsonNode node) { - return getStreamForJsonArray(node) - .filter(row -> row.has("row") && row.get("row").has("columns")) - .map(row -> row.get("row").get("columns")) - .map(cellNode -> getStreamForJsonArray(cellNode) - .map(JsonNode::asText) - .collect(Collectors.toList()) - ) - .collect(Collectors.toList()); - } - - private TableDTO getTableFromArray(JsonNode node) { - TableDTO table = new TableDTO(); - table.headers(new ArrayList<>()).rows(new ArrayList<>()); - if (node.size() > 0) { - List keys = getJsonObjectKeys(node.get(0)); - List> rows = getTableRows(node, keys); - table.headers(keys).rows(rows); - } - return table; - } - - private TableDTO getTableFromObject(JsonNode node) { - List keys = getJsonObjectKeys(node); - List values = getJsonObjectValues(node); - List> rows = IntStream - .range(0, keys.size()) - .mapToObj(i -> List.of(keys.get(i), values.get(i))) - .collect(Collectors.toList()); - return (new TableDTO()).headers(List.of("key", "value")).rows(rows); - } - - private String getMessageFromObject(JsonNode node) { - if (node.isObject() && node.has("message")) { - return node.get("message").asText(); - } - throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); - } - - private List> getTableRows(JsonNode node, List keys) { - return getStreamForJsonArray(node) - .map(row -> keys.stream() - .map(header -> row.get(header).asText()) - .collect(Collectors.toList()) - ) - .collect(Collectors.toList()); - } - - private Stream getStreamForJsonArray(JsonNode node) { - if (node.isArray() && node.size() > 0) { - return StreamSupport.stream(node.spliterator(), false); - } - throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); - } - - private List getJsonObjectKeys(JsonNode node) { - if (node.isObject()) { - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false - ).collect(Collectors.toList()); - } - throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); - } - - private List getJsonObjectValues(JsonNode node) { - return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText()) - .collect(Collectors.toList()); - } - - public abstract KsqlCommandResponseDTO serializeResponse(JsonNode response); - - protected abstract String getTestRegExp(); -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java deleted file mode 100644 index d26046a0fd..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class CreateStrategy extends BaseStrategy { - private static final String RESPONSE_VALUE_KEY = "commandStatus"; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeMessageResponse(response, RESPONSE_VALUE_KEY); - } - - @Override - protected String getTestRegExp() { - return "create (table|stream)(.*)(with|as select(.*)from)(.*);"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java deleted file mode 100644 index b8d7435bad..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class DescribeStrategy extends BaseStrategy { - private static final String RESPONSE_VALUE_KEY = "sourceDescription"; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeTableResponse(response, RESPONSE_VALUE_KEY); - } - - @Override - protected String getTestRegExp() { - return "describe (.*);"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java deleted file mode 100644 index 95b6884dc1..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class DropStrategy extends BaseStrategy { - private static final String RESPONSE_VALUE_KEY = "commandStatus"; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeMessageResponse(response, RESPONSE_VALUE_KEY); - } - - @Override - protected String getTestRegExp() { - return "drop (table|stream) (.*);"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java deleted file mode 100644 index 221113b7e8..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class ExplainStrategy extends BaseStrategy { - private static final String RESPONSE_VALUE_KEY = "queryDescription"; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeTableResponse(response, RESPONSE_VALUE_KEY); - } - - @Override - protected String getTestRegExp() { - return "explain (.*);"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java deleted file mode 100644 index e535c8107d..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class SelectStrategy extends BaseStrategy { - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeQueryResponse(response); - } - - @Override - protected String getRequestPath() { - return BaseStrategy.QUERY_REQUEST_PATH; - } - - @Override - protected String getTestRegExp() { - return "select (.*) from (.*);"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java deleted file mode 100644 index 93c635b044..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandDTO; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import java.util.List; -import java.util.Optional; -import org.springframework.stereotype.Component; - -@Component -public class ShowStrategy extends BaseStrategy { - private static final List SHOW_STATEMENTS = - List.of("functions", "topics", "streams", "tables", "queries", "properties"); - private static final List LIST_STATEMENTS = - List.of("functions", "topics", "streams", "tables"); - private String responseValueKey = ""; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeTableResponse(response, responseValueKey); - } - - @Override - public boolean test(String sql) { - Optional statement = SHOW_STATEMENTS.stream() - .filter(s -> testSql(sql, getShowRegExp(s)) || testSql(sql, getListRegExp(s))) - .findFirst(); - if (statement.isPresent()) { - setResponseValueKey(statement.get()); - return true; - } - return false; - } - - @Override - protected String getTestRegExp() { - return ""; - } - - @Override - public BaseStrategy ksqlCommand(KsqlCommandDTO ksqlCommand) { - // return new instance to avoid conflicts for parallel requests - ShowStrategy clone = new ShowStrategy(); - clone.setResponseValueKey(responseValueKey); - clone.ksqlCommand = ksqlCommand; - return clone; - } - - protected String getShowRegExp(String key) { - return "show " + key + ";"; - } - - protected String getListRegExp(String key) { - if (LIST_STATEMENTS.contains(key)) { - return "list " + key + ";"; - } - return ""; - } - - private void setResponseValueKey(String path) { - responseValueKey = path; - } - - private boolean testSql(String sql, String pattern) { - return sql.trim().toLowerCase().matches(pattern); - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java deleted file mode 100644 index b043b8c6c9..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import org.springframework.stereotype.Component; - -@Component -public class TerminateStrategy extends BaseStrategy { - private static final String RESPONSE_VALUE_KEY = "commandStatus"; - - @Override - public KsqlCommandResponseDTO serializeResponse(JsonNode response) { - return serializeMessageResponse(response, RESPONSE_VALUE_KEY); - } - - @Override - protected String getTestRegExp() { - return "terminate (.*);"; - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java deleted file mode 100644 index ed434efba4..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.provectus.kafka.ui.service; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.provectus.kafka.ui.client.KsqlClient; -import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.InternalKsqlServer; -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.KsqlCommandDTO; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; -import com.provectus.kafka.ui.strategy.ksql.statement.DescribeStrategy; -import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy; -import java.util.List; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -@ExtendWith(MockitoExtension.class) -class KsqlServiceTest { - private KsqlService ksqlService; - private BaseStrategy baseStrategy; - private BaseStrategy alternativeStrategy; - - @Mock - private ClustersStorage clustersStorage; - @Mock - private KsqlClient ksqlClient; - - - @BeforeEach - public void setUp() { - this.baseStrategy = new ShowStrategy(); - this.alternativeStrategy = new DescribeStrategy(); - this.ksqlService = new KsqlService( - this.ksqlClient, - List.of(baseStrategy, alternativeStrategy) - ); - } - - @Test - void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() { - KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("show streams;"); - KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - when(kafkaCluster.getKsqldbServer()).thenReturn(null); - - StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command))) - .verifyError(KsqlDbNotFoundException.class); - } - - @Test - void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() { - KsqlCommandDTO command = - (new KsqlCommandDTO()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');"); - KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("localhost:8088").build()); - - StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command))) - .verifyError(UnprocessableEntityException.class); - - StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command))) - .verifyErrorMessage("Invalid sql"); - } - - @Test - void shouldSetHostToStrategy() { - String host = "localhost:8088"; - KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;"); - KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - - when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url(host).build()); - when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(new KsqlCommandResponseDTO())); - - ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block(); - assertThat(alternativeStrategy.getUri()).isEqualTo(host + "/ksql"); - } - - @Test - void shouldCallClientAndReturnResponse() { - KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;"); - KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - KsqlCommandResponseDTO response = new KsqlCommandResponseDTO().message("success"); - - when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("host").build()); - when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(response)); - - KsqlCommandResponseDTO receivedResponse = - ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block(); - verify(ksqlClient, times(1)).execute(eq(alternativeStrategy), any()); - assertThat(receivedResponse).isEqualTo(response); - - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java deleted file mode 100644 index 257fb36d35..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class CreateStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private CreateStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new CreateStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');")); - assertTrue(strategy.test("CREATE STREAM stream" - + " AS SELECT users.id AS userid FROM users EMIT CHANGES;" - )); - assertTrue(strategy.test( - "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');" - )); - assertTrue(strategy.test( - "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')" - + " AS SELECT gender, COUNT(*) AS numbers" - + " FROM pageviews EMIT CHANGES;" - )); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("show streams;")); - assertFalse(strategy.test("show tables;")); - assertFalse(strategy.test("CREATE TABLE test;")); - assertFalse(strategy.test("CREATE STREAM test;")); - } - - @Test - void shouldSerializeResponse() { - String message = "updated successful"; - JsonNode node = getResponseWithMessage(message); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - assertThat(serializedResponse.getMessage()).isEqualTo(message); - - } - - @Test - void shouldSerializeWithException() { - JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); - JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithMessage(String message) { - JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); - JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); - return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java deleted file mode 100644 index 51cb0c742a..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.model.TableDTO; -import java.util.List; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class DescribeStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private DescribeStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new DescribeStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("DESCRIBE users;")); - assertTrue(strategy.test("DESCRIBE EXTENDED users;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("list streams;")); - assertFalse(strategy.test("show tables;")); - } - - @Test - void shouldSerializeResponse() { - JsonNode node = getResponseWithObjectNode(); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); - } - - @Test - void shouldSerializeWithException() { - JsonNode sourceDescriptionNode = - mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); - JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithObjectNode() { - JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); - JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage); - return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java deleted file mode 100644 index 5f2b8fcc84..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class DropStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private DropStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new DropStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("drop table table1;")); - assertTrue(strategy.test("drop stream stream2;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("show streams;")); - assertFalse(strategy.test("show tables;")); - assertFalse(strategy.test("create table test;")); - assertFalse(strategy.test("create stream test;")); - } - - @Test - void shouldSerializeResponse() { - String message = "updated successful"; - JsonNode node = getResponseWithMessage(message); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - assertThat(serializedResponse.getMessage()).isEqualTo(message); - - } - - @Test - void shouldSerializeWithException() { - JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); - JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithMessage(String message) { - JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); - JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); - return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java deleted file mode 100644 index 2582abedbf..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.model.TableDTO; -import java.util.List; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class ExplainStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private ExplainStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new ExplainStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("explain users_query_id;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("show queries;")); - } - - @Test - void shouldSerializeResponse() { - JsonNode node = getResponseWithObjectNode(); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); - } - - @Test - void shouldSerializeWithException() { - JsonNode sourceDescriptionNode = - mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); - JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithObjectNode() { - JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); - JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage); - return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java deleted file mode 100644 index efeb87d584..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.model.TableDTO; -import java.util.List; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class SelectStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private SelectStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new SelectStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/query"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("select * from users;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("show streams;")); - assertFalse(strategy.test("select *;")); - } - - @Test - void shouldSerializeResponse() { - JsonNode node = getResponseWithData(); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2"))); - } - - @Test - void shouldSerializeWithException() { - JsonNode node = mapper.createObjectNode(); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithData() { - JsonNode headerNode = mapper.createObjectNode().set( - "header", mapper.createObjectNode().put("schema", "header1, header2") - ); - JsonNode row = mapper.createObjectNode().set( - "row", mapper.createObjectNode().set( - "columns", mapper.createArrayNode().add("value1").add("value2") - ) - ); - return mapper.createArrayNode().add(headerNode).add(row); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java deleted file mode 100644 index 3b12afa71a..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import com.provectus.kafka.ui.model.TableDTO; -import java.util.List; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DynamicTest; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestFactory; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class ShowStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private ShowStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new ShowStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("SHOW STREAMS;")); - assertTrue(strategy.test("SHOW TABLES;")); - assertTrue(strategy.test("SHOW TOPICS;")); - assertTrue(strategy.test("SHOW QUERIES;")); - assertTrue(strategy.test("SHOW PROPERTIES;")); - assertTrue(strategy.test("SHOW FUNCTIONS;")); - assertTrue(strategy.test("LIST STREAMS;")); - assertTrue(strategy.test("LIST TABLES;")); - assertTrue(strategy.test("LIST TOPICS;")); - assertTrue(strategy.test("LIST FUNCTIONS;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("LIST QUERIES;")); - assertFalse(strategy.test("LIST PROPERTIES;")); - } - - @TestFactory - public Iterable shouldSerialize() { - return List.of( - shouldSerializeGenerate("streams", "show streams;"), - shouldSerializeGenerate("tables", "show tables;"), - shouldSerializeGenerate("topics", "show topics;"), - shouldSerializeGenerate("properties", "show properties;"), - shouldSerializeGenerate("functions", "show functions;"), - shouldSerializeGenerate("queries", "show queries;") - ); - } - - public DynamicTest shouldSerializeGenerate(final String key, final String sql) { - return DynamicTest.dynamicTest("Should serialize " + key, - () -> { - JsonNode node = getResponseWithData(key); - strategy.test(sql); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); - } - ); - } - - @Test - void shouldSerializeWithException() { - JsonNode node = getResponseWithData("streams"); - strategy.test("show tables;"); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithData(String key) { - JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value"); - JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem); - JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData); - return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); - } -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java deleted file mode 100644 index 2f3b8756a1..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksql.statement; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.exception.UnprocessableEntityException; -import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; -import lombok.SneakyThrows; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class TerminateStrategyTest { - private final ObjectMapper mapper = new ObjectMapper(); - private TerminateStrategy strategy; - - @BeforeEach - void setUp() { - strategy = new TerminateStrategy(); - } - - @Test - void shouldReturnUri() { - strategy.host("ksqldb-server:8088"); - assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); - } - - @Test - void shouldReturnTrueInTest() { - assertTrue(strategy.test("terminate query_id;")); - } - - @Test - void shouldReturnFalseInTest() { - assertFalse(strategy.test("show streams;")); - assertFalse(strategy.test("create table test;")); - } - - @Test - void shouldSerializeResponse() { - String message = "query terminated."; - JsonNode node = getResponseWithMessage(message); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - assertThat(serializedResponse.getMessage()).isEqualTo(message); - - } - - @Test - void shouldSerializeWithException() { - JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); - JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - Exception exception = assertThrows( - UnprocessableEntityException.class, - () -> strategy.serializeResponse(node) - ); - - assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); - } - - @SneakyThrows - private JsonNode getResponseWithMessage(String message) { - JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); - JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); - return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); - } -} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index d07c24a61b..55ea795f03 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1561,31 +1561,6 @@ paths: 200: description: OK - /api/clusters/{clusterName}/ksql: - description: Deprecated - use ksql/v2 instead! - post: - tags: - - Ksql - summary: executeKsqlCommand - operationId: executeKsqlCommand - parameters: - - name: clusterName - in: path - required: true - schema: - type: string - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/KsqlCommand' - responses: - 200: - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/KsqlCommandResponse' /api/clusters/{clusterName}/ksql/v2: post: @@ -2986,18 +2961,6 @@ components: items: $ref: '#/components/schemas/ConnectorPluginConfig' - KsqlCommand: - type: object - properties: - ksql: - type: string - streamsProperties: - type: object - additionalProperties: - type: string - required: - - ksql - KsqlCommandV2: type: object properties: @@ -3044,31 +3007,6 @@ components: valueFormat: type: string - KsqlCommandResponse: - type: object - properties: - data: - $ref: '#/components/schemas/Table' - message: - type: string - - Table: - type: object - properties: - headers: - type: array - items: - type: string - rows: - type: array - items: - type: array - items: - type: string - required: - - headers - - rows - KsqlResponse: type: object properties: