|
@@ -31,39 +31,52 @@ public class KafkaService {
|
|
|
|
|
|
private Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
private Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
+ @SneakyThrows
|
|
public Mono<ClusterWithId> getUpdatedCluster(ClusterWithId clusterWithId) {
|
|
public Mono<ClusterWithId> getUpdatedCluster(ClusterWithId clusterWithId) {
|
|
- var kafkaCluster = clusterWithId.getKafkaCluster();
|
|
|
|
|
|
+ var tempCluster = ClusterUtil.clone(clusterWithId.getKafkaCluster());
|
|
|
|
+ var internalCluster = clusterWithId.getKafkaCluster().toBuilder();
|
|
return getOrCreateAdminClient(clusterWithId).flatMap(
|
|
return getOrCreateAdminClient(clusterWithId).flatMap(
|
|
ac ->
|
|
ac ->
|
|
- getClusterMetrics(ac, kafkaCluster).flatMap(
|
|
|
|
- metrics -> {
|
|
|
|
- Cluster cluster = kafkaCluster.getCluster();
|
|
|
|
- cluster.setStatus(ServerStatus.ONLINE);
|
|
|
|
- cluster.setBytesInPerSec(metrics.getBytesInPerSec());
|
|
|
|
- cluster.setBytesOutPerSec(metrics.getBytesOutPerSec());
|
|
|
|
- BrokersMetrics brokersMetrics = kafkaCluster.getBrokersMetrics() == null ? new BrokersMetrics() : kafkaCluster.getBrokersMetrics();
|
|
|
|
- brokersMetrics.activeControllers(metrics.getActiveControllers());
|
|
|
|
- brokersMetrics.brokerCount(metrics.getBrokerCount());
|
|
|
|
- cluster.setBrokerCount(metrics.getBrokerCount());
|
|
|
|
- var internalCluster = kafkaCluster.toBuilder().cluster(cluster).brokersMetrics(brokersMetrics).build();
|
|
|
|
- return getTopicsData(ac, internalCluster)
|
|
|
|
- .map(topics -> {
|
|
|
|
- internalCluster.setTopics(ClusterUtil.convertToExternalTopicList(topics));
|
|
|
|
- internalCluster.getCluster().setTopicCount(topics.size());
|
|
|
|
- return internalCluster;
|
|
|
|
- }).map(kc -> clusterWithId.toBuilder().kafkaCluster(
|
|
|
|
- kc.toBuilder()
|
|
|
|
- .cluster(cluster)
|
|
|
|
- .brokersMetrics(brokersMetrics)
|
|
|
|
- .build()
|
|
|
|
- ).build());
|
|
|
|
- })
|
|
|
|
|
|
+ getClusterMetrics(ac).flatMap(
|
|
|
|
+ metrics -> {
|
|
|
|
+ Cluster cluster = ClusterUtil.clone(tempCluster.getCluster());
|
|
|
|
+ cluster.setStatus(ServerStatus.ONLINE);
|
|
|
|
+ cluster.setBytesInPerSec(metrics.getBytesInPerSec());
|
|
|
|
+ cluster.setBytesOutPerSec(metrics.getBytesOutPerSec());
|
|
|
|
+ BrokersMetrics brokersMetrics = tempCluster.getBrokersMetrics() != null
|
|
|
|
+ ? ClusterUtil.clone(tempCluster.getBrokersMetrics()) : new BrokersMetrics();
|
|
|
|
+ brokersMetrics.setBrokerCount(metrics.getBrokerCount());
|
|
|
|
+ brokersMetrics.activeControllers(metrics.getActiveControllers());
|
|
|
|
+ brokersMetrics.brokerCount(metrics.getBrokerCount());
|
|
|
|
+ resetMetrics(brokersMetrics);
|
|
|
|
+ cluster.setBrokerCount(metrics.getBrokerCount());
|
|
|
|
+ return getTopicsData(ac, internalCluster, cluster, brokersMetrics, tempCluster)
|
|
|
|
+ .map(topics -> {
|
|
|
|
+ internalCluster.topics(ClusterUtil.convertToExternalTopicList(topics));
|
|
|
|
+ cluster.setTopicCount(topics.size());
|
|
|
|
+ return topics;
|
|
|
|
+ })
|
|
|
|
+ .flatMap(topics ->
|
|
|
|
+ loadTopicConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList())).collectList()
|
|
|
|
+ .map(s -> s.stream().collect(Collectors.toMap(map -> new ArrayList<>(map.entrySet()).get(0).getKey(),
|
|
|
|
+ e -> new ArrayList<>(e.entrySet()).get(0).getValue())))
|
|
|
|
+ .map(topicsConfig -> {
|
|
|
|
+ internalCluster.topicConfigsMap(topicsConfig);
|
|
|
|
+ return internalCluster;
|
|
|
|
+ })
|
|
|
|
+ ).map(kc -> clusterWithId.toBuilder().kafkaCluster(
|
|
|
|
+ kc
|
|
|
|
+ .cluster(cluster)
|
|
|
|
+ .brokersMetrics(brokersMetrics)
|
|
|
|
+ .build()
|
|
|
|
+ ).build());
|
|
|
|
+ })
|
|
).onErrorResume(
|
|
).onErrorResume(
|
|
e -> {
|
|
e -> {
|
|
- Cluster cluster = kafkaCluster.getCluster();
|
|
|
|
|
|
+ Cluster cluster = ClusterUtil.clone(tempCluster.getCluster());
|
|
cluster.setStatus(ServerStatus.OFFLINE);
|
|
cluster.setStatus(ServerStatus.OFFLINE);
|
|
return Mono.just(clusterWithId.toBuilder().kafkaCluster(
|
|
return Mono.just(clusterWithId.toBuilder().kafkaCluster(
|
|
- kafkaCluster.toBuilder()
|
|
|
|
|
|
+ tempCluster.toBuilder()
|
|
.lastKafkaException(e)
|
|
.lastKafkaException(e)
|
|
.cluster(cluster)
|
|
.cluster(cluster)
|
|
.build()
|
|
.build()
|
|
@@ -73,24 +86,24 @@ public class KafkaService {
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient, KafkaCluster kafkaCluster) {
|
|
|
|
|
|
+ private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient, KafkaCluster.KafkaClusterBuilder kafkaCluster,
|
|
|
|
+ Cluster cluster, BrokersMetrics brokersMetrics, KafkaCluster tempCluster) {
|
|
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
listTopicsOptions.listInternal(true);
|
|
listTopicsOptions.listInternal(true);
|
|
return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
|
|
return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
|
|
.map(tl -> {
|
|
.map(tl -> {
|
|
- kafkaCluster.getCluster().setTopicCount(tl.size());
|
|
|
|
|
|
+ cluster.setTopicCount(tl.size());
|
|
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
|
|
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
|
|
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
- resetMetrics(kafkaCluster);
|
|
|
|
return topicDescriptionFuturesMap.entrySet();
|
|
return topicDescriptionFuturesMap.entrySet();
|
|
})
|
|
})
|
|
.flatMapMany(Flux::fromIterable)
|
|
.flatMapMany(Flux::fromIterable)
|
|
.flatMap(s -> ClusterUtil.toMono(s.getValue()))
|
|
.flatMap(s -> ClusterUtil.toMono(s.getValue()))
|
|
- .flatMap(e -> collectTopicData(kafkaCluster, adminClient, e))
|
|
|
|
|
|
+ .map(e -> collectTopicData(kafkaCluster, e, cluster, brokersMetrics, tempCluster))
|
|
.collectList();
|
|
.collectList();
|
|
}
|
|
}
|
|
|
|
|
|
- private Mono<Metrics> getClusterMetrics(AdminClient client, KafkaCluster kafkaCluster) {
|
|
|
|
|
|
+ private Mono<Metrics> getClusterMetrics(AdminClient client) {
|
|
return ClusterUtil.toMono(client.describeCluster().nodes())
|
|
return ClusterUtil.toMono(client.describeCluster().nodes())
|
|
.map(Collection::size)
|
|
.map(Collection::size)
|
|
.flatMap(brokers ->
|
|
.flatMap(brokers ->
|
|
@@ -134,13 +147,8 @@ public class KafkaService {
|
|
throw new RuntimeException("Can't find created topic");
|
|
throw new RuntimeException("Can't find created topic");
|
|
}
|
|
}
|
|
return s;
|
|
return s;
|
|
- })
|
|
|
|
- .flatMap(td -> collectTopicData(cluster, adminClient, td))
|
|
|
|
- .map(topic -> {
|
|
|
|
- var resultTopic = ClusterUtil.convertToExternalTopic(topic);
|
|
|
|
- cluster.getTopics().add(resultTopic);
|
|
|
|
- return resultTopic;
|
|
|
|
- });
|
|
|
|
|
|
+ }).map(s -> getUpdatedCluster(new ClusterWithId(cluster.getName(), cluster)))
|
|
|
|
+ .map(s -> new Topic());
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
@@ -169,15 +177,16 @@ public class KafkaService {
|
|
return getClusterId(adminClient).map( r -> adminClient);
|
|
return getClusterId(adminClient).map( r -> adminClient);
|
|
}
|
|
}
|
|
|
|
|
|
- private void resetMetrics(KafkaCluster kafkaCluster) {
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(0);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(0);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(0);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(0);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
|
|
|
|
|
|
+ private void resetMetrics(BrokersMetrics brokersMetrics) {
|
|
|
|
+ brokersMetrics.setOnlinePartitionCount(0);
|
|
|
|
+ brokersMetrics.setOfflinePartitionCount(0);
|
|
|
|
+ brokersMetrics.setUnderReplicatedPartitionCount(0);
|
|
|
|
+ brokersMetrics.setInSyncReplicasCount(0);
|
|
|
|
+ brokersMetrics.setOutOfSyncReplicasCount(0);
|
|
}
|
|
}
|
|
|
|
|
|
- private Mono<InternalTopic> collectTopicData(KafkaCluster kafkaCluster, AdminClient adminClient, TopicDescription topicDescription) {
|
|
|
|
|
|
+ private InternalTopic collectTopicData(KafkaCluster.KafkaClusterBuilder kafkaClusterBuilder, TopicDescription topicDescription,
|
|
|
|
+ Cluster cluster, BrokersMetrics brokersMetrics, KafkaCluster kafkaCluster) {
|
|
var topic = InternalTopic.builder();
|
|
var topic = InternalTopic.builder();
|
|
topic.internal(topicDescription.isInternal());
|
|
topic.internal(topicDescription.isInternal());
|
|
topic.name(topicDescription.name());
|
|
topic.name(topicDescription.name());
|
|
@@ -205,7 +214,11 @@ public class KafkaService {
|
|
|
|
|
|
topic.partitions(partitions);
|
|
topic.partitions(partitions);
|
|
|
|
|
|
- var topicDetails = kafkaCluster.getOrCreateTopicDetails(topicDescription.name());
|
|
|
|
|
|
+ if (kafkaCluster.getTopicDetailsMap() == null) {
|
|
|
|
+ kafkaClusterBuilder.topicDetailsMap(new HashMap<>());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ var topicDetails = kafkaClusterBuilder.build().getOrCreateTopicDetails(topicDescription.name());
|
|
|
|
|
|
topicDetails.setReplicas(replicasCount);
|
|
topicDetails.setReplicas(replicasCount);
|
|
topicDetails.setPartitionCount(topicDescription.partitions().size());
|
|
topicDetails.setPartitionCount(topicDescription.partitions().size());
|
|
@@ -214,19 +227,17 @@ public class KafkaService {
|
|
? topicDescription.partitions().get(0).replicas().size()
|
|
? topicDescription.partitions().get(0).replicas().size()
|
|
: null);
|
|
: null);
|
|
topicDetails.setUnderReplicatedPartitions(urpCount);
|
|
topicDetails.setUnderReplicatedPartitions(urpCount);
|
|
- kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
|
|
|
|
- kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
|
|
|
|
- kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
|
|
|
|
- kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
|
|
|
|
- kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
|
|
|
|
-
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(partitions.stream().filter(s -> s.getLeader() != null).map(e -> 1).reduce(0, Integer::sum));
|
|
|
|
- kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(partitions.stream().filter(s -> s.getLeader() == null).map(e -> 1).reduce(0, Integer::sum));
|
|
|
|
- var resultTopic = topic.build();
|
|
|
|
|
|
+ cluster.setOnlinePartitionCount(brokersMetrics.getOnlinePartitionCount());
|
|
|
|
+ brokersMetrics.setUnderReplicatedPartitionCount(
|
|
|
|
+ brokersMetrics.getUnderReplicatedPartitionCount() + urpCount);
|
|
|
|
+ brokersMetrics.setInSyncReplicasCount(
|
|
|
|
+ brokersMetrics.getInSyncReplicasCount() + inSyncReplicasCount);
|
|
|
|
+ brokersMetrics.setOutOfSyncReplicasCount(
|
|
|
|
+ brokersMetrics.getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
|
|
|
|
|
|
- return loadTopicConfig(adminClient, kafkaCluster, resultTopic.getName()).map(l -> resultTopic);
|
|
|
|
|
|
+ brokersMetrics.setOnlinePartitionCount(partitions.stream().filter(s -> s.getLeader() != null).map(e -> 1).reduce(0, Integer::sum));
|
|
|
|
+ brokersMetrics.setOfflinePartitionCount(partitions.stream().filter(s -> s.getLeader() == null).map(e -> 1).reduce(0, Integer::sum));
|
|
|
|
+ return topic.build();
|
|
}
|
|
}
|
|
|
|
|
|
private Mono<TopicDescription> getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
|
|
private Mono<TopicDescription> getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
|
|
@@ -238,27 +249,28 @@ public class KafkaService {
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- private Mono<List<TopicConfig>> loadTopicConfig(AdminClient adminClient, KafkaCluster kafkaCluster, String topicName) {
|
|
|
|
- Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
|
|
|
|
- return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
|
|
|
|
- .map(configs -> {
|
|
|
|
- if (!configs.isEmpty()) return Collections.emptyList();
|
|
|
|
- Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
|
|
|
|
- List<TopicConfig> topicConfigs = new ArrayList<>();
|
|
|
|
- for (ConfigEntry entry : entries) {
|
|
|
|
- TopicConfig topicConfig = new TopicConfig();
|
|
|
|
- topicConfig.setName(entry.name());
|
|
|
|
- topicConfig.setValue(entry.value());
|
|
|
|
- if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
|
|
|
|
- topicConfig.setDefaultValue(topicConfig.getValue());
|
|
|
|
- } else {
|
|
|
|
- topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
|
|
|
|
|
|
+ private Flux<Map<String, List<TopicConfig>>> loadTopicConfig(AdminClient adminClient, List<String> topicNames) {
|
|
|
|
+ return Flux.fromIterable(topicNames).flatMap(topicName -> {
|
|
|
|
+ Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
|
|
|
|
+ return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
|
|
|
|
+ .map(configs -> {
|
|
|
|
+ if (configs.isEmpty()) return Collections.emptyMap();
|
|
|
|
+ Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
|
|
|
|
+ List<TopicConfig> topicConfigs = new ArrayList<>();
|
|
|
|
+ for (ConfigEntry entry : entries) {
|
|
|
|
+ TopicConfig topicConfig = new TopicConfig();
|
|
|
|
+ topicConfig.setName(entry.name());
|
|
|
|
+ topicConfig.setValue(entry.value());
|
|
|
|
+ if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
|
|
|
|
+ topicConfig.setDefaultValue(topicConfig.getValue());
|
|
|
|
+ } else {
|
|
|
|
+ topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
|
|
|
|
+ }
|
|
|
|
+ topicConfigs.add(topicConfig);
|
|
}
|
|
}
|
|
- topicConfigs.add(topicConfig);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
|
|
|
|
- });
|
|
|
|
|
|
+ return Collections.singletonMap(topicName, topicConfigs);
|
|
|
|
+ });
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|