瀏覽代碼

KSQL SSE refactoring (#1479)

* sse refactoring

* checkstyle fix

* checkstyle fix

* refactor

* refactor

* api spec changes

* ReactiveAdminClient toMono fix

* fixes

* fixes

* fixes

* fixes

* small improvement

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: Damir Abdulganiev <dabdulganiev@provectus.com>
Ilya Kuramshin 3 年之前
父節點
當前提交
11c6ce25ff

+ 28 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -3,10 +3,12 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.KsqlApi;
 import com.provectus.kafka.ui.model.KsqlCommandDTO;
 import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
+import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
+import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
 import com.provectus.kafka.ui.model.KsqlResponseDTO;
 import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
 import com.provectus.kafka.ui.service.KsqlService;
-import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
+import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -24,6 +26,7 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class KsqlController extends AbstractController implements KsqlApi {
   private final KsqlService ksqlService;
+  private final KsqlServiceV2 ksqlServiceV2;
 
   @Override
   public Mono<ResponseEntity<KsqlCommandResponseDTO>> executeKsqlCommand(String clusterName,
@@ -34,25 +37,31 @@ public class KsqlController extends AbstractController implements KsqlApi {
         .map(ResponseEntity::ok);
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> executeKsql(String clusterName,
-                                                                 Mono<KsqlCommandDTO> ksqlCommand,
-                                                                 ServerWebExchange exchange) {
+  public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
+                                                                    Mono<KsqlCommandV2DTO>
+                                                                        ksqlCommand2Dto,
+                                                                    ServerWebExchange exchange) {
+    return ksqlCommand2Dto.map(dto -> {
+      var id = ksqlServiceV2.registerCommand(
+          getCluster(clusterName),
+          dto.getKsql(),
+          Optional.ofNullable(dto.getStreamsProperties()).orElse(Map.of()));
+      return new KsqlCommandV2ResponseDTO().pipeId(id);
+    }).map(ResponseEntity::ok);
+  }
+
+  @Override
+  public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
+                                                                          String pipeId,
+                                                                          ServerWebExchange exchange) {
     return Mono.just(
-        ResponseEntity.ok(
-            ksqlCommand
-                .flux()
-                .flatMap(command ->
-                    new KsqlApiClient(getCluster(clusterName))
-                        .execute(
-                            command.getKsql(),
-                            Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))
-                .map(table -> new KsqlResponseDTO()
-                    .table(
-                        new KsqlTableResponseDTO()
-                            .header(table.getHeader())
-                            .columnNames(table.getColumnNames())
-                            .values((List<List<Object>>) ((List<?>) (table.getValues())))))));
+        ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
+            .map(table -> new KsqlResponseDTO()
+                .table(
+                    new KsqlTableResponseDTO()
+                        .header(table.getHeader())
+                        .columnNames(table.getColumnNames())
+                        .values((List<List<Object>>) ((List<?>) (table.getValues())))))));
   }
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -92,13 +92,13 @@ public class ReactiveAdminClient implements Closeable {
 
   //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
   private static <T> Mono<T> toMono(KafkaFuture<T> future) {
-    return Mono.create(sink -> future.whenComplete((res, ex) -> {
+    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
       if (ex != null) {
         sink.error(ex);
       } else {
         sink.success(res);
       }
-    }));
+    })).doOnCancel(() -> future.cancel(true));
   }
 
   //---------------------------------------------------------------------------------

+ 44 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -1,22 +1,35 @@
 package com.provectus.kafka.ui.service.ksql;
 
+import static ksql.KsqlGrammarParser.DefineVariableContext;
+import static ksql.KsqlGrammarParser.PrintTopicContext;
+import static ksql.KsqlGrammarParser.SingleStatementContext;
+import static ksql.KsqlGrammarParser.UndefineVariableContext;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.TextNode;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import lombok.Builder;
 import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 
+@Slf4j
 public class KsqlApiClient {
 
+  private static final Set<Class<?>> UNSUPPORTED_STMT_TYPES = Set.of(
+      PrintTopicContext.class,
+      DefineVariableContext.class,
+      UndefineVariableContext.class
+  );
+
   @Builder
   @Value
   public static class KsqlResponseTable {
@@ -97,18 +110,40 @@ public class KsqlApiClient {
   }
 
   public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamProperties) {
-    var parsed = KsqlGrammar.parse(ksql);
-    if (parsed.getStatements().size() > 1) {
-      throw new ValidationException("Only single statement supported now");
+    var parsedStatements = KsqlGrammar.parse(ksql);
+    if (parsedStatements.isEmpty()) {
+      return errorTableFlux("Sql statement is invalid or unsupported");
+    }
+    var statements = parsedStatements.get().getStatements();
+    if (statements.size() > 1) {
+      return errorTableFlux("Only single statement supported now");
+    }
+    if (statements.size() == 0) {
+      return errorTableFlux("No valid ksql statement found");
     }
-    if (parsed.getStatements().isEmpty()) {
-      throw new ValidationException("No valid ksql statement found");
+    if (isUnsupportedStatementType(statements.get(0))) {
+      return errorTableFlux("Unsupported statement type");
     }
-    if (KsqlGrammar.isSelect(parsed.getStatements().get(0))) {
-      return executeSelect(ksql, streamProperties);
+    Flux<KsqlResponseTable> outputFlux;
+    if (KsqlGrammar.isSelect(statements.get(0))) {
+      outputFlux =  executeSelect(ksql, streamProperties);
     } else {
-      return executeStatement(ksql, streamProperties);
+      outputFlux = executeStatement(ksql, streamProperties);
     }
+    return outputFlux.onErrorResume(Exception.class,
+        e -> {
+          log.error("Unexpected error while execution ksql: {}", ksql, e);
+          return errorTableFlux("Unexpected error: " + e.getMessage());
+        });
+  }
+
+  private  Flux<KsqlResponseTable> errorTableFlux(String errorText) {
+    return Flux.just(ResponseParser.errorTableWithTextMsg(errorText));
+  }
+
+  private boolean isUnsupportedStatementType(SingleStatementContext context) {
+    var ctxClass = context.statement().getClass();
+    return UNSUPPORTED_STMT_TYPES.contains(ctxClass);
   }
 
 }

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.ksql;
 
 import com.provectus.kafka.ui.exception.ValidationException;
 import java.util.List;
+import java.util.Optional;
 import ksql.KsqlGrammarLexer;
 import ksql.KsqlGrammarParser;
 import lombok.RequiredArgsConstructor;
@@ -26,13 +27,14 @@ class KsqlGrammar {
     List<KsqlGrammarParser.SingleStatementContext> statements;
   }
 
-  static KsqlStatements parse(String ksql) {
+  // returns Empty if no valid statements found
+  static Optional<KsqlStatements> parse(String ksql) {
     var parsed = parseStatements(ksql);
     if (parsed.singleStatement().stream()
         .anyMatch(s -> s.statement().exception != null)) {
-      throw new ValidationException("Error parsing ksql statement. Check syntax!");
+      return Optional.empty();
     }
-    return new KsqlStatements(parsed.singleStatement());
+    return Optional.of(new KsqlStatements(parsed.singleStatement()));
   }
 
   static boolean isSelect(KsqlGrammarParser.SingleStatementContext statement) {

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.service.ksql;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient.KsqlResponseTable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Value;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+
+@Service
+public class KsqlServiceV2 {
+
+  @Value
+  private static class KsqlExecuteCommand {
+    KafkaCluster cluster;
+    String ksql;
+    Map<String, String> streamProperties;
+  }
+
+  private final Cache<String, KsqlExecuteCommand> registeredCommands =
+      CacheBuilder.newBuilder()
+          .expireAfterWrite(1, TimeUnit.MINUTES)
+          .build();
+
+  public String registerCommand(KafkaCluster cluster,
+                                String ksql,
+                                Map<String, String> streamProperties) {
+    String uuid = UUID.randomUUID().toString();
+    registeredCommands.put(uuid, new KsqlExecuteCommand(cluster, ksql, streamProperties));
+    return uuid;
+  }
+
+  public Flux<KsqlResponseTable> execute(String commandId) {
+    var cmd = registeredCommands.getIfPresent(commandId);
+    if (cmd == null) {
+      throw new ValidationException("No command registered with id " + commandId);
+    }
+    registeredCommands.invalidate(commandId);
+    return new KsqlApiClient(cmd.cluster)
+        .execute(cmd.ksql, cmd.streamProperties);
+  }
+
+}

+ 12 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.ksql.response;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.TextNode;
 import com.google.common.collect.Lists;
 import com.provectus.kafka.ui.exception.KsqlApiException;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
@@ -45,15 +46,23 @@ public class ResponseParser {
     return Optional.empty();
   }
 
+  public static KsqlApiClient.KsqlResponseTable errorTableWithTextMsg(String errorText) {
+    return KsqlApiClient.KsqlResponseTable.builder()
+        .header("Execution error")
+        .columnNames(List.of("message"))
+        .values(List.of(List.of(new TextNode(errorText))))
+        .build();
+  }
+
   public static KsqlApiClient.KsqlResponseTable parseErrorResponse(WebClientResponseException e) {
     try {
       var errBody = new JsonMapper().readTree(e.getResponseBodyAsString());
       return DynamicParser.parseObject("Execution error", errBody);
     } catch (Exception ex) {
-      throw new KsqlApiException(
+      return errorTableWithTextMsg(
           String.format(
               "Unparsable error response from ksqdb, status:'%s', body: '%s'",
-              e.getStatusCode(), e.getResponseBodyAsString()), e);
+              e.getStatusCode(), e.getResponseBodyAsString()));
     }
   }
 
@@ -77,7 +86,7 @@ public class ResponseParser {
       case "sourceDescription":
         return parseObjectDynamically("Source Description", jsonNode.get("sourceDescription"));
       case "queryDescription":
-        return parseArray("Queries Description", "queryDescription", jsonNode);
+        return parseObjectDynamically("Queries Description", jsonNode.get("queryDescription"));
       case "topicDescription":
         return parseObject(
             "Topic Description",
@@ -147,9 +156,6 @@ public class ResponseParser {
       tables.add(DynamicParser.parseArray("overwrittenProperties",
           jsonNode.get("overwrittenProperties")));
     }
-    if (arrayFieldNonEmpty(jsonNode, "defaultProperties")) {
-      tables.add(DynamicParser.parseArray("defaultProperties", jsonNode.get("defaultProperties")));
-    }
     return tables;
   }
 

+ 46 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1473,7 +1473,32 @@ paths:
         content:
           application/json:
             schema:
-              $ref: '#/components/schemas/KsqlCommand'
+              $ref: '#/components/schemas/KsqlCommandV2'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/KsqlCommandV2Response'
+
+  /api/clusters/{clusterName}/ksql/response:
+    get:
+      tags:
+        - Ksql
+      summary: Open SSE pipe
+      operationId: openKsqlResponsePipe
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: pipeId
+          in: query
+          required: true
+          schema:
+            type: string
       responses:
         200:
           description: OK
@@ -2612,6 +2637,26 @@ components:
       required:
         - ksql
 
+    KsqlCommandV2:
+      type: object
+      properties:
+        ksql:
+          type: string
+        streamsProperties:
+          type: object
+          additionalProperties:
+            type: string
+      required:
+        - ksql
+
+    KsqlCommandV2Response:
+      type: object
+      properties:
+        pipeId:
+          type: string
+      required:
+        - pipeId
+
     KsqlCommandResponse:
       type: object
       properties: