Browse Source

changed collecttopicdata method to correctly

Roman Nedzvetskiy 5 years ago
parent
commit
dad2b5ec43

+ 55 - 60
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -135,70 +135,65 @@ public class KafkaService {
     }
 
     private Mono<Topic> collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
-        return Mono.just(topicDescription).map(td -> {
-            var topic = new Topic();
-            topic.setInternal(td.isInternal());
-            topic.setName(td.name());
-
-            int inSyncReplicasCount = 0, replicasCount = 0;
-            List<Partition> partitions = new ArrayList<>();
-
-            int urpCount = 0;
-            for (TopicPartitionInfo partition : td.partitions()) {
-                var partitionDto = new Partition();
-                partitionDto.setLeader(partition.leader().id());
-                partitionDto.setPartition(partition.partition());
-                List<Replica> replicas = new ArrayList<>();
-
-                boolean isUrp = false;
-                for (Node replicaNode : partition.replicas()) {
-                    var replica = new Replica();
-                    replica.setBroker(replicaNode.id());
-                    replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
-                    replica.setInSync(partition.isr().contains(replicaNode));
-                    if (!replica.getInSync()) {
-                        isUrp = true;
-                    }
-                    replicas.add(replica);
-
-                    inSyncReplicasCount += partition.isr().size();
-                    replicasCount += partition.replicas().size();
-                }
-                if (isUrp) {
-                    urpCount++;
+        var topic = new Topic();
+        topic.setInternal(topicDescription.isInternal());
+        topic.setName(topicDescription.name());
+
+        int inSyncReplicasCount = 0, replicasCount = 0;
+        List<Partition> partitions = new ArrayList<>();
+
+        int urpCount = 0;
+        for (TopicPartitionInfo partition : topicDescription.partitions()) {
+            var partitionDto = new Partition();
+            partitionDto.setLeader(partition.leader().id());
+            partitionDto.setPartition(partition.partition());
+            List<Replica> replicas = new ArrayList<>();
+
+            boolean isUrp = false;
+            for (Node replicaNode : partition.replicas()) {
+                var replica = new Replica();
+                replica.setBroker(replicaNode.id());
+                replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
+                replica.setInSync(partition.isr().contains(replicaNode));
+                if (!replica.getInSync()) {
+                    isUrp = true;
                 }
-                partitionDto.setReplicas(replicas);
-                partitions.add(partitionDto);
+                replicas.add(replica);
 
-                if (partition.leader() != null) {
-                    kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
-                } else {
-                    kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
-                }
+                inSyncReplicasCount += partition.isr().size();
+                replicasCount += partition.replicas().size();
+            }
+            if (isUrp) {
+                urpCount++;
             }
+            partitionDto.setReplicas(replicas);
+            partitions.add(partitionDto);
 
-            kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
-            kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
-                    kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
-            kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
-                    kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
-            kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
-                    kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
-
-            topic.setPartitions(partitions);
-            TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(td.name());
-            topicDetails.setReplicas(replicasCount);
-            topicDetails.setPartitionCount(td.partitions().size());
-            topicDetails.setInSyncReplicas(inSyncReplicasCount);
-            topicDetails.setReplicationFactor(td.partitions().size() > 0
-                    ? td.partitions().get(0).replicas().size()
-                    : null);
-            topicDetails.setUnderReplicatedPartitions(urpCount);
-            return topic;
-        }).flatMap(topic -> {
-                    loadTopicConfig(kafkaCluster, topic.getName());
-                    return Mono.just(topic);
-                });
+            if (partition.leader() != null) {
+                kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
+            } else {
+                kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
+            }
+        }
+
+        kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
+        kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
+                kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
+        kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
+                kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
+        kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
+                kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
+
+        topic.setPartitions(partitions);
+        TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(topicDescription.name());
+        topicDetails.setReplicas(replicasCount);
+        topicDetails.setPartitionCount(topicDescription.partitions().size());
+        topicDetails.setInSyncReplicas(inSyncReplicasCount);
+        topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
+                ? topicDescription.partitions().get(0).replicas().size()
+                : null);
+        topicDetails.setUnderReplicatedPartitions(urpCount);
+        return loadTopicConfig(kafkaCluster, topic.getName()).map(l -> topic);
     }
 
     private Mono<TopicDescription> getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {