diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java index be38685d03..c4714b61b7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -2,7 +2,7 @@ package com.provectus.kafka.ui.client; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -20,7 +20,7 @@ public final class KsqlClient { private final WebClient webClient; private final ObjectMapper mapper; - public Mono execute(KsqlStatementStrategy ksqlStatement) { + public Mono execute(KsqlStatementStrategy ksqlStatement) { return webClient.post() .uri(ksqlStatement.getUri()) .accept(new MediaType("application","vnd.ksql.v1+json")) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index fb780b4bc0..ef174a1de7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -3,7 +3,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.model.KsqlCommand; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.service.KsqlService; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -19,9 +19,9 @@ public class KsqlController implements KsqlApi { private final KsqlService ksqlService; @Override - public Mono> executeKsqlCommand(String clusterName, - Mono ksqlCommand, - ServerWebExchange exchange) { + public Mono> executeKsqlCommand(String clusterName, + Mono ksqlCommand, + ServerWebExchange exchange) { return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java index 1280bb8e9a..5b896f578b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -6,7 +6,7 @@ import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KsqlCommand; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; @@ -19,9 +19,9 @@ import java.util.List; public class KsqlService { private final KsqlClient ksqlClient; private final ClustersStorage clustersStorage; - private final List commandParamsStrategies; + private final List ksqlStatementStrategies; - public Mono executeKsqlCommand(String clusterName, Mono ksqlCommand) { + public Mono executeKsqlCommand(String clusterName, Mono ksqlCommand) { return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) .map(KafkaCluster::getKsqldbServer) @@ -34,7 +34,7 @@ public class KsqlService { private Mono getStatementStrategyForKsqlCommand(Mono ksqlCommand) { return ksqlCommand - .map(command -> commandParamsStrategies.stream() + .map(command -> ksqlStatementStrategies.stream() .filter(s -> s.test(command.getKsql())) .map(s -> s.ksqlCommand(command)) .findFirst()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategy.java new file mode 100644 index 0000000000..55793322e1 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/CreateStrategy.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + + +@Component +public class CreateStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private final String responseValueKey = "commandStatus"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + protected String getTestRegExp() { + return "create (table|stream)(.*)(with|as select(.*)from)(.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java similarity index 58% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java index 6dc2bd2c68..a6b02636a3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowQueriesStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DescribeStrategy.java @@ -1,17 +1,18 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import org.springframework.stereotype.Component; @Component -public class ShowQueriesStrategy extends KsqlStatementStrategy { +public class DescribeStrategy extends KsqlStatementStrategy { private final String requestPath = "/ksql"; - private final String responseValueKey = "query"; + private final String responseValueKey = "sourceDescription"; @Override - public KsqlResponseTable serializeResponse(JsonNode response) { + public KsqlCommandResponse serializeResponse(JsonNode response) { + System.out.println(response); return serializeTableResponse(response, responseValueKey); } @@ -22,6 +23,6 @@ public class ShowQueriesStrategy extends KsqlStatementStrategy { @Override protected String getTestRegExp() { - return "show queries;"; + return "describe (.*);"; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategy.java similarity index 51% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategy.java index cfcfdaa3ba..f9f30f3ffe 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStreamsStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/DropStrategy.java @@ -1,18 +1,18 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import org.springframework.stereotype.Component; @Component -public class ShowStreamsStrategy extends KsqlStatementStrategy { +public class DropStrategy extends KsqlStatementStrategy { private final String requestPath = "/ksql"; - private final String responseValueKey = "streams"; + private final String responseValueKey = "commandStatus"; @Override - public KsqlResponseTable serializeResponse(JsonNode response) { - return serializeTableResponse(response, responseValueKey); + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, responseValueKey); } @Override @@ -22,6 +22,6 @@ public class ShowStreamsStrategy extends KsqlStatementStrategy { @Override protected String getTestRegExp() { - return "(list|show) streams;"; + return "drop (table|stream) (.*);"; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategy.java similarity index 61% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategy.java index 7eae256666..a106b46f64 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTopicsStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ExplainStrategy.java @@ -1,16 +1,16 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import org.springframework.stereotype.Component; @Component -public class ShowTopicsStrategy extends KsqlStatementStrategy { +public class ExplainStrategy extends KsqlStatementStrategy { private final String requestPath = "/ksql"; - private final String responseValueKey = "topics"; + private final String responseValueKey = "queryDescription"; @Override - public KsqlResponseTable serializeResponse(JsonNode response) { + public KsqlCommandResponse serializeResponse(JsonNode response) { return serializeTableResponse(response, responseValueKey); } @@ -21,6 +21,6 @@ public class ShowTopicsStrategy extends KsqlStatementStrategy { @Override protected String getTestRegExp() { - return "(list|show) topics;"; + return "explain (.*);"; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java index 7aba214511..6ee2cb0d44 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java @@ -2,10 +2,13 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; import com.fasterxml.jackson.databind.JsonNode; import com.provectus.kafka.ui.model.KsqlCommand; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import com.provectus.kafka.ui.model.Table; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import java.util.stream.StreamSupport; public abstract class KsqlStatementStrategy { @@ -37,40 +40,102 @@ public abstract class KsqlStatementStrategy { return this; } - protected KsqlResponseTable getKsqlTable(JsonNode node) { - KsqlResponseTable table = new KsqlResponseTable(); + protected KsqlCommandResponse serializeTableResponse(JsonNode response, String path) { + if (response.isArray() && response.size() > 0) { + KsqlCommandResponse commandResponse = new KsqlCommandResponse(); + JsonNode first = response.get(0); + JsonNode items = first.path(path); + Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items); + return commandResponse.data(table); + } + throw new InternalError("Invalid data format"); + } + + protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) { + if (response.isArray() && response.size() > 0) { + KsqlCommandResponse commandResponse = new KsqlCommandResponse(); + JsonNode first = response.get(0); + JsonNode item = first.path(path); + return commandResponse.message(getMessageFromObject(item)); + } + // TODO: handle + throw new InternalError("Invalid data format"); + } + + protected KsqlCommandResponse serializeQueryResponse(JsonNode response) { + KsqlCommandResponse commandResponse = new KsqlCommandResponse(); + Table table = (new Table()) + .headers(getQueryResponseHeader(response)) + .rows(getQueryResponseRows(response)); + return commandResponse.data(table); + } + + private List getQueryResponseHeader(JsonNode response) { + JsonNode headerRow = response.get(0); + if (headerRow.isObject() && headerRow.size() > 0) { + String schema = headerRow.get("header").get("schema").asText(); + return Arrays.stream(schema.split(",")).map(s -> s.trim()).collect(Collectors.toList()); + } + return new ArrayList<>(); + } + + private List> getQueryResponseRows(JsonNode node) { + return getStreamForJsonArray(node) + .filter(row -> row.has("row") && row.get("row").has("columns")) + .map(row -> row.get("row").get("columns")) + .map(cellNode -> getStreamForJsonArray(cellNode) + .map(cell -> cell.asText()) + .collect(Collectors.toList()) + ) + .collect(Collectors.toList()); + } + + private Table getTableFromArray(JsonNode node) { + Table table = new Table(); table.headers(new ArrayList<>()).rows(new ArrayList<>()); if (node.size() > 0) { - List keys = getTableHeaders(node.get(0)); + List keys = getJsonObjectKeys(node.get(0)); List> rows = getTableRows(node, keys); table.headers(keys).rows(rows); } return table; } - protected KsqlResponseTable serializeTableResponse(JsonNode response, String path) { - if (response.isArray() && response.size() > 0) { - JsonNode first = response.get(0); - JsonNode items = first.path(path); - return this.getKsqlTable(items); + private Table getTableFromObject(JsonNode node) { + List keys = getJsonObjectKeys(node); + List values = getJsonObjectValues(node); + List> rows = IntStream + .range(0, keys.size()) + .mapToObj(i -> List.of(keys.get(i), values.get(i))) + .collect(Collectors.toList()); + return (new Table()).headers(List.of("key", "value")).rows(rows); + } + + private String getMessageFromObject(JsonNode node) { + if (node.isObject() && node.has("message")) { + return node.get("message").asText(); } - throw new InternalError("Invalid data format"); + throw new InternalError("can't get message from empty object or array"); } private List> getTableRows(JsonNode node, List keys) { - if (node.isArray() && node.size() > 0) { - return StreamSupport.stream(node.spliterator(), false) - .map(row -> keys.stream() - .map(header -> row.get(header).asText()) - .collect(Collectors.toList()) - ) - .collect(Collectors.toList()); - } - // TODO: handle - throw new InternalError("Invalid data format"); + return getStreamForJsonArray(node) + .map(row -> keys.stream() + .map(header -> row.get(header).asText()) + .collect(Collectors.toList()) + ) + .collect(Collectors.toList()); } - private List getTableHeaders(JsonNode node) { + private Stream getStreamForJsonArray(JsonNode node) { + if (node.isArray() && node.size() > 0) { + return StreamSupport.stream(node.spliterator(), false); + } + // TODO: handle + throw new InternalError("not JsonArray or empty"); + } + + private List getJsonObjectKeys(JsonNode node) { if (node.isObject()) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false @@ -80,7 +145,12 @@ public abstract class KsqlStatementStrategy { throw new InternalError("Invalid data format"); } - public abstract KsqlResponseTable serializeResponse(JsonNode response); + private List getJsonObjectValues(JsonNode node) { + return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText()) + .collect(Collectors.toList()); + } + + public abstract KsqlCommandResponse serializeResponse(JsonNode response); protected abstract String getRequestPath(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategy.java new file mode 100644 index 0000000000..4ec19536b4 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStrategy.java @@ -0,0 +1,50 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; + +@Component +public class ListStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private String responseValueKey = ""; + private final List statements = List.of("functions", "topics", "streams", "tables"); + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + public boolean test(String sql) { + Optional statement = statements.stream() + .filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s))) + .findFirst(); + if (statement.isPresent()) { + setResponseValueKey(statement.get()); + return true; + } + return false; + } + + @Override + protected String getTestRegExp() { + return ""; + } + + private String getTestRegExp(String key) { + return "list " + key + ";"; + } + + private void setResponseValueKey(String path) { + responseValueKey = path; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java new file mode 100644 index 0000000000..309578a826 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/SelectStrategy.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +@Component +public class SelectStrategy extends KsqlStatementStrategy { + private final String requestPath = "/query"; + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + System.out.println(response); + return serializeQueryResponse(response); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + + @Override + protected String getTestRegExp() { + return "select (.*) from (.*);"; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategy.java new file mode 100644 index 0000000000..fede08189f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowStrategy.java @@ -0,0 +1,51 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.fasterxml.jackson.databind.JsonNode; +import com.provectus.kafka.ui.model.KsqlCommandResponse; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; + +@Component +public class ShowStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + private String responseValueKey = ""; + private final List statements = List.of("functions", "topics", "streams", "tables", "queries", "properties"); + + @Override + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeTableResponse(response, responseValueKey); + } + + @Override + protected String getRequestPath() { + return requestPath; + } + + @Override + public boolean test(String sql) { + Optional statement = statements.stream() + .filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s))) + .findFirst(); + if (statement.isPresent()) { + setResponseValueKey(statement.get()); + return true; + } + return false; + } + + @Override + protected String getTestRegExp() { + return ""; + } + + private String getTestRegExp(String key) { + return "show " + key + ";"; + } + + private void setResponseValueKey(String path) { + responseValueKey = path; + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java deleted file mode 100644 index 03b35adc9a..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowTablesStrategy.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.provectus.kafka.ui.strategy.ksqlStatement; - -import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlResponseTable; -import org.springframework.stereotype.Component; - -@Component -public class ShowTablesStrategy extends KsqlStatementStrategy { - private final String requestPath = "/ksql"; - private final String responseValueKey = "tables"; - - @Override - public KsqlResponseTable serializeResponse(JsonNode response) { - return serializeTableResponse(response, responseValueKey); - } - - @Override - protected String getRequestPath() { - return requestPath; - } - - @Override - protected String getTestRegExp() { - return "(list|show) tables;"; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategy.java similarity index 52% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategy.java index a0b1245142..34094cf869 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ShowPropertiesStrategy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/TerminateStrategy.java @@ -1,18 +1,17 @@ package com.provectus.kafka.ui.strategy.ksqlStatement; import com.fasterxml.jackson.databind.JsonNode; -import com.provectus.kafka.ui.model.KsqlResponseTable; +import com.provectus.kafka.ui.model.KsqlCommandResponse; import org.springframework.stereotype.Component; - @Component -public class ShowPropertiesStrategy extends KsqlStatementStrategy { +public class TerminateStrategy extends KsqlStatementStrategy { private final String requestPath = "/ksql"; - private final String responseValueKey = "properties"; + private final String responseValueKey = "commandStatus"; @Override - public KsqlResponseTable serializeResponse(JsonNode response) { - return serializeTableResponse(response, responseValueKey); + public KsqlCommandResponse serializeResponse(JsonNode response) { + return serializeMessageResponse(response, responseValueKey); } @Override @@ -22,6 +21,6 @@ public class ShowPropertiesStrategy extends KsqlStatementStrategy { @Override protected String getTestRegExp() { - return "show properties;"; + return "terminate (.*);"; } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e753168c39..31e7ccf4fc 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1079,7 +1079,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/KsqlResponseTable' + $ref: '#/components/schemas/KsqlCommandResponse' /api/clusters/{clusterName}/connects/{connectName}/plugins: get: @@ -1852,11 +1852,21 @@ components: ksql: type: string streamsProperties: - type: string + type: object + additionalProperties: + type: string required: - ksql - KsqlResponseTable: + KsqlCommandResponse: + type: object + properties: + data: + $ref: '#/components/schemas/Table' + message: + type: string + + Table: type: object properties: headers: @@ -1870,7 +1880,8 @@ components: items: type: string required: - - ksql + - headers + - rows FullConnectorInfo: type: object