Forráskód Böngészése

Implement recreating a topic

* [ISSUE-998][backend] Add functionality to re-create topic in one click

* [ISSUE-998][backend] Add functionality to re-create topic in one click

* [ISSUE-998][backend] Add functionality to re-create topic in one click

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
ValentinPrischepa 3 éve
szülő
commit
edabfca966

+ 5 - 3
README.md

@@ -154,9 +154,9 @@ For example, if you want to use an environment variable to set the `name` parame
 
 |Name               	|Description
 |-----------------------|-------------------------------
-|`SERVER_SERVLET_CONTEXT_PATH`  | URI basePath
+|`SERVER_SERVLET_CONTEXT_PATH` | URI basePath
 |`LOGGING_LEVEL_ROOT`        	| Setting log level (trace, debug, info, warn, error). Default: info
-|`LOGGING_LEVEL_COM_PROVECTUS`  |Setting log level (trace, debug, info, warn, error). Default: debug
+|`LOGGING_LEVEL_COM_PROVECTUS` |Setting log level (trace, debug, info, warn, error). Default: debug
 |`SERVER_PORT` |Port for the embedded server. Default: `8080`
 |`KAFKA_ADMIN-CLIENT-TIMEOUT` | Kafka API timeout in ms. Default: `30000`
 |`KAFKA_CLUSTERS_0_NAME` | Cluster name
@@ -167,7 +167,7 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD`   	|SchemaRegistry's basic authentication password
-|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE`  |How keys are saved to schemaRegistry
+|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry
 |`KAFKA_CLUSTERS_0_JMXPORT`        	|Open jmxPosrts of a broker
 |`KAFKA_CLUSTERS_0_READONLY`        	|Enable read-only mode. Default: false
 |`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION`        	|Disable collecting segments information. It should be true for confluent cloud. Default: false
@@ -176,3 +176,5 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
 |`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication
 |`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication
+|`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1
+|`TOPIC_RECREATE_MAXRETRIES`  |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -39,6 +39,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
   }
 
+  @Override
+  public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
+                                                      String topicName, ServerWebExchange serverWebExchange) {
+    return topicsService.recreateTopic(getCluster(clusterName), topicName)
+            .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
+  }
+
   @Override
   public Mono<ResponseEntity<Void>> deleteTopic(
       String clusterName, String topicName, ServerWebExchange exchange) {

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -24,7 +24,8 @@ public enum ErrorCode {
   KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
   DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
   TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
-  INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST);
+  INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
+  RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT);
 
   static {
     // codes uniqueness check

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class TopicRecreationException extends CustomBaseException {
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.RECREATE_TOPIC_TIMEOUT;
+  }
+
+  public TopicRecreationException(String topicName, int seconds) {
+    super(String.format("Can't create topic '%s' in %d seconds: "
+                + "topic deletion is still in progress", topicName, seconds));
+  }
+}

+ 36 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -6,6 +6,7 @@ import static java.util.stream.Collectors.toMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
+import com.provectus.kafka.ui.exception.TopicRecreationException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.Feature;
@@ -31,6 +32,7 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -39,8 +41,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -49,8 +51,11 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 @Service
 @RequiredArgsConstructor
@@ -62,6 +67,10 @@ public class TopicsService {
   private final ClusterMapper clusterMapper;
   private final DeserializationService deserializationService;
   private final MetricsCache metricsCache;
+  @Value("${topic.recreate.maxRetries:15}")
+  private int recreateMaxRetries;
+  @Value("${topic.recreate.delay.seconds:1}")
+  private int recreateDelayInSeconds;
 
   public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
                                            Optional<Integer> pageNum,
@@ -182,6 +191,30 @@ public class TopicsService {
         .map(clusterMapper::toTopic);
   }
 
+  public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
+    return loadTopic(cluster, topicName)
+            .flatMap(t -> deleteTopic(cluster, topicName)
+                    .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds))
+                    .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(),
+                                            topic.getPartitionCount(),
+                                            (short) topic.getReplicationFactor(),
+                                            topic.getTopicConfigs()
+                                                    .stream()
+                                                    .collect(Collectors
+                                                            .toMap(InternalTopicConfig::getName,
+                                                                    InternalTopicConfig::getValue)))
+                                    .thenReturn(topicName))
+                            .retryWhen(Retry.fixedDelay(recreateMaxRetries,
+                                            Duration.ofSeconds(recreateDelayInSeconds))
+                                    .filter(throwable -> throwable instanceof TopicExistsException)
+                                    .onRetryExhaustedThrow((a, b) ->
+                                            new TopicRecreationException(topicName,
+                                                    recreateMaxRetries * recreateDelayInSeconds)))
+                            .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic)
+                    )
+            );
+  }
+
   private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
                                          String topicName,
                                          TopicUpdateDTO topicUpdate) {
@@ -395,12 +428,12 @@ public class TopicsService {
   }
 
   @VisibleForTesting
-  @Value
+  @lombok.Value
   static class Pagination {
     ReactiveAdminClient adminClient;
     MetricsCache.Metrics metrics;
 
-    @Value
+    @lombok.Value
     static class Page {
       List<String> topics;
       int totalPages;

+ 25 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java

@@ -57,4 +57,29 @@ public class KafkaTopicCreateTests extends AbstractBaseTest {
         .expectStatus()
         .isBadRequest();
   }
+
+  @Test
+  void shouldRecreateExistingTopicSuccessfully() {
+    TopicCreationDTO topicCreation = new TopicCreationDTO()
+            .replicationFactor(1)
+            .partitions(3)
+            .name(UUID.randomUUID().toString());
+
+    webTestClient.post()
+            .uri("/api/clusters/{clusterName}/topics", LOCAL)
+            .bodyValue(topicCreation)
+            .exchange()
+            .expectStatus()
+            .isOk();
+
+    webTestClient.post()
+            .uri("/api/clusters/{clusterName}/topics/" + topicCreation.getName(), LOCAL)
+            .exchange()
+            .expectStatus()
+            .isCreated()
+            .expectBody()
+            .jsonPath("partitionCount").isEqualTo(topicCreation.getPartitions().toString())
+            .jsonPath("replicationFactor").isEqualTo(topicCreation.getReplicationFactor().toString())
+            .jsonPath("name").isEqualTo(topicCreation.getName());
+  }
 }

+ 27 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -355,6 +355,33 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/TopicDetails'
+    post:
+      tags:
+        - Topics
+      summary: recreateTopic
+      operationId: recreateTopic
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        201:
+          description: Created
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Topic'
+        404:
+          description: Not found
+        408:
+          description: Topic recreation timeout
     patch:
       tags:
         - Topics