|
@@ -1,21 +1,20 @@
|
|
|
package com.provectus.kafka.ui.kafka;
|
|
|
|
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
-import com.provectus.kafka.ui.cluster.model.MetricsConstants;
|
|
|
import com.provectus.kafka.ui.model.*;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
import org.apache.kafka.clients.admin.*;
|
|
|
-import org.apache.kafka.common.KafkaFuture;
|
|
|
-import org.apache.kafka.common.Node;
|
|
|
-import org.apache.kafka.common.TopicPartitionInfo;
|
|
|
+import org.apache.kafka.common.*;
|
|
|
+import org.apache.kafka.common.config.ConfigResource;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.util.*;
|
|
|
|
|
|
-import static com.provectus.kafka.ui.cluster.model.MetricsConstants.CLUSTER_ID;
|
|
|
+import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
|
|
|
+import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
|
|
|
|
|
|
@Service
|
|
|
@RequiredArgsConstructor
|
|
@@ -35,14 +34,16 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
if (!isConnected) {
|
|
|
- kafkaCluster.setStatus(ServerStatus.OFFLINE);
|
|
|
+ kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- kafkaCluster.setStatus(ServerStatus.ONLINE);
|
|
|
+ kafkaCluster.setId(kafkaCluster.getAdminClient().describeCluster().clusterId().get());
|
|
|
+ kafkaCluster.getCluster().setId(kafkaCluster.getAdminClient().describeCluster().clusterId().get());
|
|
|
+ kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
|
|
|
loadMetrics(kafkaCluster);
|
|
|
- loadTopics(kafkaCluster);
|
|
|
+ loadTopicsData(kafkaCluster);
|
|
|
}
|
|
|
|
|
|
private boolean createAdminClient(KafkaCluster kafkaCluster) {
|
|
@@ -75,68 +76,90 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
- private void loadTopics(KafkaCluster kafkaCluster) {
|
|
|
+ private void loadTopicsData(KafkaCluster kafkaCluster) {
|
|
|
AdminClient adminClient = kafkaCluster.getAdminClient();
|
|
|
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
|
listTopicsOptions.listInternal(true);
|
|
|
var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
|
|
|
+ kafkaCluster.getCluster().setTopicCount(topicListings.size());
|
|
|
|
|
|
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
|
|
|
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
List<Topic> foundTopics = new ArrayList<>();
|
|
|
- String clusterId = kafkaCluster.getMetricsMap().get(CLUSTER_ID);
|
|
|
+ resetMetrics(kafkaCluster);
|
|
|
|
|
|
for (var entry : topicDescriptionFuturesMap.entrySet()) {
|
|
|
var topicDescription = getTopicDescription(entry);
|
|
|
if (topicDescription == null) continue;
|
|
|
-
|
|
|
- Topic topic = collectTopicData(clusterId, topicDescription);
|
|
|
- TopicDetails topicDetails = kafkaCluster.getTopicDetails(entry.getKey());
|
|
|
- collectTopicDetailsData(topicDetails, topicDescription);
|
|
|
-
|
|
|
+ Topic topic = collectTopicData(kafkaCluster, topicDescription);
|
|
|
foundTopics.add(topic);
|
|
|
}
|
|
|
kafkaCluster.setTopics(foundTopics);
|
|
|
}
|
|
|
|
|
|
- private void collectTopicDetailsData(TopicDetails topicDetails, TopicDescription topicDescription) {
|
|
|
- int inSyncReplicas = 0, replicas = 0;
|
|
|
- for (TopicPartitionInfo partition : topicDescription.partitions()) {
|
|
|
- inSyncReplicas += partition.isr().size();
|
|
|
- replicas += partition.replicas().size();
|
|
|
- }
|
|
|
-
|
|
|
- topicDetails.setReplicas(replicas);
|
|
|
- topicDetails.setPartitionCount(topicDescription.partitions().size());
|
|
|
- topicDetails.setInSyncReplicas(inSyncReplicas);
|
|
|
- topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
|
|
|
- ? topicDescription.partitions().get(0).replicas().size()
|
|
|
- : null);
|
|
|
+ private void resetMetrics(KafkaCluster kafkaCluster) {
|
|
|
+ kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(0);
|
|
|
+ kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(0);
|
|
|
+ kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(0);
|
|
|
}
|
|
|
|
|
|
- private Topic collectTopicData(String clusterId, TopicDescription topicDescription) {
|
|
|
- var topic = new Topic().clusterId(clusterId);
|
|
|
+ private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
|
|
|
+ var topic = new Topic().clusterId(kafkaCluster.getId());
|
|
|
topic.setInternal(topicDescription.isInternal());
|
|
|
topic.setName(topicDescription.name());
|
|
|
+
|
|
|
+ int inSyncReplicasCount = 0, replicasCount = 0;
|
|
|
List<Partition> partitions = new ArrayList<>();
|
|
|
|
|
|
+ int urpCount = 0;
|
|
|
for (TopicPartitionInfo partition : topicDescription.partitions()) {
|
|
|
var partitionDto = new Partition();
|
|
|
partitionDto.setLeader(partition.leader().id());
|
|
|
partitionDto.setPartition(partition.partition());
|
|
|
List<Replica> replicas = new ArrayList<>();
|
|
|
+
|
|
|
+ boolean isUrp = false;
|
|
|
for (Node replicaNode : partition.replicas()) {
|
|
|
var replica = new Replica();
|
|
|
replica.setBroker(replicaNode.id());
|
|
|
replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
|
|
|
replica.setInSync(partition.isr().contains(replicaNode));
|
|
|
+ if (!replica.getInSync()) {
|
|
|
+ isUrp = true;
|
|
|
+ }
|
|
|
replicas.add(replica);
|
|
|
+
|
|
|
+ inSyncReplicasCount += partition.isr().size();
|
|
|
+ replicasCount += partition.replicas().size();
|
|
|
+ }
|
|
|
+ if (isUrp) {
|
|
|
+ urpCount++;
|
|
|
}
|
|
|
partitionDto.setReplicas(replicas);
|
|
|
partitions.add(partitionDto);
|
|
|
+
|
|
|
+ if (partition.leader() != null) {
|
|
|
+ kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
|
|
|
+ } else {
|
|
|
+ kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
|
|
|
+ }
|
|
|
}
|
|
|
+ kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
|
|
|
+ kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
|
|
|
+ kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
|
|
|
topic.setPartitions(partitions);
|
|
|
|
|
|
+ TopicDetails topicDetails = kafkaCluster.getTopicDetails(topicDescription.name());
|
|
|
+ topicDetails.setReplicas(replicasCount);
|
|
|
+ topicDetails.setPartitionCount(topicDescription.partitions().size());
|
|
|
+ topicDetails.setInSyncReplicas(inSyncReplicasCount);
|
|
|
+ topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
|
|
|
+ ? topicDescription.partitions().get(0).replicas().size()
|
|
|
+ : null);
|
|
|
+ topicDetails.setUnderReplicatedPartitions(urpCount);
|
|
|
+
|
|
|
+ loadTopicConfig(kafkaCluster, topicDescription.name());
|
|
|
+
|
|
|
return topic;
|
|
|
}
|
|
|
|
|
@@ -152,17 +175,46 @@ public class KafkaService {
|
|
|
|
|
|
private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
|
|
|
AdminClient adminClient = kafkaCluster.getAdminClient();
|
|
|
- kafkaCluster.putMetric(MetricsConstants.BROKERS_COUNT, String.valueOf(adminClient.describeCluster().nodes().get().size()));
|
|
|
- ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
|
|
|
- listTopicsOptions.listInternal(false);
|
|
|
- Set<String> topicNames = adminClient.listTopics(listTopicsOptions).names().get();
|
|
|
- kafkaCluster.putMetric(MetricsConstants.TOPIC_COUNT, String.valueOf(topicNames.size()));
|
|
|
-
|
|
|
- int partitionsNum = 0;
|
|
|
- DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames);
|
|
|
- for (TopicDescription topicDescription : describeTopicsResult.all().get().values()) {
|
|
|
- partitionsNum += topicDescription.partitions().size();
|
|
|
+ int brokerCount = adminClient.describeCluster().nodes().get().size();
|
|
|
+ kafkaCluster.getCluster().setBrokerCount(brokerCount);
|
|
|
+ kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
|
|
|
+ kafkaCluster.getBrokersMetrics().setActiveControllers(adminClient.describeCluster().controller().get() != null ? 1 : 0);
|
|
|
+
|
|
|
+ for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
|
|
|
+ if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
|
|
|
+ && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
|
|
|
+ kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
|
|
|
+ }
|
|
|
+ if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
|
|
|
+ && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
|
|
|
+ kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
|
|
|
+ }
|
|
|
}
|
|
|
- kafkaCluster.putMetric(MetricsConstants.PARTITIONS_COUNT, String.valueOf(partitionsNum));
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ private void loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
|
|
|
+ AdminClient adminClient = kafkaCluster.getAdminClient();
|
|
|
+
|
|
|
+ Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "messages"));
|
|
|
+ final Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
|
|
|
+
|
|
|
+ if (configs.isEmpty()) return;
|
|
|
+
|
|
|
+ Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
|
|
|
+ List<TopicConfig> topicConfigs = new ArrayList<>();
|
|
|
+ for (ConfigEntry entry : entries) {
|
|
|
+ TopicConfig topicConfig = new TopicConfig();
|
|
|
+ topicConfig.setName(entry.name());
|
|
|
+ topicConfig.setValue(entry.value());
|
|
|
+ if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
|
|
|
+ topicConfig.setDefaultValue(topicConfig.getValue());
|
|
|
+ } else {
|
|
|
+ topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
|
|
|
+ }
|
|
|
+ topicConfigs.add(topicConfig);
|
|
|
+ }
|
|
|
+
|
|
|
+ kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
|
|
|
}
|
|
|
}
|