Browse Source

create topic

Zhenya Taran 5 years ago
parent
commit
758a87bbbb

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.ResponseEntity;
@@ -17,6 +18,7 @@ import java.util.stream.Collectors;
 public class ClusterService {
 
     private final ClustersStorage clustersStorage;
+    private final KafkaService kafkaService;
 
     public Mono<ResponseEntity<Flux<Cluster>>> getClusters() {
         List<Cluster> clusters = clustersStorage.getKafkaClusters()
@@ -46,4 +48,9 @@ public class ClusterService {
         KafkaCluster cluster = clustersStorage.getClusterById(clusterId);
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopicConfigsMap().get(topicName))));
     }
+
+    public Mono<ResponseEntity<Void>> createTopic(String clusterId, Mono<TopicFormData> topicFormData) {
+        KafkaCluster cluster = clustersStorage.getClusterById(clusterId);
+        return kafkaService.createTopic(cluster, topicFormData);
+    }
 }

+ 36 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -8,8 +8,11 @@ import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.*;
 import org.apache.kafka.common.config.ConfigResource;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 import java.util.*;
 
@@ -217,4 +220,37 @@ public class KafkaService {
 
         kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
     }
+
+    @SneakyThrows
+    public Mono<ResponseEntity<Void>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+        return topicFormData.flatMap(
+                topicData -> {
+                    AdminClient adminClient = cluster.getAdminClient();
+                    NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
+                    newTopic.configs(topicData.getConfigs());
+
+                    createTopic(adminClient, newTopic);
+
+                    DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
+                    Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
+                    var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
+                    var topicDescription = getTopicDescription(entry);
+                    if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
+
+                    Topic topic = collectTopicData(cluster, topicDescription);
+                    cluster.getTopics().add(topic);
+                    return Mono.just(new ResponseEntity<>(HttpStatus.CREATED));
+                }
+        );
+    }
+
+    @SneakyThrows
+    private void createTopic(AdminClient adminClient, NewTopic newTopic) {
+        adminClient.createTopics(Collections.singletonList(newTopic))
+                .values()
+                .values()
+                .iterator()
+                .next()
+                .get();
+    }
 }

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -10,6 +10,8 @@ import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import javax.validation.Valid;
+
 @RestController
 @RequiredArgsConstructor
 public class MetricsRestController implements ClustersApi {
@@ -40,4 +42,10 @@ public class MetricsRestController implements ClustersApi {
     public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
         return clusterService.getTopicConfigs(clusterId, topicName);
     }
+
+    @Override
+    public Mono<ResponseEntity<Void>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
+        return clusterService.createTopic(clusterId, topicFormData);
+    }
+
 }

+ 34 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -71,6 +71,25 @@ paths:
                 type: array
                 items:
                   $ref: '#/components/schemas/Topic'
+    post:
+      tags:
+        - /clusters
+      summary: createTopic
+      operationId: createTopic
+      parameters:
+        - name: clusterId
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/TopicFormData'
+      responses:
+        201:
+          description: Created
 
   /clusters/{clusterId}/topics/{topicName}:
     get:
@@ -262,4 +281,18 @@ components:
         value:
           type: string
         defaultValue:
-          type: string
+          type: string
+
+    TopicFormData:
+      type: object
+      properties:
+        name:
+          type: string
+        partitions:
+          type: integer
+        replicationFactor:
+          type: integer
+        configs:
+          type: object
+          additionalProperties:
+            type: string