added partition parameter

This commit is contained in:
Ramazan Yapparov 2021-03-12 14:02:44 +03:00
parent 4694ccd1bf
commit 8585c86f96
3 changed files with 11 additions and 4 deletions

View file

@ -197,14 +197,15 @@ public class ClusterService {
.orElse(Flux.empty()); .orElse(Flux.empty());
} }
public Mono<Void> deleteTopicMessages(String clusterName, String topicName) { public Mono<Void> deleteTopicMessages(String clusterName, String topicName, Optional<Integer> partition) {
var cluster = clustersStorage.getClusterByName(clusterName) var cluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow(() -> new NotFoundException("No such cluster")); .orElseThrow(() -> new NotFoundException("No such cluster"));
var partitions = getTopicDetails(clusterName, topicName) var partitions = getTopicDetails(clusterName, topicName)
.orElseThrow(() -> new NotFoundException("No such topic")) .orElseThrow(() -> new NotFoundException("No such topic"))
.getPartitions().stream() .getPartitions().stream()
.map(Partition::getPartition) .map(Partition::getPartition)
.map(partition -> new TopicPartition(topicName, partition)) .filter(p -> partition.isEmpty() || partition.get().equals(p))
.map(p-> new TopicPartition(topicName, p))
.collect(Collectors.toList()); .collect(Collectors.toList());
return consumingService.loadOffsets(cluster, partitions) return consumingService.loadOffsets(cluster, partitions)
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)); .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets));

View file

@ -18,6 +18,7 @@ import reactor.core.publisher.Mono;
import javax.validation.Valid; import javax.validation.Valid;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
@RestController @RestController
@ -85,8 +86,8 @@ public class MetricsRestController implements ApiClustersApi {
} }
@Override @Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, ServerWebExchange exchange) { public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, @Valid Integer partition, ServerWebExchange exchange) {
return clusterService.deleteTopicMessages(clusterName, topicName).map(ResponseEntity::ok); return clusterService.deleteTopicMessages(clusterName, topicName, Optional.ofNullable(partition)).map(ResponseEntity::ok);
} }
@Override @Override

View file

@ -325,6 +325,11 @@ paths:
required: true required: true
schema: schema:
type: string type: string
- name: partition
in: query
required: false
schema:
type: integer
responses: responses:
200: 200:
description: OK description: OK