diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java new file mode 100644 index 0000000000..96fe7e9e85 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -0,0 +1,30 @@ +package com.provectus.kafka.ui.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +@Log4j2 +public final class KsqlClient { + private final WebClient webClient; + private ObjectMapper objectMapper; + + public Mono execute(KsqlStatementStrategy ksqlStatement) { + return webClient.post() + .uri(ksqlStatement.getUri()) + .accept(new MediaType("application","vnd.ksql.v1+json")) + .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand())) + .retrieve() + .bodyToMono(String.class) + .map(ksqlStatement::serializeResponse) + .doOnError(log::error); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 2eee932958..2a1883287b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -20,6 +20,7 @@ public class ClustersProperties { String bootstrapServers; String zookeeper; String schemaRegistry; + String ksqldbServer; String schemaNameTemplate = "%s-value"; String protobufFile; String protobufMessageName; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java new file mode 100644 index 0000000000..8017aaaa24 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.KsqlApi; +import com.provectus.kafka.ui.model.KsqlCommand; + +import com.provectus.kafka.ui.service.KsqlService; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +@Log4j2 +public class KsqlController implements KsqlApi { + private final KsqlService ksqlService; + + @Override + public Mono> executeKsqlCommand(String clusterName, + Mono ksqlCommand, + ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(ksqlService.getListStreams(clusterName, ksqlCommand))); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 4517507764..f50597d710 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -17,7 +17,8 @@ public enum ErrorCode { CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND), TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND), SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND), - CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND); + CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND), + KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java new file mode 100644 index 0000000000..c896ab0b98 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class KsqlDbNotFoundException extends CustomBaseException { + + public KsqlDbNotFoundException() { + super("KSQL DB not found"); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.KSQLDB_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 3c9e914a66..b396a63bcb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -14,6 +14,7 @@ public class KafkaCluster { private final Integer jmxPort; private final String bootstrapServers; private final String zookeeper; + private final String ksqldbServer; private final String schemaRegistry; private final List kafkaConnect; private final String schemaNameTemplate; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java new file mode 100644 index 0000000000..dda8a78fad --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -0,0 +1,44 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.client.KsqlClient; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; +import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.KsqlCommand; +import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import java.util.List; + +@Service +@RequiredArgsConstructor +public class KsqlService { + private final KsqlClient ksqlClient; + private final ClustersStorage clustersStorage; + private final List commandParamsStrategies; + + public Mono getListStreams(String name, Mono ksqlCommand) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(name)) + .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) + .map(KafkaCluster::getKsqldbServer) + .switchIfEmpty(Mono.error(KsqlDbNotFoundException::new)) + .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand) + .map(statement -> statement.host(host)) + ) + .flatMap(statement -> ksqlClient.execute(statement)); + } + + private Mono getStatementStrategyForKsqlCommand(Mono ksqlCommand) { + return ksqlCommand + .map(command -> commandParamsStrategies.stream() + .filter(s -> s.test(command.getKsql())) + .map(s -> s.ksqlCommand(command)) + .findFirst()) + .flatMap(Mono::justOrEmpty) +// TODO: how to handle not parsed statements? + .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql"))); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java new file mode 100644 index 0000000000..880f981223 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java @@ -0,0 +1,33 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.provectus.kafka.ui.model.KsqlCommand; + +public abstract class KsqlStatementStrategy { + protected String host = null; + protected KsqlCommand ksqlCommand = null; + + public String getUri() { + if (this.host != null) { return this.host + this.getRequestPath(); } + return null; + } + + public KsqlStatementStrategy host(String host) { + this.host = host; + return this; + } + + public KsqlCommand getKsqlCommand() { + return ksqlCommand; + } + + public KsqlStatementStrategy ksqlCommand(KsqlCommand ksqlCommand) { + this.ksqlCommand = ksqlCommand; + return this; + } + + public abstract Object serializeResponse(String response); + + public abstract boolean test(String sql); + + protected abstract String getRequestPath(); +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java new file mode 100644 index 0000000000..b549fb12b1 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java @@ -0,0 +1,29 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; +import lombok.SneakyThrows; +import org.springframework.stereotype.Component; + + +@Component +public class ListStreamsStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + + @SneakyThrows + @Override + public Object serializeResponse(String response) { + JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray(); + return jsonArray.get(0).getAsJsonObject().get("streams").getAsJsonArray().toString(); + } + + @Override + public boolean test(String sql) { + return sql.trim().toLowerCase().matches("list streams;"); + } + + @Override + protected String getRequestPath() { + return requestPath; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java new file mode 100644 index 0000000000..74f0978fdb --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.strategy.ksqlStatement; + +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; +import org.springframework.stereotype.Component; + +@Component +public class ListTopicsStrategy extends KsqlStatementStrategy { + private final String requestPath = "/ksql"; + + @Override + public Object serializeResponse(String response) { + JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray(); + return jsonArray.get(0).getAsJsonObject().get("topics").getAsJsonArray().toString(); + } + + @Override + public boolean test(String sql) { + return sql.trim().toLowerCase().matches("list topics;"); + } + + @Override + protected String getRequestPath() { + return requestPath; + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e04566b7ed..41ae457b05 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1056,6 +1056,31 @@ paths: 200: description: OK + /api/clusters/{clusterName}/ksql: + post: + tags: + - Ksql + summary: executeKsqlCommand + operationId: executeKsqlCommand + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KsqlCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + /api/clusters/{clusterName}/connects/{connectName}/plugins: get: tags: @@ -1821,6 +1846,16 @@ components: items: $ref: '#/components/schemas/ConnectorPluginConfig' + KsqlCommand: + type: object + properties: + ksql: + type: string + streamsProperties: + type: string + required: + - ksql + FullConnectorInfo: type: object properties: