From 8585c86f96d79e9f6758d8f7e35e812d721b7d3e Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Fri, 12 Mar 2021 14:02:44 +0300 Subject: [PATCH] added partition parameter --- .../provectus/kafka/ui/cluster/service/ClusterService.java | 5 +++-- .../com/provectus/kafka/ui/rest/MetricsRestController.java | 5 +++-- .../src/main/resources/swagger/kafka-ui-api.yaml | 5 +++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 4001fadc8a..dd3d7812a3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -197,14 +197,15 @@ public class ClusterService { .orElse(Flux.empty()); } - public Mono deleteTopicMessages(String clusterName, String topicName) { + public Mono deleteTopicMessages(String clusterName, String topicName, Optional partition) { var cluster = clustersStorage.getClusterByName(clusterName) .orElseThrow(() -> new NotFoundException("No such cluster")); var partitions = getTopicDetails(clusterName, topicName) .orElseThrow(() -> new NotFoundException("No such topic")) .getPartitions().stream() .map(Partition::getPartition) - .map(partition -> new TopicPartition(topicName, partition)) + .filter(p -> partition.isEmpty() || partition.get().equals(p)) + .map(p-> new TopicPartition(topicName, p)) .collect(Collectors.toList()); return consumingService.loadOffsets(cluster, partitions) .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index f3fb2d2b4d..db9b1b4a29 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Mono; import javax.validation.Valid; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Function; @RestController @@ -85,8 +86,8 @@ public class MetricsRestController implements ApiClustersApi { } @Override - public Mono> deleteTopicMessages(String clusterName, String topicName, ServerWebExchange exchange) { - return clusterService.deleteTopicMessages(clusterName, topicName).map(ResponseEntity::ok); + public Mono> deleteTopicMessages(String clusterName, String topicName, @Valid Integer partition, ServerWebExchange exchange) { + return clusterService.deleteTopicMessages(clusterName, topicName, Optional.ofNullable(partition)).map(ResponseEntity::ok); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 036c249bcc..0849228851 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -325,6 +325,11 @@ paths: required: true schema: type: string + - name: partition + in: query + required: false + schema: + type: integer responses: 200: description: OK