Просмотр исходного кода

onTopicXXX methods refactored to use only cluster name. This is important because cluster instance send as parameter can be outdated in case of simultaneous updates (#1012)

Ilya Kuramshin 3 лет назад
Родитель
Сommit
83fba5aca3

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -50,7 +50,8 @@ public class ClustersStorage {
     return kafkaCluster;
   }
 
-  public void onTopicDeleted(KafkaCluster cluster, String topicToDelete) {
+  public void onTopicDeleted(String clusterName, String topicToDelete) {
+    var cluster = kafkaClusters.get(clusterName);
     var topics = Optional.ofNullable(cluster.getTopics())
         .map(HashMap::new)
         .orElseGet(HashMap::new);
@@ -59,7 +60,8 @@ public class ClustersStorage {
     setKafkaCluster(cluster.getName(), updatedCluster);
   }
 
-  public void onTopicUpdated(KafkaCluster cluster, InternalTopic updatedTopic) {
+  public void onTopicUpdated(String clusterName, InternalTopic updatedTopic) {
+    var cluster = kafkaClusters.get(clusterName);
     var topics = Optional.ofNullable(cluster.getTopics())
         .map(HashMap::new)
         .orElseGet(HashMap::new);

+ 5 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -163,7 +163,7 @@ public class TopicsService {
   public Mono<TopicDTO> createTopic(
       KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
     return adminClientService.get(cluster).flatMap(ac -> createTopic(ac, topicCreation))
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster, t))
+        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t))
         .map(clusterMapper::toTopic);
   }
 
@@ -200,7 +200,7 @@ public class TopicsService {
                                     Mono<TopicUpdateDTO> topicUpdate) {
     return topicUpdate
         .flatMap(t -> updateTopic(cl, topicName, t))
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cl, t))
+        .doOnNext(t -> clustersStorage.onTopicUpdated(cl.getName(), t))
         .map(clusterMapper::toTopic);
   }
 
@@ -253,7 +253,7 @@ public class TopicsService {
               getPartitionsReassignments(cluster, topicName,
                   replicationFactorChange));
         })
-        .doOnNext(topic -> clustersStorage.onTopicUpdated(cluster, topic))
+        .doOnNext(topic -> clustersStorage.onTopicUpdated(cluster.getName(), topic))
         .map(t -> new ReplicationFactorChangeResponseDTO()
             .topicName(t.getName())
             .totalReplicationFactor(t.getReplicationFactor()));
@@ -386,7 +386,7 @@ public class TopicsService {
           return ac.createPartitions(newPartitionsMap)
               .then(getUpdatedTopic(ac, topicName));
         })
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster, t))
+        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t))
         .map(t -> new PartitionsIncreaseResponseDTO()
             .topicName(t.getName())
             .totalPartitionsCount(t.getPartitionCount()));
@@ -425,7 +425,7 @@ public class TopicsService {
         .orElseThrow(TopicNotFoundException::new);
     if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) {
       return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
-          .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster, topicName));
+          .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster.getName(), topicName));
     } else {
       return Mono.error(new ValidationException("Topic deletion restricted"));
     }