浏览代码

[issue207] feat(api, contract): ksqldb controller, client, service, statement strategies

Ilnur Farukhshin 4 年之前
父节点
当前提交
3d5bbc854a

+ 30 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -0,0 +1,30 @@
+package com.provectus.kafka.ui.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+@Log4j2
+public final class KsqlClient {
+  private final WebClient webClient;
+  private ObjectMapper objectMapper;
+
+  public Mono<Object> execute(KsqlStatementStrategy ksqlStatement) {
+    return webClient.post()
+      .uri(ksqlStatement.getUri())
+      .accept(new MediaType("application","vnd.ksql.v1+json"))
+      .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
+      .retrieve()
+      .bodyToMono(String.class)
+      .map(ksqlStatement::serializeResponse)
+      .doOnError(log::error);
+  }
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -20,6 +20,7 @@ public class ClustersProperties {
     String bootstrapServers;
     String zookeeper;
     String schemaRegistry;
+    String ksqldbServer;
     String schemaNameTemplate = "%s-value";
     String protobufFile;
     String protobufMessageName;

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.KsqlApi;
+import com.provectus.kafka.ui.model.KsqlCommand;
+
+import com.provectus.kafka.ui.service.KsqlService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class KsqlController implements KsqlApi {
+  private final KsqlService ksqlService;
+
+  @Override
+  public Mono<ResponseEntity<Object>> executeKsqlCommand(String clusterName,
+                                                               Mono<KsqlCommand> ksqlCommand,
+                                                               ServerWebExchange exchange) {
+    return Mono.just(ResponseEntity.ok(ksqlService.getListStreams(clusterName, ksqlCommand)));
+  }
+}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -17,7 +17,8 @@ public enum ErrorCode {
   CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
   TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
   SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
-  CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND);
+  CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
+  KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND);
 
   static {
     // codes uniqueness check

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlDbNotFoundException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class KsqlDbNotFoundException extends CustomBaseException {
+
+  public KsqlDbNotFoundException() {
+    super("KSQL DB not found");
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.KSQLDB_NOT_FOUND;
+  }
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -14,6 +14,7 @@ public class KafkaCluster {
   private final Integer jmxPort;
   private final String bootstrapServers;
   private final String zookeeper;
+  private final String ksqldbServer;
   private final String schemaRegistry;
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;

+ 44 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -0,0 +1,44 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.client.KsqlClient;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlCommand;
+import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+@Service
+@RequiredArgsConstructor
+public class KsqlService {
+  private final KsqlClient ksqlClient;
+  private final ClustersStorage clustersStorage;
+  private final List<KsqlStatementStrategy> commandParamsStrategies;
+
+  public Mono<Object> getListStreams(String name, Mono<KsqlCommand> ksqlCommand) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(name))
+            .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
+            .map(KafkaCluster::getKsqldbServer)
+            .switchIfEmpty(Mono.error(KsqlDbNotFoundException::new))
+            .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
+                    .map(statement -> statement.host(host))
+            )
+            .flatMap(statement -> ksqlClient.execute(statement));
+  }
+
+  private Mono<KsqlStatementStrategy> getStatementStrategyForKsqlCommand(Mono<KsqlCommand> ksqlCommand) {
+    return ksqlCommand
+            .map(command -> commandParamsStrategies.stream()
+                    .filter(s -> s.test(command.getKsql()))
+                    .map(s -> s.ksqlCommand(command))
+                    .findFirst())
+            .flatMap(Mono::justOrEmpty)
+// TODO: how to handle not parsed statements?
+            .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
+  }
+}

+ 33 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/KsqlStatementStrategy.java

@@ -0,0 +1,33 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.provectus.kafka.ui.model.KsqlCommand;
+
+public abstract class KsqlStatementStrategy {
+    protected String host = null;
+    protected KsqlCommand ksqlCommand = null;
+
+    public String getUri() {
+        if (this.host != null) { return this.host + this.getRequestPath(); }
+        return null;
+    }
+
+    public KsqlStatementStrategy host(String host) {
+        this.host = host;
+        return this;
+    }
+
+    public KsqlCommand getKsqlCommand() {
+        return ksqlCommand;
+    }
+
+    public KsqlStatementStrategy ksqlCommand(KsqlCommand ksqlCommand) {
+        this.ksqlCommand = ksqlCommand;
+        return this;
+    }
+
+    public abstract Object serializeResponse(String response);
+
+    public abstract boolean test(String sql);
+
+    protected abstract String getRequestPath();
+}

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListStreamsStrategy.java

@@ -0,0 +1,29 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+import lombok.SneakyThrows;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ListStreamsStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+
+    @SneakyThrows
+    @Override
+    public Object serializeResponse(String response) {
+        JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray();
+        return jsonArray.get(0).getAsJsonObject().get("streams").getAsJsonArray().toString();
+    }
+
+    @Override
+    public boolean test(String sql) {
+        return sql.trim().toLowerCase().matches("list streams;");
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+}

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksqlStatement/ListTopicsStrategy.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.strategy.ksqlStatement;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ListTopicsStrategy extends KsqlStatementStrategy {
+    private final String requestPath = "/ksql";
+
+    @Override
+    public Object serializeResponse(String response) {
+        JsonArray jsonArray = JsonParser.parseString(response).getAsJsonArray();
+        return jsonArray.get(0).getAsJsonObject().get("topics").getAsJsonArray().toString();
+    }
+
+    @Override
+    public boolean test(String sql) {
+        return sql.trim().toLowerCase().matches("list topics;");
+    }
+
+    @Override
+    protected String getRequestPath() {
+        return requestPath;
+    }
+}

+ 35 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1056,6 +1056,31 @@ paths:
         200:
           description: OK
 
+  /api/clusters/{clusterName}/ksql:
+    post:
+      tags:
+        - Ksql
+      summary: executeKsqlCommand
+      operationId: executeKsqlCommand
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/KsqlCommand'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: object
+
   /api/clusters/{clusterName}/connects/{connectName}/plugins:
     get:
       tags:
@@ -1821,6 +1846,16 @@ components:
           items:
             $ref: '#/components/schemas/ConnectorPluginConfig'
 
+    KsqlCommand:
+      type: object
+      properties:
+        ksql:
+          type: string
+        streamsProperties:
+          type: string
+      required:
+        - ksql
+
     FullConnectorInfo:
       type: object
       properties: