[#207] feat(api, contract): added strategies for statements, return message or data with datble as response

This commit is contained in:
Ilnur Farukhshin 2021-07-20 15:39:34 +03:00
parent 3686e75c8f
commit fb68d19802
14 changed files with 295 additions and 85 deletions

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.client;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -20,7 +20,7 @@ public final class KsqlClient {
private final WebClient webClient; private final WebClient webClient;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public Mono<KsqlResponseTable> execute(KsqlStatementStrategy ksqlStatement) { public Mono<KsqlCommandResponse> execute(KsqlStatementStrategy ksqlStatement) {
return webClient.post() return webClient.post()
.uri(ksqlStatement.getUri()) .uri(ksqlStatement.getUri())
.accept(new MediaType("application","vnd.ksql.v1+json")) .accept(new MediaType("application","vnd.ksql.v1+json"))

View file

@ -3,7 +3,7 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.model.KsqlCommand; import com.provectus.kafka.ui.model.KsqlCommand;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.service.KsqlService; import com.provectus.kafka.ui.service.KsqlService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -19,9 +19,9 @@ public class KsqlController implements KsqlApi {
private final KsqlService ksqlService; private final KsqlService ksqlService;
@Override @Override
public Mono<ResponseEntity<KsqlResponseTable>> executeKsqlCommand(String clusterName, public Mono<ResponseEntity<KsqlCommandResponse>> executeKsqlCommand(String clusterName,
Mono<KsqlCommand> ksqlCommand, Mono<KsqlCommand> ksqlCommand,
ServerWebExchange exchange) { ServerWebExchange exchange) {
return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok); return ksqlService.executeKsqlCommand(clusterName, ksqlCommand).map(ResponseEntity::ok);
} }
} }

View file

@ -6,7 +6,7 @@ import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommand; import com.provectus.kafka.ui.model.KsqlCommand;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -19,9 +19,9 @@ import java.util.List;
public class KsqlService { public class KsqlService {
private final KsqlClient ksqlClient; private final KsqlClient ksqlClient;
private final ClustersStorage clustersStorage; private final ClustersStorage clustersStorage;
private final List<KsqlStatementStrategy> commandParamsStrategies; private final List<KsqlStatementStrategy> ksqlStatementStrategies;
public Mono<KsqlResponseTable> executeKsqlCommand(String clusterName, Mono<KsqlCommand> ksqlCommand) { public Mono<KsqlCommandResponse> executeKsqlCommand(String clusterName, Mono<KsqlCommand> ksqlCommand) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
.switchIfEmpty(Mono.error(ClusterNotFoundException::new)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
.map(KafkaCluster::getKsqldbServer) .map(KafkaCluster::getKsqldbServer)
@ -34,7 +34,7 @@ public class KsqlService {
private Mono<KsqlStatementStrategy> getStatementStrategyForKsqlCommand(Mono<KsqlCommand> ksqlCommand) { private Mono<KsqlStatementStrategy> getStatementStrategyForKsqlCommand(Mono<KsqlCommand> ksqlCommand) {
return ksqlCommand return ksqlCommand
.map(command -> commandParamsStrategies.stream() .map(command -> ksqlStatementStrategies.stream()
.filter(s -> s.test(command.getKsql())) .filter(s -> s.test(command.getKsql()))
.map(s -> s.ksqlCommand(command)) .map(s -> s.ksqlCommand(command))
.findFirst()) .findFirst())

View file

@ -0,0 +1,27 @@
package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component;
@Component
public class CreateStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql";
private final String responseValueKey = "commandStatus";
@Override
public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeMessageResponse(response, responseValueKey);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
protected String getTestRegExp() {
return "create (table|stream)(.*)(with|as select(.*)from)(.*);";
}
}

View file

@ -1,17 +1,18 @@
package com.provectus.kafka.ui.strategy.ksqlStatement; package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ShowQueriesStrategy extends KsqlStatementStrategy { public class DescribeStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql"; private final String requestPath = "/ksql";
private final String responseValueKey = "query"; private final String responseValueKey = "sourceDescription";
@Override @Override
public KsqlResponseTable serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
System.out.println(response);
return serializeTableResponse(response, responseValueKey); return serializeTableResponse(response, responseValueKey);
} }
@ -22,6 +23,6 @@ public class ShowQueriesStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getTestRegExp() { protected String getTestRegExp() {
return "show queries;"; return "describe (.*);";
} }
} }

View file

@ -1,18 +1,18 @@
package com.provectus.kafka.ui.strategy.ksqlStatement; package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ShowStreamsStrategy extends KsqlStatementStrategy { public class DropStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql"; private final String requestPath = "/ksql";
private final String responseValueKey = "streams"; private final String responseValueKey = "commandStatus";
@Override @Override
public KsqlResponseTable serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey); return serializeMessageResponse(response, responseValueKey);
} }
@Override @Override
@ -22,6 +22,6 @@ public class ShowStreamsStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getTestRegExp() { protected String getTestRegExp() {
return "(list|show) streams;"; return "drop (table|stream) (.*);";
} }
} }

View file

@ -1,16 +1,16 @@
package com.provectus.kafka.ui.strategy.ksqlStatement; package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ShowTopicsStrategy extends KsqlStatementStrategy { public class ExplainStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql"; private final String requestPath = "/ksql";
private final String responseValueKey = "topics"; private final String responseValueKey = "queryDescription";
@Override @Override
public KsqlResponseTable serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey); return serializeTableResponse(response, responseValueKey);
} }
@ -21,6 +21,6 @@ public class ShowTopicsStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getTestRegExp() { protected String getTestRegExp() {
return "(list|show) topics;"; return "explain (.*);";
} }
} }

View file

@ -2,10 +2,13 @@ package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommand; import com.provectus.kafka.ui.model.KsqlCommand;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.model.Table;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
public abstract class KsqlStatementStrategy { public abstract class KsqlStatementStrategy {
@ -37,40 +40,102 @@ public abstract class KsqlStatementStrategy {
return this; return this;
} }
protected KsqlResponseTable getKsqlTable(JsonNode node) { protected KsqlCommandResponse serializeTableResponse(JsonNode response, String path) {
KsqlResponseTable table = new KsqlResponseTable(); if (response.isArray() && response.size() > 0) {
KsqlCommandResponse commandResponse = new KsqlCommandResponse();
JsonNode first = response.get(0);
JsonNode items = first.path(path);
Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items);
return commandResponse.data(table);
}
throw new InternalError("Invalid data format");
}
protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) {
if (response.isArray() && response.size() > 0) {
KsqlCommandResponse commandResponse = new KsqlCommandResponse();
JsonNode first = response.get(0);
JsonNode item = first.path(path);
return commandResponse.message(getMessageFromObject(item));
}
// TODO: handle
throw new InternalError("Invalid data format");
}
protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
KsqlCommandResponse commandResponse = new KsqlCommandResponse();
Table table = (new Table())
.headers(getQueryResponseHeader(response))
.rows(getQueryResponseRows(response));
return commandResponse.data(table);
}
private List<String> getQueryResponseHeader(JsonNode response) {
JsonNode headerRow = response.get(0);
if (headerRow.isObject() && headerRow.size() > 0) {
String schema = headerRow.get("header").get("schema").asText();
return Arrays.stream(schema.split(",")).map(s -> s.trim()).collect(Collectors.toList());
}
return new ArrayList<>();
}
private List<List<String>> getQueryResponseRows(JsonNode node) {
return getStreamForJsonArray(node)
.filter(row -> row.has("row") && row.get("row").has("columns"))
.map(row -> row.get("row").get("columns"))
.map(cellNode -> getStreamForJsonArray(cellNode)
.map(cell -> cell.asText())
.collect(Collectors.toList())
)
.collect(Collectors.toList());
}
private Table getTableFromArray(JsonNode node) {
Table table = new Table();
table.headers(new ArrayList<>()).rows(new ArrayList<>()); table.headers(new ArrayList<>()).rows(new ArrayList<>());
if (node.size() > 0) { if (node.size() > 0) {
List<String> keys = getTableHeaders(node.get(0)); List<String> keys = getJsonObjectKeys(node.get(0));
List<List<String>> rows = getTableRows(node, keys); List<List<String>> rows = getTableRows(node, keys);
table.headers(keys).rows(rows); table.headers(keys).rows(rows);
} }
return table; return table;
} }
protected KsqlResponseTable serializeTableResponse(JsonNode response, String path) { private Table getTableFromObject(JsonNode node) {
if (response.isArray() && response.size() > 0) { List<String> keys = getJsonObjectKeys(node);
JsonNode first = response.get(0); List<String> values = getJsonObjectValues(node);
JsonNode items = first.path(path); List<List<String>> rows = IntStream
return this.getKsqlTable(items); .range(0, keys.size())
.mapToObj(i -> List.of(keys.get(i), values.get(i)))
.collect(Collectors.toList());
return (new Table()).headers(List.of("key", "value")).rows(rows);
}
private String getMessageFromObject(JsonNode node) {
if (node.isObject() && node.has("message")) {
return node.get("message").asText();
} }
throw new InternalError("Invalid data format"); throw new InternalError("can't get message from empty object or array");
} }
private List<List<String>> getTableRows(JsonNode node, List<String> keys) { private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
if (node.isArray() && node.size() > 0) { return getStreamForJsonArray(node)
return StreamSupport.stream(node.spliterator(), false) .map(row -> keys.stream()
.map(row -> keys.stream() .map(header -> row.get(header).asText())
.map(header -> row.get(header).asText()) .collect(Collectors.toList())
.collect(Collectors.toList()) )
) .collect(Collectors.toList());
.collect(Collectors.toList());
}
// TODO: handle
throw new InternalError("Invalid data format");
} }
private List<String> getTableHeaders(JsonNode node) { private Stream<JsonNode> getStreamForJsonArray(JsonNode node) {
if (node.isArray() && node.size() > 0) {
return StreamSupport.stream(node.spliterator(), false);
}
// TODO: handle
throw new InternalError("not JsonArray or empty");
}
private List<String> getJsonObjectKeys(JsonNode node) {
if (node.isObject()) { if (node.isObject()) {
return StreamSupport.stream( return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
@ -80,7 +145,12 @@ public abstract class KsqlStatementStrategy {
throw new InternalError("Invalid data format"); throw new InternalError("Invalid data format");
} }
public abstract KsqlResponseTable serializeResponse(JsonNode response); private List<String> getJsonObjectValues(JsonNode node) {
return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText())
.collect(Collectors.toList());
}
public abstract KsqlCommandResponse serializeResponse(JsonNode response);
protected abstract String getRequestPath(); protected abstract String getRequestPath();

View file

@ -0,0 +1,50 @@
package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class ListStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql";
private String responseValueKey = "";
private final List<String> statements = List.of("functions", "topics", "streams", "tables");
@Override
public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
public boolean test(String sql) {
Optional<String> statement = statements.stream()
.filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s)))
.findFirst();
if (statement.isPresent()) {
setResponseValueKey(statement.get());
return true;
}
return false;
}
@Override
protected String getTestRegExp() {
return "";
}
private String getTestRegExp(String key) {
return "list " + key + ";";
}
private void setResponseValueKey(String path) {
responseValueKey = path;
}
}

View file

@ -0,0 +1,27 @@
package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component;
@Component
public class SelectStrategy extends KsqlStatementStrategy {
private final String requestPath = "/query";
@Override
public KsqlCommandResponse serializeResponse(JsonNode response) {
System.out.println(response);
return serializeQueryResponse(response);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
protected String getTestRegExp() {
return "select (.*) from (.*);";
}
}

View file

@ -0,0 +1,51 @@
package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class ShowStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql";
private String responseValueKey = "";
private final List<String> statements = List.of("functions", "topics", "streams", "tables", "queries", "properties");
@Override
public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
public boolean test(String sql) {
Optional<String> statement = statements.stream()
.filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s)))
.findFirst();
if (statement.isPresent()) {
setResponseValueKey(statement.get());
return true;
}
return false;
}
@Override
protected String getTestRegExp() {
return "";
}
private String getTestRegExp(String key) {
return "show " + key + ";";
}
private void setResponseValueKey(String path) {
responseValueKey = path;
}
}

View file

@ -1,26 +0,0 @@
package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlResponseTable;
import org.springframework.stereotype.Component;
@Component
public class ShowTablesStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql";
private final String responseValueKey = "tables";
@Override
public KsqlResponseTable serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
protected String getTestRegExp() {
return "(list|show) tables;";
}
}

View file

@ -1,18 +1,17 @@
package com.provectus.kafka.ui.strategy.ksqlStatement; package com.provectus.kafka.ui.strategy.ksqlStatement;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlResponseTable; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ShowPropertiesStrategy extends KsqlStatementStrategy { public class TerminateStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql"; private final String requestPath = "/ksql";
private final String responseValueKey = "properties"; private final String responseValueKey = "commandStatus";
@Override @Override
public KsqlResponseTable serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey); return serializeMessageResponse(response, responseValueKey);
} }
@Override @Override
@ -22,6 +21,6 @@ public class ShowPropertiesStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getTestRegExp() { protected String getTestRegExp() {
return "show properties;"; return "terminate (.*);";
} }
} }

View file

@ -1079,7 +1079,7 @@ paths:
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/KsqlResponseTable' $ref: '#/components/schemas/KsqlCommandResponse'
/api/clusters/{clusterName}/connects/{connectName}/plugins: /api/clusters/{clusterName}/connects/{connectName}/plugins:
get: get:
@ -1852,11 +1852,21 @@ components:
ksql: ksql:
type: string type: string
streamsProperties: streamsProperties:
type: string type: object
additionalProperties:
type: string
required: required:
- ksql - ksql
KsqlResponseTable: KsqlCommandResponse:
type: object
properties:
data:
$ref: '#/components/schemas/Table'
message:
type: string
Table:
type: object type: object
properties: properties:
headers: headers:
@ -1870,7 +1880,8 @@ components:
items: items:
type: string type: string
required: required:
- ksql - headers
- rows
FullConnectorInfo: FullConnectorInfo:
type: object type: object