|
@@ -1,6 +1,7 @@
|
|
|
package com.provectus.kafka.ui.kafka;
|
|
|
|
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
|
import com.provectus.kafka.ui.model.*;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.SneakyThrows;
|
|
@@ -113,21 +114,22 @@ public class KafkaService {
|
|
|
AdminClient adminClient = kafkaCluster.getAdminClient();
|
|
|
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
|
listTopicsOptions.listInternal(true);
|
|
|
- var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
|
|
|
- kafkaCluster.getCluster().setTopicCount(topicListings.size());
|
|
|
-
|
|
|
- DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
|
|
|
- Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
- List<Topic> foundTopics = new ArrayList<>();
|
|
|
- resetMetrics(kafkaCluster);
|
|
|
-
|
|
|
- for (var entry : topicDescriptionFuturesMap.entrySet()) {
|
|
|
- var topicDescription = getTopicDescription(entry);
|
|
|
- if (topicDescription == null) continue;
|
|
|
- Topic topic = collectTopicData(kafkaCluster, topicDescription);
|
|
|
- foundTopics.add(topic);
|
|
|
+ var topicListings = ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names()).map(tl -> {
|
|
|
+ kafkaCluster.getCluster().setTopicCount(tl.size());
|
|
|
+
|
|
|
+ DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
|
|
|
+ Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
+ List<Topic> foundTopics = new ArrayList<>();
|
|
|
+ resetMetrics(kafkaCluster);
|
|
|
+
|
|
|
+ for (var entry : topicDescriptionFuturesMap.entrySet()) {
|
|
|
+ var topicDescription = getTopicDescription(entry);
|
|
|
+ if (topicDescription == null) continue;
|
|
|
+ Topic topic = collectTopicData(kafkaCluster, topicDescription);
|
|
|
+ foundTopics.add(topic);
|
|
|
+ }
|
|
|
+ kafkaCluster.setTopics(foundTopics);
|
|
|
}
|
|
|
- kafkaCluster.setTopics(foundTopics);
|
|
|
}
|
|
|
|
|
|
private void resetMetrics(KafkaCluster kafkaCluster) {
|