@@ -1,10 +1,10 @@
 package com.provectus.kafka.ui.kafka;
 
-import com.provectus.kafka.ui.cluster.model.ClusterWithId;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.model.Metrics;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.model.Cluster;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
@@ -16,6 +16,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
 import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
@@ -25,36 +26,115 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 @Log4j2
 public class KafkaService {
 
-    @SneakyThrows
-    public Mono<ClusterWithId> updateClusterMetrics(ClusterWithId clusterWithId) {
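+    // One AdminClient per cluster id, created lazily and reused across refresh cycles (see getOrCreateAdminClient below).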
+    private Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+
+    public Mono<ClusterWithId> getUpdatedCluster(ClusterWithId clusterWithId) {
         var kafkaCluster = clusterWithId.getKafkaCluster();
-        var isConnected = false;
-        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
-        if (kafkaCluster.getAdminClient() != null) {
-            isConnected = isAdminClientConnected(kafkaCluster);
-        }
-        if (kafkaCluster.getAdminClient() == null || !isConnected) {
-            isConnected = createAdminClient(kafkaCluster);
-        }
-        if (!isConnected) {
-            kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
-            return Mono.empty();
-        }
-        kafkaCluster.getCluster().setId(kafkaCluster.getId());
-        kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
-        return loadMetrics(kafkaCluster).map(metrics -> {
-            var clusterId = new ClusterWithId(metrics.getKafkaCluster().getName(), metrics.getKafkaCluster());
-            var kafkaCluster1 = metrics.getKafkaCluster();
-            kafkaCluster1.getCluster().setBytesInPerSec(metrics.getBytesInPerSec());
-            kafkaCluster1.getCluster().setBytesOutPerSec(metrics.getBytesOutPerSec());
-            kafkaCluster1.getCluster().setBrokerCount(metrics.getBrokerCount());
-            kafkaCluster1.getBrokersMetrics().setBrokerCount(metrics.getBrokerCount());
-            kafkaCluster1.getBrokersMetrics().setActiveControllers(metrics.getActiveControllers());
-            clusterId.setKafkaCluster(kafkaCluster1);
-            return clusterId;
-        }).flatMap(this::updateTopicsData);
+        return getOrCreateAdminClient(clusterWithId).flatMap(
+                ac ->
+                        getClusterMetrics(ac, kafkaCluster).map(
+                                metrics -> {
+                                    //TODO: generate new cluster
+                                    Cluster cluster = kafkaCluster.getCluster();
+                                    cluster.setStatus(ServerStatus.ONLINE);
+                                    cluster.setBytesInPerSec(metrics.getBytesInPerSec());
+                                    cluster.setBytesOutPerSec(metrics.getBytesOutPerSec());
+                                    //TODO: generate new BrokersMetrics
+                                    BrokersMetrics brokersMetrics = kafkaCluster.getBrokersMetrics();
+                                    brokersMetrics.activeControllers(metrics.getActiveControllers());
+                                    brokersMetrics.brokerCount(metrics.getBrokerCount());
+
+                                    //TODO: merge topic data here as well, e.g. via getTopicsData(ac, clusterWithId)
+
+                                    return clusterWithId.toBuilder().kafkaCluster(
+                                            kafkaCluster.toBuilder()
+                                                    .cluster(cluster)
+                                                    .brokersMetrics(brokersMetrics)
+                                                    .build()
+                                    ).build();
+                                }
+                        )
+        ).onErrorResume(
+                e -> {
+                    //TODO: Copy Cluster
+                    Cluster cluster = kafkaCluster.getCluster();
+                    cluster.setStatus(ServerStatus.OFFLINE);
+                    return Mono.just(clusterWithId.toBuilder().kafkaCluster(
+                            kafkaCluster.toBuilder()
+                                    .lastKafkaException(e)
+                                    .cluster(cluster)
+                                    .build()
+                    ).build());
+                }
+        );
     }
 
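+    // Lists all topics (including internal ones), describes them, and folds the per-topic data back into the cluster model.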
+    @SneakyThrows
+    private Mono<ClusterWithId> getTopicsData(AdminClient adminClient, ClusterWithId clusterWithId) {
+        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
+        listTopicsOptions.listInternal(true);
+        return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
+                .map(tl -> {
+//                    clusterWithId.getKafkaCluster().getCluster().setTopicCount(tl.size());
+                    DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
+                    Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
+                    resetMetrics(clusterWithId.getKafkaCluster());
+                    return topicDescriptionFuturesMap.entrySet();
+                })
+                .flatMapMany(Flux::fromIterable)
+                .flatMap(s -> ClusterUtil.toMono(s.getValue()))
+                .flatMap(e -> collectTopicData(clusterWithId.getKafkaCluster(), e))
+                .collectList()
+                .map(s -> {
+                    clusterWithId.getKafkaCluster().setTopics(s);
+                    return clusterWithId;
+                });
+    }
+
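+    // Gathers broker count, active-controller count and byte-rate metrics from the AdminClient into a Metrics holder.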
+    private Mono<Metrics> getClusterMetrics(AdminClient client, KafkaCluster kafkaCluster) {
+        return ClusterUtil.toMono(client.describeCluster().nodes())
+                .map(Collection::size)
+                .flatMap(brokers ->
+                        ClusterUtil.toMono(client.describeCluster().controller()).map(
+                                c -> {
+                                    Metrics metrics = new Metrics();
+                                    metrics.setBrokerCount(brokers);
+                                    metrics.setActiveControllers(c != null ? 1 : 0);
+                                    for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : client.metrics().entrySet()) {
+                                        if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
+                                                && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
+                                            metrics.setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                                        }
+                                        if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
+                                                && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
+                                            metrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                                        }
+                                    }
+                                    return metrics;
+                                }
+                        )
+                );
+    }
+
     @SneakyThrows
     public Mono<Topic> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
@@ -84,35 +164,30 @@ public class KafkaService {
         return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
     }
 
-    private boolean createAdminClient(KafkaCluster kafkaCluster) {
-        try {
-            Properties properties = new Properties();
-            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
-            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-            kafkaCluster.setAdminClient(AdminClient.create(properties));
-            kafkaCluster.setId(getClusterId(kafkaCluster));
-            kafkaCluster.getCluster().setId(kafkaCluster.getId());
-
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastKafkaException(e);
-
-            return false;
-        }
+    @SneakyThrows
+    private Mono<String> getClusterId(AdminClient adminClient) {
+        return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
     }
 
-    private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
-        try {
-            getClusterId(kafkaCluster);
-
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastKafkaException(e);
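+    // Fetches the cached AdminClient for this cluster (creating it on first use) and verifies the connection by requesting the cluster id.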
+    private Mono<AdminClient> getOrCreateAdminClient(ClusterWithId clusterWithId) {
+        AdminClient adminClient = adminClientCache.computeIfAbsent(
+                clusterWithId.getId(),
+                (id) -> createAdminClient(clusterWithId.getKafkaCluster())
+        );
-
-            return false;
-        }
+        return isAdminClientConnected(adminClient);
+    }
+
+    private AdminClient createAdminClient(KafkaCluster kafkaCluster) {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
+        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+        return AdminClient.create(properties);
+    }
+
+    private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
+        return getClusterId(adminClient).map(r -> adminClient);
     }
 
     @SneakyThrows
@@ -146,19 +221,35 @@ public class KafkaService {
         kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
     }
 
-    private Mono<Topic> collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
-        var topic = new Topic();
-        topic.setInternal(topicDescription.isInternal());
-        topic.setName(topicDescription.name());
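+    // Assembles the immutable InternalTopic via its builder instead of mutating the Topic DTO in place.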
+    private Mono<InternalTopic> collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
+        var topic = InternalTopic.builder();
+        topic.internal(topicDescription.isInternal());
+        topic.name(topicDescription.name());
 
         int inSyncReplicasCount = 0, replicasCount = 0;
         List<Partition> partitions = new ArrayList<>();
 
         int urpCount = 0;
+
+        //TODO (WIP): rework the loop below as a stream over topicDescription.partitions()
         for (TopicPartitionInfo partition : topicDescription.partitions()) {
-            var partitionDto = new Partition();
-            partitionDto.setLeader(partition.leader().id());
-            partitionDto.setPartition(partition.partition());
+            var partitionDto = InternalPartition.builder();
+            partitionDto.leader(partition.leader().id());
+            partitionDto.partition(partition.partition());
+
+            //TODO (WIP): build replica info with a stream once the new model is wired in, e.g.
+            //  partition.replicas().stream().map(r -> {
+            //      InternalReplica.InternalReplicaBuilder replica = InternalReplica.builder();
+            //      replica.broker(r.id());
+            //      replica.leader(partition.leader().id() == r.id());  // a replica is the leader when the ids match
+            //      replica.inSync(partition.isr().contains(r));
+            //      return replica.build();
+            //  });
             List<Replica> replicas = new ArrayList<>();
 
             boolean isUrp = false;
@@ -216,29 +307,7 @@ public class KafkaService {
         });
     }
 
-    private Mono<Metrics> loadMetrics(KafkaCluster kafkaCluster) {
-        Metrics metrics = new Metrics();
-        AdminClient adminClient = kafkaCluster.getAdminClient();
-        metrics.setKafkaCluster(kafkaCluster);
-        return ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
-            var brokerCount = brokers.size();
-            metrics.setBrokerCount(brokerCount);
-            return ClusterUtil.toMono(adminClient.describeCluster().controller());
-        }).map(c -> {
-            metrics.setActiveControllers(c != null ? 1 : 0);
-            for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
-                if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
-                        && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                    metrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-                }
-                if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
-                        && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                    metrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-                }
-            }
-            return metrics;
-        });
-    }
+
 
     @SneakyThrows
     private Mono<List<TopicConfig>> loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {