|
@@ -1,5 +1,6 @@
|
|
|
package com.provectus.kafka.ui.cluster.service;
|
|
|
|
|
|
+import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
|
|
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
|
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
|
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
|
|
@@ -169,12 +170,27 @@ public class ClusterService {
|
|
|
).orElse(Mono.empty());
|
|
|
}
|
|
|
|
|
|
+ public Mono<Void> deleteTopic(String clusterName, String topicName) {
|
|
|
+ var cluster = clustersStorage.getClusterByName(clusterName)
|
|
|
+ .orElseThrow(() -> new NotFoundException("No such cluster"));
|
|
|
+ getTopicDetails(clusterName, topicName)
|
|
|
+ .orElseThrow(() -> new NotFoundException("No such topic"));
|
|
|
+ return kafkaService.deleteTopic(cluster, topicName)
|
|
|
+ .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
|
|
|
+ }
|
|
|
+
|
|
|
private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) {
|
|
|
final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
|
|
|
clustersStorage.setKafkaCluster(clusterName, updatedCluster);
|
|
|
return updatedCluster;
|
|
|
}
|
|
|
|
|
|
+ private KafkaCluster updateCluster(String topicToDelete, String clusterName, KafkaCluster cluster) {
|
|
|
+ final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
|
|
|
+ clustersStorage.setKafkaCluster(clusterName, updatedCluster);
|
|
|
+ return updatedCluster;
|
|
|
+ }
|
|
|
+
|
|
|
public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
|