瀏覽代碼

ISSUE-2083: New ksql endpoints added to replace deprecated (#2089)

* New Ksql apis created to replace deprecated
Ilya Kuramshin 3 年之前
父節點
當前提交
c0b3ca3bc6

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -6,6 +6,8 @@ import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
 import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
 import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
 import com.provectus.kafka.ui.model.KsqlResponseDTO;
+import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
+import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
 import com.provectus.kafka.ui.service.KsqlService;
 import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
@@ -64,4 +66,16 @@ public class KsqlController extends AbstractController implements KsqlApi {
                         .columnNames(table.getColumnNames())
                         .values((List<List<Object>>) ((List<?>) (table.getValues())))))));
   }
+
+  @Override
+  public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
+                                                                         ServerWebExchange exchange) {
+    return Mono.just(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
+                                                                        ServerWebExchange exchange) {
+    return Mono.just(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))));
+  }
 }

+ 4 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -42,6 +42,10 @@ public class KsqlApiClient {
     String header;
     List<String> columnNames;
     List<List<JsonNode>> values;
+
+    public Optional<JsonNode> getColumnValue(List<JsonNode> row, String column) {
+      return Optional.ofNullable(row.get(columnNames.indexOf(column)));
+    }
   }
 
   @Value

+ 50 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java

@@ -1,17 +1,24 @@
 package com.provectus.kafka.ui.service.ksql;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.provectus.kafka.ui.exception.KsqlApiException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
+import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient.KsqlResponseTable;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 
+@Slf4j
 @Service
 public class KsqlServiceV2 {
 
@@ -45,4 +52,47 @@ public class KsqlServiceV2 {
         .execute(cmd.ksql, cmd.streamProperties);
   }
 
+  public Flux<KsqlTableDescriptionDTO> listTables(KafkaCluster cluster) {
+    return new KsqlApiClient(cluster)
+        .execute("LIST TABLES;", Map.of())
+        .flatMap(resp -> {
+          if (!resp.getHeader().equals("Tables")) {
+            log.error("Unexpected result header: {}", resp.getHeader());
+            log.debug("Unexpected result {}", resp);
+            return Flux.error(new KsqlApiException("Error retrieving tables list"));
+          }
+          return Flux.fromIterable(resp.getValues()
+              .stream()
+              .map(row ->
+                  new KsqlTableDescriptionDTO()
+                      .name(resp.getColumnValue(row, "name").map(JsonNode::asText).orElse(null))
+                      .topic(resp.getColumnValue(row, "topic").map(JsonNode::asText).orElse(null))
+                      .keyFormat(resp.getColumnValue(row, "keyFormat").map(JsonNode::asText).orElse(null))
+                      .valueFormat(resp.getColumnValue(row, "valueFormat").map(JsonNode::asText).orElse(null))
+                      .isWindowed(resp.getColumnValue(row, "isWindowed").map(JsonNode::asBoolean).orElse(null)))
+              .collect(Collectors.toList()));
+        });
+  }
+
+  public Flux<KsqlStreamDescriptionDTO> listStreams(KafkaCluster cluster) {
+    return new KsqlApiClient(cluster)
+        .execute("LIST STREAMS;", Map.of())
+        .flatMap(resp -> {
+          if (!resp.getHeader().equals("Streams")) {
+            log.error("Unexpected result header: {}", resp.getHeader());
+            log.debug("Unexpected result {}", resp);
+            return Flux.error(new KsqlApiException("Error retrieving streams list"));
+          }
+          return Flux.fromIterable(resp.getValues()
+              .stream()
+              .map(row ->
+                  new KsqlStreamDescriptionDTO()
+                      .name(resp.getColumnValue(row, "name").map(JsonNode::asText).orElse(null))
+                      .topic(resp.getColumnValue(row, "topic").map(JsonNode::asText).orElse(null))
+                      .keyFormat(resp.getColumnValue(row, "keyFormat").map(JsonNode::asText).orElse(null))
+                      .valueFormat(resp.getColumnValue(row, "valueFormat").map(JsonNode::asText).orElse(null)))
+              .collect(Collectors.toList()));
+        });
+  }
+
 }

+ 108 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java

@@ -0,0 +1,108 @@
+package com.provectus.kafka.ui.service.ksql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.container.KsqlDbContainer;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
+import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.utility.DockerImageName;
+
+class KsqlServiceV2Test extends AbstractIntegrationTest {
+
+  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
+      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
+      .withKafka(kafka);
+
+  private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
+  private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();
+
+  @BeforeAll
+  static void init() {
+    KSQL_DB.start();
+  }
+
+  @AfterAll
+  static void cleanup() {
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build());
+
+    TABLES_TO_DELETE.forEach(t ->
+        client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of())
+            .blockLast());
+
+    STREAMS_TO_DELETE.forEach(s ->
+        client.execute(String.format("DROP STREAM IF EXISTS %s DELETE TOPIC;", s), Map.of())
+            .blockLast());
+
+    KSQL_DB.stop();
+  }
+
+  private final KsqlServiceV2 ksqlService = new KsqlServiceV2();
+
+  @Test
+  void listStreamsReturnsAllKsqlStreams() {
+    var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build();
+    var streamName = "stream_" + System.currentTimeMillis();
+    STREAMS_TO_DELETE.add(streamName);
+
+    new KsqlApiClient(cluster)
+        .execute(
+            String.format("CREATE STREAM %s ( "
+                + "  c1 BIGINT KEY, "
+                + "  c2 VARCHAR "
+                + " ) WITH ( "
+                + "  KAFKA_TOPIC = '%s_topic', "
+                + "  PARTITIONS = 1, "
+                + "  VALUE_FORMAT = 'JSON' "
+                + " );", streamName, streamName),
+            Map.of())
+        .blockLast();
+
+    var streams = ksqlService.listStreams(cluster).collectList().block();
+    assertThat(streams).contains(
+        new KsqlStreamDescriptionDTO()
+            .name(streamName.toUpperCase())
+            .topic(streamName + "_topic")
+            .keyFormat("KAFKA")
+            .valueFormat("JSON")
+    );
+  }
+
+  @Test
+  void listTablesReturnsAllKsqlTables() {
+    var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build();
+    var tableName = "table_" + System.currentTimeMillis();
+    TABLES_TO_DELETE.add(tableName);
+
+    new KsqlApiClient(cluster)
+        .execute(
+            String.format("CREATE TABLE %s ( "
+                + "   c1 BIGINT PRIMARY KEY, "
+                + "   c2 VARCHAR "
+                + " ) WITH ( "
+                + "  KAFKA_TOPIC = '%s_topic', "
+                + "  PARTITIONS = 1, "
+                + "  VALUE_FORMAT = 'JSON' "
+                + " );", tableName, tableName),
+            Map.of())
+        .blockLast();
+
+    var tables = ksqlService.listTables(cluster).collectList().block();
+    assertThat(tables).contains(
+        new KsqlTableDescriptionDTO()
+            .name(tableName.toUpperCase())
+            .topic(tableName + "_topic")
+            .keyFormat("KAFKA")
+            .valueFormat("JSON")
+            .isWindowed(false)
+    );
+  }
+
+}

+ 70 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1514,6 +1514,50 @@ paths:
               schema:
                 $ref: '#/components/schemas/KsqlCommandV2Response'
 
+  /api/clusters/{clusterName}/ksql/tables:
+    get:
+      tags:
+        - Ksql
+      summary: listTables
+      operationId: listTables
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/KsqlTableDescription'
+
+  /api/clusters/{clusterName}/ksql/streams:
+    get:
+      tags:
+        - Ksql
+      summary: listStreams
+      operationId: listStreams
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/KsqlStreamDescription'
+
   /api/clusters/{clusterName}/ksql/response:
     get:
       tags:
@@ -2690,6 +2734,32 @@ components:
       required:
         - pipeId
 
+    KsqlTableDescription:
+      type: object
+      properties:
+        name:
+          type: string
+        topic:
+          type: string
+        keyFormat:
+          type: string
+        valueFormat:
+          type: string
+        isWindowed:
+          type: boolean
+
+    KsqlStreamDescription:
+      type: object
+      properties:
+        name:
+          type: string
+        topic:
+          type: string
+        keyFormat:
+          type: string
+        valueFormat:
+          type: string
+
     KsqlCommandResponse:
       type: object
       properties: