diff --git a/README.md b/README.md index 927b67b280..11f4568b27 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,7 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_NAME` | Cluster name |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect |`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeper service address +|`KAFKA_CLUSTERS_0_KSQLDBSERVER` | KSQL DB server address |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java new file mode 100644 index 0000000000..58ece1b7a7 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -0,0 +1,50 @@ +package com.provectus.kafka.ui.client; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +@Log4j2 +public class KsqlClient { + private final WebClient webClient; + private final ObjectMapper mapper; + + public Mono execute(BaseStrategy ksqlStatement) { + return webClient.post() + .uri(ksqlStatement.getUri()) + .accept(new MediaType("application", "vnd.ksql.v1+json")) + .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand())) + .retrieve() + .onStatus(HttpStatus::isError, this::getErrorMessage) + .bodyToMono(byte[].class) + .map(this::toJson) + .map(ksqlStatement::serializeResponse); + } + + private Mono getErrorMessage(ClientResponse response) { + return response + .bodyToMono(byte[].class) + .map(this::toJson) + .map(jsonNode -> jsonNode.get("message").asText()) + .flatMap(error -> Mono.error(new UnprocessableEntityException(error))); + } + + @SneakyThrows + private JsonNode toJson(byte[] content) { + return this.mapper.readTree(content); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index f847ca2a80..63925376ca 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -21,6 +21,7 @@ public class ClustersProperties { String zookeeper; String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; + String ksqldbServer; String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java new file mode 100644 index 0000000000..0a02de3b9a --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.KsqlApi; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.service.KsqlService; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +@Log4j2 +public class KsqlController implements KsqlApi { + private final KsqlService ksqlService; + + @Override + public Mono> executeKsqlCommand(String clusterName, + Mono ksqlCommand, + ServerWebExchange exchange) { + return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 506ed40ef8..7c66fb98df 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -19,7 +19,8 @@ public enum ErrorCode { CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND), TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND), SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND), - CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND); + CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND), + KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java new file mode 100644 index 0000000000..c896ab0b98 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class KsqlDbNotFoundException extends CustomBaseException { + + public KsqlDbNotFoundException() { + super("KSQL DB not found"); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.KSQLDB_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java index 94a9fddf02..8b6e70264a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java @@ -11,6 +11,7 @@ public enum Feature { .filter(Predicate.not(List::isEmpty)) .isPresent() ), + KSQL_DB(cluster -> cluster.getKsqldbServer() != null), SCHEMA_REGISTRY(cluster -> cluster.getSchemaRegistry() != null); private final Predicate isEnabled; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 68b1607376..c87d494540 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -16,6 +16,7 @@ public class KafkaCluster { private final String bootstrapServers; private final String zookeeper; private final InternalSchemaRegistry schemaRegistry; + private final String ksqldbServer; private final List kafkaConnect; private final String schemaNameTemplate; private final String keySchemaNameTemplate; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java new file mode 100644 index 0000000000..18824b36ce --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -0,0 +1,49 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.client.KsqlClient; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class KsqlService { + private final KsqlClient ksqlClient; + private final ClustersStorage clustersStorage; + private final List ksqlStatementStrategies; + + public Mono executeKsqlCommand(String clusterName, + Mono ksqlCommand) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) + .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) + .map(KafkaCluster::getKsqldbServer) + .onErrorResume(e -> { + Throwable throwable = + e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException(); + return Mono.error(throwable); + }) + .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand) + .map(statement -> statement.host(host)) + ) + .flatMap(ksqlClient::execute); + } + + private Mono getStatementStrategyForKsqlCommand( + Mono ksqlCommand) { + return ksqlCommand + .map(command -> ksqlStatementStrategies.stream() + .filter(s -> s.test(command.getKsql())) + .map(s -> s.ksqlCommand(command)) + .findFirst()) + .flatMap(Mono::justOrEmpty) + .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql"))); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java new file mode 100644 index 0000000000..4e6a9ae4f2 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/BaseStrategy.java @@ -0,0 +1,166 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public abstract class BaseStrategy { + protected static final String KSQL_REQUEST_PATH = "/ksql"; + protected static final String QUERY_REQUEST_PATH = "/query"; + private static final String MAPPING_EXCEPTION_ERROR = "KSQL DB response mapping error"; + protected String host = null; + protected KsqlCommand ksqlCommand = null; + + public String getUri() { + if (this.host != null) { + return this.host + this.getRequestPath(); + } + throw new UnprocessableEntityException("Strategy doesn't have host"); + } + + public boolean test(String sql) { + return sql.trim().toLowerCase().matches(getTestRegExp()); + } + + public BaseStrategy host(String host) { + this.host = host; + return this; + } + + public KsqlCommand getKsqlCommand() { + return ksqlCommand; + } + + public BaseStrategy ksqlCommand(KsqlCommand ksqlCommand) { + this.ksqlCommand = ksqlCommand; + return this; + } + + protected String getRequestPath() { + return BaseStrategy.KSQL_REQUEST_PATH; + } + + protected KsqlCommandResponse serializeTableResponse(JsonNode response, String key) { + JsonNode item = getResponseFirstItemValue(response, key); + Table table = item.isArray() ? getTableFromArray(item) : getTableFromObject(item); + return (new KsqlCommandResponse()).data(table); + } + + protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String key) { + JsonNode item = getResponseFirstItemValue(response, key); + return (new KsqlCommandResponse()).message(getMessageFromObject(item)); + } + + protected KsqlCommandResponse serializeQueryResponse(JsonNode response) { + if (response.isArray() && response.size() > 0) { + Table table = (new Table()) + .headers(getQueryResponseHeader(response)) + .rows(getQueryResponseRows(response)); + return (new KsqlCommandResponse()).data(table); + } + throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); + } + + private JsonNode getResponseFirstItemValue(JsonNode response, String key) { + if (response.isArray() && response.size() > 0) { + JsonNode first = response.get(0); + if (first.has(key)) { + return first.path(key); + } + } + throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); + } + + private List getQueryResponseHeader(JsonNode response) { + JsonNode headerRow = response.get(0); + if (headerRow.isObject() && headerRow.has("header")) { + String schema = headerRow.get("header").get("schema").asText(); + return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList()); + } + return new ArrayList<>(); + } + + private List> getQueryResponseRows(JsonNode node) { + return getStreamForJsonArray(node) + .filter(row -> row.has("row") && row.get("row").has("columns")) + .map(row -> row.get("row").get("columns")) + .map(cellNode -> getStreamForJsonArray(cellNode) + .map(JsonNode::asText) + .collect(Collectors.toList()) + ) + .collect(Collectors.toList()); + } + + private Table getTableFromArray(JsonNode node) { + Table table = new Table(); + table.headers(new ArrayList<>()).rows(new ArrayList<>()); + if (node.size() > 0) { + List keys = getJsonObjectKeys(node.get(0)); + List> rows = getTableRows(node, keys); + table.headers(keys).rows(rows); + } + return table; + } + + private Table getTableFromObject(JsonNode node) { + List keys = getJsonObjectKeys(node); + List values = getJsonObjectValues(node); + List> rows = IntStream + .range(0, keys.size()) + .mapToObj(i -> List.of(keys.get(i), values.get(i))) + .collect(Collectors.toList()); + return (new Table()).headers(List.of("key", "value")).rows(rows); + } + + private String getMessageFromObject(JsonNode node) { + if (node.isObject() && node.has("message")) { + return node.get("message").asText(); + } + throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); + } + + private List> getTableRows(JsonNode node, List keys) { + return getStreamForJsonArray(node) + .map(row -> keys.stream() + .map(header -> row.get(header).asText()) + .collect(Collectors.toList()) + ) + .collect(Collectors.toList()); + } + + private Stream getStreamForJsonArray(JsonNode node) { + if (node.isArray() && node.size() > 0) { + return StreamSupport.stream(node.spliterator(), false); + } + throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); + } + + private List getJsonObjectKeys(JsonNode node) { + if (node.isObject()) { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false + ).collect(Collectors.toList()); + } + throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR); + } + + private List getJsonObjectValues(JsonNode node) { + return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText()) + .collect(Collectors.toList()); + } + + public abstract KsqlCommandResponse serializeResponse(JsonNode response); + + protected abstract String getTestRegExp(); +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java new file mode 100644 index 0000000000..bfdbce6773 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategy.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class CreateStrategy extends BaseStrategy { + private static final String RESPONSE_VALUE_KEY = "commandStatus"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, RESPONSE_VALUE_KEY); + } + + @Override + protected String getTestRegExp() { + return "create (table|stream)(.*)(with|as select(.*)from)(.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java new file mode 100644 index 0000000000..611cae99c8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategy.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class DescribeStrategy extends BaseStrategy { + private static final String RESPONSE_VALUE_KEY = "sourceDescription"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeTableResponse(response, RESPONSE_VALUE_KEY); + } + + @Override + protected String getTestRegExp() { + return "describe (.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java new file mode 100644 index 0000000000..253c3906c6 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategy.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class DropStrategy extends BaseStrategy { + private static final String RESPONSE_VALUE_KEY = "commandStatus"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, RESPONSE_VALUE_KEY); + } + + @Override + protected String getTestRegExp() { + return "drop (table|stream) (.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java new file mode 100644 index 0000000000..c80b255eae --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategy.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class ExplainStrategy extends BaseStrategy { + private static final String RESPONSE_VALUE_KEY = "queryDescription"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeTableResponse(response, RESPONSE_VALUE_KEY); + } + + @Override + protected String getTestRegExp() { + return "explain (.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java new file mode 100644 index 0000000000..0c7a6a6d87 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategy.java @@ -0,0 +1,24 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class SelectStrategy extends BaseStrategy { + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeQueryResponse(response); + } + + @Override + protected String getRequestPath() { + return BaseStrategy.QUERY_REQUEST_PATH; + } + + @Override + protected String getTestRegExp() { + return "select (.*) from (.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java new file mode 100644 index 0000000000..1b80c6648b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java @@ -0,0 +1,57 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import java.util.List; +import java.util.Optional; +import org.springframework.stereotype.Component; + +@Component +public class ShowStrategy extends BaseStrategy { + private static final List SHOW_STATEMENTS = + List.of("functions", "topics", "streams", "tables", "queries", "properties"); + private static final List LIST_STATEMENTS = + List.of("functions", "topics", "streams", "tables"); + private String responseValueKey = ""; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + public boolean test(String sql) { + Optional statement = SHOW_STATEMENTS.stream() + .filter(s -> testSql(sql, getShowRegExp(s)) || testSql(sql, getListRegExp(s))) + .findFirst(); + if (statement.isPresent()) { + setResponseValueKey(statement.get()); + return true; + } + return false; + } + + @Override + protected String getTestRegExp() { + return ""; + } + + protected String getShowRegExp(String key) { + return "show " + key + ";"; + } + + protected String getListRegExp(String key) { + if (LIST_STATEMENTS.contains(key)) { + return "list " + key + ";"; + } + return ""; + } + + private void setResponseValueKey(String path) { + responseValueKey = path; + } + + private boolean testSql(String sql, String pattern) { + return sql.trim().toLowerCase().matches(pattern); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java new file mode 100644 index 0000000000..a7b531d0bb --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategy.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class TerminateStrategy extends BaseStrategy { + private static final String RESPONSE_VALUE_KEY = "commandStatus"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, RESPONSE_VALUE_KEY); + } + + @Override + protected String getTestRegExp() { + return "terminate (.*);"; + } +} diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index a684928f68..31190a7788 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -4,6 +4,7 @@ kafka: bootstrapServers: localhost:9093 zookeeper: localhost:2181 schemaRegistry: http://localhost:8081 + ksqldbServer: http://localhost:8088 kafkaConnect: - name: first address: http://localhost:8083 @@ -25,4 +26,4 @@ spring: jmx: enabled: true auth: - enabled: false \ No newline at end of file + enabled: false diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java new file mode 100644 index 0000000000..aef12d8ec8 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java @@ -0,0 +1,124 @@ +package com.provectus.kafka.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.provectus.kafka.ui.client.KsqlClient; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; +import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +@ExtendWith(MockitoExtension.class) +class KsqlServiceTest { + private KsqlService ksqlService; + private BaseStrategy baseStrategy; + + @Mock + private ClustersStorage clustersStorage; + @Mock + private KsqlClient ksqlClient; + + + @BeforeEach + public void setUp() { + this.baseStrategy = new ShowStrategy(); + this.ksqlService = new KsqlService( + this.ksqlClient, + this.clustersStorage, + List.of(baseStrategy) + ); + } + + @Test + void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null)); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(ClusterNotFoundException.class); + } + + @Test + void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + when(clustersStorage.getClusterByName(clusterName)) + .thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn(null); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(KsqlDbNotFoundException.class); + } + + @Test + void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() { + String clusterName = "test"; + KsqlCommand command = + (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + when(clustersStorage.getClusterByName(clusterName)) + .thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088"); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyError(UnprocessableEntityException.class); + + StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command))) + .verifyErrorMessage("Invalid sql"); + } + + @Test + void shouldSetHostToStrategy() { + String clusterName = "test"; + String host = "localhost:8088"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + + when(clustersStorage.getClusterByName(clusterName)) + .thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn(host); + when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse())); + + ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); + assertThat(baseStrategy.getUri()).isEqualTo(host + "/ksql"); + } + + @Test + void shouldCallClientAndReturnResponse() { + String clusterName = "test"; + KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); + KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); + KsqlCommandResponse response = new KsqlCommandResponse().message("success"); + + when(clustersStorage.getClusterByName(clusterName)) + .thenReturn(Optional.ofNullable(kafkaCluster)); + when(kafkaCluster.getKsqldbServer()).thenReturn("host"); + when(ksqlClient.execute(any())).thenReturn(Mono.just(response)); + + KsqlCommandResponse receivedResponse = + ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); + verify(ksqlClient, times(1)).execute(baseStrategy); + assertThat(receivedResponse).isEqualTo(response); + + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java new file mode 100644 index 0000000000..58e838531b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/CreateStrategyTest.java @@ -0,0 +1,85 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class CreateStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private CreateStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new CreateStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');")); + assertTrue(strategy.test("CREATE STREAM stream" + + " AS SELECT users.id AS userid FROM users EMIT CHANGES;" + )); + assertTrue(strategy.test( + "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');" + )); + assertTrue(strategy.test( + "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')" + + " AS SELECT gender, COUNT(*) AS numbers" + + " FROM pageviews EMIT CHANGES;" + )); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("show streams;")); + assertFalse(strategy.test("show tables;")); + assertFalse(strategy.test("CREATE TABLE test;")); + assertFalse(strategy.test("CREATE STREAM test;")); + } + + @Test + void shouldSerializeResponse() { + String message = "updated successful"; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java new file mode 100644 index 0000000000..9591959533 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DescribeStrategyTest.java @@ -0,0 +1,76 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import java.util.List; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DescribeStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private DescribeStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new DescribeStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("DESCRIBE users;")); + assertTrue(strategy.test("DESCRIBE EXTENDED users;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("list streams;")); + assertFalse(strategy.test("show tables;")); + } + + @Test + void shouldSerializeResponse() { + JsonNode node = getResponseWithObjectNode(); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); + } + + @Test + void shouldSerializeWithException() { + JsonNode sourceDescriptionNode = + mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithObjectNode() { + JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); + JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java new file mode 100644 index 0000000000..2c95dbe87e --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/DropStrategyTest.java @@ -0,0 +1,75 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DropStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private DropStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new DropStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("drop table table1;")); + assertTrue(strategy.test("drop stream stream2;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("show streams;")); + assertFalse(strategy.test("show tables;")); + assertFalse(strategy.test("create table test;")); + assertFalse(strategy.test("create stream test;")); + } + + @Test + void shouldSerializeResponse() { + String message = "updated successful"; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java new file mode 100644 index 0000000000..6ad4d15563 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ExplainStrategyTest.java @@ -0,0 +1,74 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import java.util.List; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ExplainStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private ExplainStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new ExplainStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("explain users_query_id;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("show queries;")); + } + + @Test + void shouldSerializeResponse() { + JsonNode node = getResponseWithObjectNode(); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); + } + + @Test + void shouldSerializeWithException() { + JsonNode sourceDescriptionNode = + mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithObjectNode() { + JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka"); + JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java new file mode 100644 index 0000000000..7509c12301 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/SelectStrategyTest.java @@ -0,0 +1,79 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import java.util.List; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SelectStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private SelectStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new SelectStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/query"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("select * from users;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("show streams;")); + assertFalse(strategy.test("select *;")); + } + + @Test + void shouldSerializeResponse() { + JsonNode node = getResponseWithData(); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2"))); + } + + @Test + void shouldSerializeWithException() { + JsonNode node = mapper.createObjectNode(); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithData() { + JsonNode headerNode = mapper.createObjectNode().set( + "header", mapper.createObjectNode().put("schema", "header1, header2") + ); + JsonNode row = mapper.createObjectNode().set( + "row", mapper.createObjectNode().set( + "columns", mapper.createArrayNode().add("value1").add("value2") + ) + ); + return mapper.createArrayNode().add(headerNode).add(row); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java new file mode 100644 index 0000000000..056e5f153b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java @@ -0,0 +1,135 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; +import java.util.List; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ShowStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private ShowStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new ShowStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("SHOW STREAMS;")); + assertTrue(strategy.test("SHOW TABLES;")); + assertTrue(strategy.test("SHOW TOPICS;")); + assertTrue(strategy.test("SHOW QUERIES;")); + assertTrue(strategy.test("SHOW PROPERTIES;")); + assertTrue(strategy.test("SHOW FUNCTIONS;")); + assertTrue(strategy.test("LIST STREAMS;")); + assertTrue(strategy.test("LIST TABLES;")); + assertTrue(strategy.test("LIST TOPICS;")); + assertTrue(strategy.test("LIST FUNCTIONS;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("LIST QUERIES;")); + assertFalse(strategy.test("LIST PROPERTIES;")); + } + + @Test + void shouldSerializeStreamsResponse() { + JsonNode node = getResponseWithData("streams"); + strategy.test("show streams;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializeTablesResponse() { + JsonNode node = getResponseWithData("tables"); + strategy.test("show tables;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializeTopicsResponse() { + JsonNode node = getResponseWithData("topics"); + strategy.test("show topics;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializePropertiesResponse() { + JsonNode node = getResponseWithData("properties"); + strategy.test("show properties;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializeFunctionsResponse() { + JsonNode node = getResponseWithData("functions"); + strategy.test("show functions;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializeQueriesResponse() { + JsonNode node = getResponseWithData("queries"); + strategy.test("show queries;"); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + Table table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + + @Test + void shouldSerializeWithException() { + JsonNode node = getResponseWithData("streams"); + strategy.test("show tables;"); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithData(String key) { + JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value"); + JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem); + JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData); + return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse)); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java new file mode 100644 index 0000000000..c84ca2d5da --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/TerminateStrategyTest.java @@ -0,0 +1,72 @@ +package com.provectus.kafka.ui.strategy.ksql.statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TerminateStrategyTest { + private final ObjectMapper mapper = new ObjectMapper(); + private TerminateStrategy strategy; + + @BeforeEach + void setUp() { + strategy = new TerminateStrategy(); + } + + @Test + void shouldReturnUri() { + strategy.host("ksqldb-server:8088"); + assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); + } + + @Test + void shouldReturnTrueInTest() { + assertTrue(strategy.test("terminate query_id;")); + } + + @Test + void shouldReturnFalseInTest() { + assertFalse(strategy.test("show streams;")); + assertFalse(strategy.test("create table test;")); + } + + @Test + void shouldSerializeResponse() { + String message = "query terminated."; + JsonNode node = getResponseWithMessage(message); + KsqlCommandResponse serializedResponse = strategy.serializeResponse(node); + assertThat(serializedResponse.getMessage()).isEqualTo(message); + + } + + @Test + void shouldSerializeWithException() { + JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); + JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + Exception exception = assertThrows( + UnprocessableEntityException.class, + () -> strategy.serializeResponse(node) + ); + + assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); + } + + @SneakyThrows + private JsonNode getResponseWithMessage(String message) { + JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message); + JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage); + return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 9eb892fa5a..9135b039ab 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1247,6 +1247,31 @@ paths: 200: description: OK + /api/clusters/{clusterName}/ksql: + post: + tags: + - Ksql + summary: executeKsqlCommand + operationId: executeKsqlCommand + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KsqlCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/KsqlCommandResponse' + /api/clusters/{clusterName}/connects/{connectName}/plugins: get: tags: @@ -1406,6 +1431,7 @@ components: enum: - SCHEMA_REGISTRY - KAFKA_CONNECT + - KSQL_DB required: - id - name @@ -2207,6 +2233,43 @@ components: items: $ref: '#/components/schemas/ConnectorPluginConfig' + KsqlCommand: + type: object + properties: + ksql: + type: string + streamsProperties: + type: object + additionalProperties: + type: string + required: + - ksql + + KsqlCommandResponse: + type: object + properties: + data: + $ref: '#/components/schemas/Table' + message: + type: string + + Table: + type: object + properties: + headers: + type: array + items: + type: string + rows: + type: array + items: + type: array + items: + type: string + required: + - headers + - rows + FullConnectorInfo: type: object properties: