diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index dbfdcd223c..ca38ff6185 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.cluster.service; +import com.provectus.kafka.ui.cluster.exception.NotFoundException; import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.ConsumerPosition; @@ -169,12 +170,27 @@ public class ClusterService { ).orElse(Mono.empty()); } + public Mono deleteTopic(String clusterName, String topicName) { + var cluster = clustersStorage.getClusterByName(clusterName) + .orElseThrow(() -> new NotFoundException("No such cluster")); + getTopicDetails(clusterName, topicName) + .orElseThrow(() -> new NotFoundException("No such topic")); + return kafkaService.deleteTopic(cluster, topicName) + .doOnNext(t -> updateCluster(topicName, clusterName, cluster)); + } + private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) { final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic); clustersStorage.setKafkaCluster(clusterName, updatedCluster); return updatedCluster; } + private KafkaCluster updateCluster(String topicToDelete, String clusterName, KafkaCluster cluster) { + final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete); + clustersStorage.setKafkaCluster(clusterName, updatedCluster); + return updatedCluster; + } + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) { return clustersStorage.getClusterByName(clusterName) .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit)) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index d26afd07eb..5c59b282e6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -56,6 +56,12 @@ public class KafkaService { return cluster.toBuilder().topics(topics).build(); } + public KafkaCluster getUpdatedCluster(KafkaCluster cluster, String topicToDelete) { + final Map topics = new HashMap<>(cluster.getTopics()); + topics.remove(topicToDelete); + return cluster.toBuilder().topics(topics).build(); + } + @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { return getOrCreateAdminClient(cluster) @@ -184,6 +190,13 @@ public class KafkaService { return getOrCreateAdminClient(cluster).flatMap(ac -> createTopic(ac.getAdminClient(), topicFormData)); } + public Mono deleteTopic(KafkaCluster cluster, String topicName) { + return getOrCreateAdminClient(cluster) + .map(ExtendedAdminClient::getAdminClient) + .map(adminClient -> adminClient.deleteTopics(List.of(topicName))) + .then(); + } + @SneakyThrows public Mono createTopic(AdminClient adminClient, Mono topicFormData) { return topicFormData.flatMap( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 7b150c8f43..a2cacbba8e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -158,6 +158,11 @@ public class MetricsRestController implements ApiClustersApi { return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok); } + @Override + public Mono> deleteTopic(String clusterName, String topicName, ServerWebExchange exchange) { + return clusterService.deleteTopic(clusterName, topicName).map(ResponseEntity::ok); + } + @Override public Mono> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) { return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 85e192fac0..635f5b1693 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -215,6 +215,27 @@ paths: application/json: schema: $ref: '#/components/schemas/Topic' + delete: + tags: + - /api/clusters + summary: deleteTopic + operationId: deleteTopic + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + 404: + description: Not found /api/clusters/{clusterName}/topics/{topicName}/config: get: