|
@@ -60,25 +60,16 @@ public class ClusterService {
|
|
|
|
|
|
public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
|
|
|
return clustersStorage.getClusterByName(name).map(
|
|
|
- cluster -> kafkaService.createTopic(cluster, topicFormData)
|
|
|
- .flatMap(t -> kafkaService.getUpdatedCluster(cluster)
|
|
|
- .map(c -> {
|
|
|
- clustersStorage.setKafkaCluster(name, c);
|
|
|
- return t;
|
|
|
- })
|
|
|
- )
|
|
|
- ).orElse(Mono.empty()).map(clusterMapper::toTopic);
|
|
|
+ cluster -> kafkaService.createTopic(cluster, topicFormData)
|
|
|
+ .flatMap(t -> updateCluster(t, name, cluster)))
|
|
|
+ .orElse(Mono.empty()).map(clusterMapper::toTopic);
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
|
|
|
return clustersStorage.getClusterByName(clusterName).map(cl ->
|
|
|
topicFormData.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
|
|
|
- .flatMap(t -> kafkaService.getUpdatedCluster(cl)
|
|
|
- .map(c -> {
|
|
|
- clustersStorage.setKafkaCluster(clusterName, c);
|
|
|
- return t;
|
|
|
- })
|
|
|
+ .flatMap(t -> updateCluster(t, clusterName, cl)
|
|
|
.map(ResponseEntity::ok)))
|
|
|
.orElse(Mono.empty());
|
|
|
}
|
|
@@ -89,4 +80,12 @@ public class ClusterService {
|
|
|
.map(kafkaService::getConsumerGroups)
|
|
|
.orElse(Mono.empty());
|
|
|
}
|
|
|
+
|
|
|
+ private <T> Mono<T> updateCluster (T topic, String clusterName, KafkaCluster cluster) {
|
|
|
+ return kafkaService.getUpdatedCluster(cluster)
|
|
|
+ .map(c -> {
|
|
|
+ clustersStorage.setKafkaCluster(clusterName, c);
|
|
|
+ return topic;
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|