diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java index 746b50ca03..be38685d03 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -1,8 +1,11 @@ package com.provectus.kafka.ui.client; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; @@ -15,15 +18,22 @@ import reactor.core.publisher.Mono; @Log4j2 public final class KsqlClient { private final WebClient webClient; + private final ObjectMapper mapper; - public Mono execute(KsqlStatementStrategy ksqlStatement) { + public Mono execute(KsqlStatementStrategy ksqlStatement) { return webClient.post() .uri(ksqlStatement.getUri()) .accept(new MediaType("application","vnd.ksql.v1+json")) .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand())) .retrieve() - .bodyToMono(String.class) + .bodyToMono(byte[].class) + .map(this::toJson) .map(ksqlStatement::serializeResponse) .doOnError(log::error); } + + @SneakyThrows + private JsonNode toJson(byte[] content) { + return this.mapper.readTree(content); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index 187a6e6563..fb780b4bc0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.service.KsqlService; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -18,9 +19,9 @@ public class KsqlController implements KsqlApi { private final KsqlService ksqlService; @Override - public Mono> executeKsqlCommand(String clusterName, - Mono ksqlCommand, - ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(ksqlService.executeKsqlCommand(clusterName, ksqlCommand))); + public Mono> executeKsqlCommand(String clusterName, + Mono ksqlCommand, + ServerWebExchange exchange) { + return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java index 94a9fddf02..d624e298ad 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java @@ -12,6 +12,7 @@ public enum Feature { .isPresent() ), SCHEMA_REGISTRY(cluster -> cluster.getSchemaRegistry() != null); + // TODO: add feature for FE app private final Predicate isEnabled; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java index cb0a8eae9e..1280bb8e9a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -6,6 +6,7 @@ import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; @@ -20,7 +21,7 @@ public class KsqlService { private final ClustersStorage clustersStorage; private final List commandParamsStrategies; - public Mono executeKsqlCommand(String clusterName, Mono ksqlCommand) { + public Mono executeKsqlCommand(String clusterName, Mono ksqlCommand) { return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) .map(KafkaCluster::getKsqldbServer) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java index 880f981223..7aba214511 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java @@ -1,16 +1,28 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; +import com.fasterxml.jackson.databind.JsonNode; import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.model.KsqlResponseTable; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public abstract class KsqlStatementStrategy { protected String host = null; protected KsqlCommand ksqlCommand = null; public String getUri() { - if (this.host != null) { return this.host + this.getRequestPath(); } + if (this.host != null) { + return this.host + this.getRequestPath(); + } return null; } + public boolean test(String sql) { + return sql.trim().toLowerCase().matches(getTestRegExp()); + } + public KsqlStatementStrategy host(String host) { this.host = host; return this; @@ -25,9 +37,52 @@ public abstract class KsqlStatementStrategy { return this; } - public abstract Object serializeResponse(String response); + protected KsqlResponseTable getKsqlTable(JsonNode node) { + KsqlResponseTable table = new KsqlResponseTable(); + table.headers(new ArrayList<>()).rows(new ArrayList<>()); + if (node.size() > 0) { + List keys = getTableHeaders(node.get(0)); + List> rows = getTableRows(node, keys); + table.headers(keys).rows(rows); + } + return table; + } - public abstract boolean test(String sql); + protected KsqlResponseTable serializeTableResponse(JsonNode response, String path) { + if (response.isArray() && response.size() > 0) { + JsonNode first = response.get(0); + JsonNode items = first.path(path); + return this.getKsqlTable(items); + } + throw new InternalError("Invalid data format"); + } + + private List> getTableRows(JsonNode node, List keys) { + if (node.isArray() && node.size() > 0) { + return StreamSupport.stream(node.spliterator(), false) + .map(row -> keys.stream() + .map(header -> row.get(header).asText()) + .collect(Collectors.toList()) + ) + .collect(Collectors.toList()); + } + // TODO: handle + throw new InternalError("Invalid data format"); + } + + private List getTableHeaders(JsonNode node) { + if (node.isObject()) { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false + ).collect(Collectors.toList()); + } + // TODO: handle + throw new InternalError("Invalid data format"); + } + + public abstract KsqlResponseTable serializeResponse(JsonNode response); protected abstract String getRequestPath(); + + protected abstract String getTestRegExp(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java deleted file mode 100644 index 966a7336ff..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksqlStatement; - -import com.google.gson.JsonArray; -import com.google.gson.JsonParser; -import lombok.SneakyThrows; -import org.springframework.stereotype.Component; - - -@Component -public class ListStreamsStrategy extends KsqlStatementStrategy { - private final String requestPath = "/ksql"; - - @SneakyThrows - @Override - public Object serializeResponse(String response) { - JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray(); - return jsonArray.get(0).getAsJsonObject().get("streams").getAsJsonArray().toString(); - } - - @Override - public boolean test(String sql) { - return sql.trim().toLowerCase().matches("(list|show) streams;"); - } - - @Override - protected String getRequestPath() { - return requestPath; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java deleted file mode 100644 index c2e8933946..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksqlStatement; - -import com.google.gson.JsonArray; -import com.google.gson.JsonParser; -import org.springframework.stereotype.Component; - -@Component -public class ListTopicsStrategy extends KsqlStatementStrategy { - private final String requestPath = "/ksql"; - - @Override - public Object serializeResponse(String response) { - JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray(); - return jsonArray.get(0).getAsJsonObject().get("topics").getAsJsonArray().toString(); - } - - @Override - public boolean test(String sql) { - return sql.trim().toLowerCase().matches("(list|show) topics;"); - } - - @Override - protected String getRequestPath() { - return requestPath; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java new file mode 100644 index 0000000000..a0b1245142 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import org.springframework.stereotype.Component; + + +@Component +public class ShowPropertiesStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "properties"; + + @Override + public KsqlResponseTable serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "show properties;"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java new file mode 100644 index 0000000000..6dc2bd2c68 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import org.springframework.stereotype.Component; + + +@Component +public class ShowQueriesStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "query"; + + @Override + public KsqlResponseTable serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "show queries;"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java new file mode 100644 index 0000000000..cfcfdaa3ba --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import org.springframework.stereotype.Component; + + +@Component +public class ShowStreamsStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "streams"; + + @Override + public KsqlResponseTable serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "(list|show) streams;"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java new file mode 100644 index 0000000000..03b35adc9a --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import org.springframework.stereotype.Component; + +@Component +public class ShowTablesStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "tables"; + + @Override + public KsqlResponseTable serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "(list|show) tables;"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java new file mode 100644 index 0000000000..7eae256666 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlResponseTable; +import org.springframework.stereotype.Component; + +@Component +public class ShowTopicsStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "topics"; + + @Override + public KsqlResponseTable serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "(list|show) topics;"; + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 41ae457b05..e753168c39 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1079,7 +1079,7 @@ paths: content: application/json: schema: - type: object + $ref: '#/components/schemas/KsqlResponseTable' /api/clusters/{clusterName}/connects/{connectName}/plugins: get: @@ -1856,6 +1856,22 @@ components: required: - ksql + KsqlResponseTable: + type: object + properties: + headers: + type: array + items: + type: string + rows: + type: array + items: + type: array + items: + type: string + required: + - ksql + FullConnectorInfo: type: object properties: