From edabfca9660ae6769b7849e62ffa5cef006d23d6 Mon Sep 17 00:00:00 2001 From: ValentinPrischepa Date: Wed, 9 Feb 2022 05:05:08 -0800 Subject: [PATCH] Implement recreating a topic * [ISSUE-998][backend] Add functionality to re-create topic in one click * [ISSUE-998][backend] Add functionality to re-create topic in one click * [ISSUE-998][backend] Add functionality to re-create topic in one click Co-authored-by: Roman Zabaluev --- README.md | 8 ++-- .../kafka/ui/controller/TopicsController.java | 7 ++++ .../kafka/ui/exception/ErrorCode.java | 3 +- .../exception/TopicRecreationException.java | 13 +++++++ .../kafka/ui/service/TopicsService.java | 39 +++++++++++++++++-- .../kafka/ui/KafkaTopicCreateTests.java | 25 ++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 27 +++++++++++++ 7 files changed, 115 insertions(+), 7 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java diff --git a/README.md b/README.md index 55deb5f4a0..7a0f30e773 100644 --- a/README.md +++ b/README.md @@ -154,9 +154,9 @@ For example, if you want to use an environment variable to set the `name` parame |Name |Description |-----------------------|------------------------------- -|`SERVER_SERVLET_CONTEXT_PATH` | URI basePath +|`SERVER_SERVLET_CONTEXT_PATH` | URI basePath |`LOGGING_LEVEL_ROOT` | Setting log level (trace, debug, info, warn, error). Default: info -|`LOGGING_LEVEL_COM_PROVECTUS` |Setting log level (trace, debug, info, warn, error). Default: debug +|`LOGGING_LEVEL_COM_PROVECTUS` |Setting log level (trace, debug, info, warn, error). Default: debug |`SERVER_PORT` |Port for the embedded server. Default: `8080` |`KAFKA_ADMIN-CLIENT-TIMEOUT` | Kafka API timeout in ms. Default: `30000` |`KAFKA_CLUSTERS_0_NAME` | Cluster name @@ -167,7 +167,7 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD` |SchemaRegistry's basic authentication password -|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry +|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry |`KAFKA_CLUSTERS_0_JMXPORT` |Open jmxPosrts of a broker |`KAFKA_CLUSTERS_0_READONLY` |Enable read-only mode. Default: false |`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION` |Disable collecting segments information. It should be true for confluent cloud. Default: false @@ -176,3 +176,5 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml` |`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication |`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication +|`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1 +|`TOPIC_RECREATE_MAXRETRIES` |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15 diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index 893c23474e..6d58f5eaf6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -39,6 +39,13 @@ public class TopicsController extends AbstractController implements TopicsApi { .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } + @Override + public Mono> recreateTopic(String clusterName, + String topicName, ServerWebExchange serverWebExchange) { + return topicsService.recreateTopic(getCluster(clusterName), topicName) + .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED)); + } + @Override public Mono> deleteTopic( String clusterName, String topicName, ServerWebExchange exchange) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 522ac2cef2..20df819ec9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -24,7 +24,8 @@ public enum ErrorCode { KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND), DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST), TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST), - INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST); + INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST), + RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java new file mode 100644 index 0000000000..0eca1fb62d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class TopicRecreationException extends CustomBaseException { + @Override + public ErrorCode getErrorCode() { + return ErrorCode.RECREATE_TOPIC_TIMEOUT; + } + + public TopicRecreationException(String topicName, int seconds) { + super(String.format("Can't create topic '%s' in %d seconds: " + + "topic deletion is still in progress", topicName, seconds)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 5a14e8c9af..f0929b1ac3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -6,6 +6,7 @@ import static java.util.stream.Collectors.toMap; import com.google.common.annotations.VisibleForTesting; import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.TopicNotFoundException; +import com.provectus.kafka.ui.exception.TopicRecreationException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.Feature; @@ -31,6 +32,7 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO; import com.provectus.kafka.ui.model.TopicsResponseDTO; import com.provectus.kafka.ui.serde.DeserializationService; import com.provectus.kafka.ui.util.JmxClusterUtil; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -39,8 +41,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.Value; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.NewPartitionReassignment; @@ -49,8 +51,11 @@ import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; @Service @RequiredArgsConstructor @@ -62,6 +67,10 @@ public class TopicsService { private final ClusterMapper clusterMapper; private final DeserializationService deserializationService; private final MetricsCache metricsCache; + @Value("${topic.recreate.maxRetries:15}") + private int recreateMaxRetries; + @Value("${topic.recreate.delay.seconds:1}") + private int recreateDelayInSeconds; public Mono getTopics(KafkaCluster cluster, Optional pageNum, @@ -182,6 +191,30 @@ public class TopicsService { .map(clusterMapper::toTopic); } + public Mono recreateTopic(KafkaCluster cluster, String topicName) { + return loadTopic(cluster, topicName) + .flatMap(t -> deleteTopic(cluster, topicName) + .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds)) + .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(), + topic.getPartitionCount(), + (short) topic.getReplicationFactor(), + topic.getTopicConfigs() + .stream() + .collect(Collectors + .toMap(InternalTopicConfig::getName, + InternalTopicConfig::getValue))) + .thenReturn(topicName)) + .retryWhen(Retry.fixedDelay(recreateMaxRetries, + Duration.ofSeconds(recreateDelayInSeconds)) + .filter(throwable -> throwable instanceof TopicExistsException) + .onRetryExhaustedThrow((a, b) -> + new TopicRecreationException(topicName, + recreateMaxRetries * recreateDelayInSeconds))) + .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic) + ) + ); + } + private Mono updateTopic(KafkaCluster cluster, String topicName, TopicUpdateDTO topicUpdate) { @@ -395,12 +428,12 @@ public class TopicsService { } @VisibleForTesting - @Value + @lombok.Value static class Pagination { ReactiveAdminClient adminClient; MetricsCache.Metrics metrics; - @Value + @lombok.Value static class Page { List topics; int totalPages; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java index 6470865bc0..a301bee396 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java @@ -57,4 +57,29 @@ public class KafkaTopicCreateTests extends AbstractBaseTest { .expectStatus() .isBadRequest(); } + + @Test + void shouldRecreateExistingTopicSuccessfully() { + TopicCreationDTO topicCreation = new TopicCreationDTO() + .replicationFactor(1) + .partitions(3) + .name(UUID.randomUUID().toString()); + + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(topicCreation) + .exchange() + .expectStatus() + .isOk(); + + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics/" + topicCreation.getName(), LOCAL) + .exchange() + .expectStatus() + .isCreated() + .expectBody() + .jsonPath("partitionCount").isEqualTo(topicCreation.getPartitions().toString()) + .jsonPath("replicationFactor").isEqualTo(topicCreation.getReplicationFactor().toString()) + .jsonPath("name").isEqualTo(topicCreation.getName()); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index b804ee332f..277c6d7152 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -355,6 +355,33 @@ paths: application/json: schema: $ref: '#/components/schemas/TopicDetails' + post: + tags: + - Topics + summary: recreateTopic + operationId: recreateTopic + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + responses: + 201: + description: Created + content: + application/json: + schema: + $ref: '#/components/schemas/Topic' + 404: + description: Not found + 408: + description: Topic recreation timeout patch: tags: - Topics