|
@@ -27,25 +27,22 @@ public class KafkaService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
public void loadClusterMetrics(KafkaCluster kafkaCluster) {
|
|
|
- Mono.just(false)
|
|
|
- .map(b -> {
|
|
|
- log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
|
|
|
- if (kafkaCluster.getAdminClient() != null) {
|
|
|
- b = isAdminClientConnected(kafkaCluster);
|
|
|
- }
|
|
|
- if (kafkaCluster.getAdminClient() == null || !b) {
|
|
|
- b = createAdminClient(kafkaCluster);
|
|
|
- }
|
|
|
- if (!b) {
|
|
|
- kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
|
|
|
- }
|
|
|
- return b;
|
|
|
- }).filter(s -> s)
|
|
|
- .doOnNext(s -> {
|
|
|
- kafkaCluster.getCluster().setId(kafkaCluster.getId());
|
|
|
- kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
|
|
|
- }).doOnNext(s -> loadMetrics(kafkaCluster))
|
|
|
- .doOnNext(s -> loadTopicsData(kafkaCluster)).subscribe();
|
|
|
+ var isConnected = false;
|
|
|
+ log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
|
|
|
+ if (kafkaCluster.getAdminClient() != null) {
|
|
|
+ isConnected = isAdminClientConnected(kafkaCluster);
|
|
|
+ }
|
|
|
+ if (kafkaCluster.getAdminClient() == null || !isConnected) {
|
|
|
+ isConnected = createAdminClient(kafkaCluster);
|
|
|
+ }
|
|
|
+ if (!isConnected) {
|
|
|
+ kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ kafkaCluster.getCluster().setId(kafkaCluster.getId());
|
|
|
+ kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
|
|
|
+ loadMetrics(kafkaCluster);
|
|
|
+ loadTopicsData(kafkaCluster);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -114,19 +111,19 @@ public class KafkaService {
|
|
|
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
|
listTopicsOptions.listInternal(true);
|
|
|
ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
|
|
|
- .map(tl -> {
|
|
|
- kafkaCluster.getCluster().setTopicCount(tl.size());
|
|
|
- DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
|
|
|
- Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
- resetMetrics(kafkaCluster);
|
|
|
- return topicDescriptionFuturesMap.entrySet();
|
|
|
- })
|
|
|
- .flatMapMany(Flux::fromIterable)
|
|
|
- .flatMap(s -> ClusterUtil.toMono(s.getValue()))
|
|
|
- .flatMap(e -> collectTopicData(kafkaCluster, e))
|
|
|
- .collectList()
|
|
|
- .doOnNext(kafkaCluster::setTopics)
|
|
|
- .subscribe();
|
|
|
+ .map(tl -> {
|
|
|
+ kafkaCluster.getCluster().setTopicCount(tl.size());
|
|
|
+ DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
|
|
|
+ Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
+ resetMetrics(kafkaCluster);
|
|
|
+ return topicDescriptionFuturesMap.entrySet();
|
|
|
+ })
|
|
|
+ .flatMapMany(Flux::fromIterable)
|
|
|
+ .flatMap(s -> ClusterUtil.toMono(s.getValue()))
|
|
|
+ .flatMap(e -> collectTopicData(kafkaCluster, e))
|
|
|
+ .collectList()
|
|
|
+ .doOnNext(kafkaCluster::setTopics)
|
|
|
+ .subscribe();
|
|
|
}
|
|
|
|
|
|
private void resetMetrics(KafkaCluster kafkaCluster) {
|
|
@@ -268,6 +265,6 @@ public class KafkaService {
|
|
|
.values()
|
|
|
.values()
|
|
|
.iterator()
|
|
|
- .next()).map(s -> Mono.just(true));
|
|
|
+ .next()).subscribe();
|
|
|
}
|
|
|
}
|