@@ -4,55 +4,100 @@ import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.MetricsConstants;
 import com.provectus.kafka.ui.model.Partition;
 import com.provectus.kafka.ui.model.Replica;
+import com.provectus.kafka.ui.model.ServerStatus;
 import com.provectus.kafka.ui.model.Topic;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.I0Itec.zkclient.ZkClient;
+import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
 
 import static com.provectus.kafka.ui.cluster.model.MetricsConstants.CLUSTER_ID;
-import static com.provectus.kafka.ui.cluster.model.MetricsConstants.ZOOKEEPER_STATUS;
 
 @Service
 @RequiredArgsConstructor
+@Log4j2
 public class KafkaService {
 
     @SneakyThrows
+    @Async
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
-        checkZookeperConnection(kafkaCluster);
+        log.debug("Start getting Kafka metrics for cluster: " + kafkaCluster.getName());
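+        // Reuse the cached AdminClient when it still responds; otherwise try to (re)create it before touching the cluster.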
+        boolean isConnected = false;
+        if (kafkaCluster.getAdminClient() != null) {
+            isConnected = isAdminClientConnected(kafkaCluster);
+        }
+        if (kafkaCluster.getAdminClient() == null || !isConnected) {
+            isConnected = createAdminClient(kafkaCluster);
+        }
+
+        if (!isConnected) {
+            kafkaCluster.setStatus(ServerStatus.OFFLINE);
+
+            return;
+        }
+
+        kafkaCluster.setStatus(ServerStatus.ONLINE);
+        loadMetrics(kafkaCluster);
+        loadTopics(kafkaCluster);
+        loadTopicsDetails(kafkaCluster);
+    }
+
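+    // Creates an AdminClient for the cluster's bootstrap servers, caches it on the KafkaCluster and reports whether creation succeeded.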
+    private boolean createAdminClient(KafkaCluster kafkaCluster) {
+        try {
+            Properties properties = new Properties();
+            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
+            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+            kafkaCluster.setAdminClient(AdminClient.create(properties));
 
-        Properties properties = new Properties();
-        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
-        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-        AdminClient adminClient = AdminClient.create(properties);
+            return true;
+        } catch (Exception e) {
+            log.error(e);
+            kafkaCluster.setLastKafkaException(e);
 
-        loadMetrics(kafkaCluster, adminClient);
-        loadTopics(kafkaCluster, adminClient);
-        loadTopicsDetails(kafkaCluster, adminClient);
+            return false;
+        }
+    }
+
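+    // Lightweight connectivity probe for the cached AdminClient; any failure is recorded on the cluster and reported as "not connected".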
+    private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
+        try {
+            kafkaCluster.getAdminClient().listTopics().names().get(); // block on the result so an unreachable cluster fails here instead of passing silently
+
+            return true;
+        } catch (Exception e) {
+            log.error(e);
+            kafkaCluster.setLastKafkaException(e);
+
+            return false;
+        }
     }
 
-    private void loadTopicsDetails(KafkaCluster kafkaCluster, AdminClient adminClient) {
+    private void loadTopicsDetails(KafkaCluster kafkaCluster) {
 
     }
 
     @SneakyThrows
-    private void loadTopics(KafkaCluster kafkaCluster, AdminClient adminClient) {
+    private void loadTopics(KafkaCluster kafkaCluster) {
+        AdminClient adminClient = kafkaCluster.getAdminClient();
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         listTopicsOptions.listInternal(true);
-        Collection<String> topicListings = adminClient.listTopics(listTopicsOptions).names().get();
+        var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
 
-        DescribeTopicsResult topicDescriptionsWrapper= adminClient.describeTopics(topicListings);
+        DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
         Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
         List<Topic> foundTopics = new ArrayList<>();
         String clusterId = kafkaCluster.getMetricsMap().get(CLUSTER_ID);
-        for (KafkaFuture<TopicDescription> value : topicDescriptionFuturesMap.values()) {
-            TopicDescription topicDescription = value.get();
+
+        for (var entry : topicDescriptionFuturesMap.entrySet()) {
+            var topicDescription = getTopicDescription(entry);
+            if (topicDescription == null) continue;
+
             var topic = new Topic();
             topic.setClusterId(clusterId);
             topic.setInternal(topicDescription.isInternal());
@@ -79,9 +124,22 @@ public class KafkaService {
             foundTopics.add(topic);
         }
         kafkaCluster.setTopics(foundTopics);
+
+
     }
 
-    private void loadMetrics(KafkaCluster kafkaCluster, AdminClient adminClient) throws InterruptedException, java.util.concurrent.ExecutionException {
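+    // Resolves a single topic-description future, logging and returning null for topics that could not be described.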
+    private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
+        try {
+            return entry.getValue().get();
+        } catch (Exception e) {
+            log.error("Can't get topic with name: " + entry.getKey(), e);
+
+            return null;
+        }
+    }
+
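+    // Collects broker and partition counts for the cluster through its cached AdminClient.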
+    private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
+        AdminClient adminClient = kafkaCluster.getAdminClient();
         kafkaCluster.putMetric(MetricsConstants.BROKERS_COUNT, String.valueOf(adminClient.describeCluster().nodes().get().size()));
 
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
@@ -96,20 +154,4 @@ public class KafkaService {
         }
         kafkaCluster.putMetric(MetricsConstants.PARTITIONS_COUNT, String.valueOf(partitionsNum));
     }
-
-    public static void checkZookeperConnection(KafkaCluster kafkaCluster){
-        try {
-            if (kafkaCluster.getZkClient() == null) {
-
-            } else {
-            }
-            kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), 1000));
-            kafkaCluster.putMetric(ZOOKEEPER_STATUS, "1");
-        } catch (Exception e) {
-            kafkaCluster.setZkClient(null);
-            kafkaCluster.putMetric(ZOOKEEPER_STATUS, "0");
-            kafkaCluster.setZookeeperException(e);
-            throw e;
-        }
-    }
 }