浏览代码

Clone topic functionality: backend (#1631)

* [ISSUE-1013]Clone topic functionality

* [ISSUE-1013]Clone topic functionality

* [ISSUE-1013]Clone topic functionality

* [ISSUE-1013]Clone topic functionality

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
ValentinPrischepa 3 年之前
父节点
当前提交
ed1e2bd405

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -46,6 +46,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
   }
 
+  @Override
+  public Mono<ResponseEntity<TopicDTO>> cloneTopic(
+      String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
+    return topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
+        .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
+  }
+
   @Override
   public Mono<ResponseEntity<Void>> deleteTopic(
       String clusterName, String topicName, ServerWebExchange exchange) {

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -428,6 +428,18 @@ public class TopicsService {
         .getTopicSchema(topicName);
   }
 
+  public Mono<TopicDTO> cloneTopic(
+      KafkaCluster cluster, String topicName, String newTopicName) {
+    return loadTopic(cluster, topicName).flatMap(topic ->
+        adminClientService.get(cluster).flatMap(ac -> ac.createTopic(newTopicName,
+            topic.getPartitionCount(),
+            (short) topic.getReplicationFactor(),
+            topic.getTopicConfigs()
+                .stream()
+                .collect(Collectors.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue)))
+        ).thenReturn(newTopicName).flatMap(a -> loadTopic(cluster, newTopicName)).map(clusterMapper::toTopic));
+  }
+
   @VisibleForTesting
   @lombok.Value
   static class Pagination {

+ 28 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java

@@ -76,4 +76,32 @@ public class KafkaTopicCreateTests extends AbstractIntegrationTest {
             .jsonPath("replicationFactor").isEqualTo(topicCreation.getReplicationFactor().toString())
             .jsonPath("name").isEqualTo(topicCreation.getName());
   }
+
+  @Test
+  void shouldCloneExistingTopicSuccessfully() {
+    TopicCreationDTO topicCreation = new TopicCreationDTO()
+        .replicationFactor(1)
+        .partitions(3)
+        .name(UUID.randomUUID().toString());
+    String clonedTopicName = UUID.randomUUID().toString();
+
+    webTestClient.post()
+        .uri("/api/clusters/{clusterName}/topics", LOCAL)
+        .bodyValue(topicCreation)
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    webTestClient.post()
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/clone?newTopicName=" + clonedTopicName,
+            LOCAL, topicCreation.getName())
+        .exchange()
+        .expectStatus()
+        .isCreated()
+        .expectBody()
+        .jsonPath("partitionCount").isEqualTo(topicCreation.getPartitions().toString())
+        .jsonPath("replicationFactor").isEqualTo(topicCreation.getReplicationFactor().toString())
+        .jsonPath("name").isEqualTo(clonedTopicName);
+  }
+
 }

+ 32 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -331,6 +331,38 @@ paths:
               schema:
                 $ref: '#/components/schemas/Topic'
 
+  /api/clusters/{clusterName}/topics/{topicName}/clone:
+    post:
+      tags:
+        - Topics
+      summary: cloneTopic
+      operationId: cloneTopic
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: newTopicName
+          in: query
+          required: true
+          schema:
+            type: string
+      responses:
+        201:
+          description: Created
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Topic'
+        404:
+          description: Not found
+
   /api/clusters/{clusterName}/topics/{topicName}:
     get:
       tags: