Browse Source

refactored metrics to copyonwrite methoology

Roman Nedzvetskiy 5 years ago
parent
commit
a912a4274d

+ 5 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster;
 package com.provectus.kafka.ui.cluster;
 
 
+import com.provectus.kafka.ui.cluster.model.ClusterWithId;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
@@ -9,8 +10,6 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
 import reactor.core.scheduler.Schedulers;
 
 
-import java.util.ArrayList;
-
 @Component
 @Component
 @RequiredArgsConstructor
 @RequiredArgsConstructor
 @Log4j2
 @Log4j2
@@ -22,9 +21,11 @@ public class ClustersMetricsScheduler {
 
 
     @Scheduled(fixedRate = 30000)
     @Scheduled(fixedRate = 30000)
     public void updateMetrics() {
     public void updateMetrics() {
-        Flux.range(0, clustersStorage.getKafkaClusters().size())
+        Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
                 .subscribeOn(Schedulers.parallel())
                 .subscribeOn(Schedulers.parallel())
-                .doOnNext(s -> metricsUpdateService.updateMetrics(new ArrayList<>(clustersStorage.getKafkaClusters()).get(s)))
+                .map(s -> new ClusterWithId(s.getKey(), s.getValue()))
+                .flatMap(metricsUpdateService::updateMetrics)
+                .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s.getKafkaCluster()))
                 .subscribe();
                 .subscribe();
     }
     }
 }
 }

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClusterWithId.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class ClusterWithId {
+
+    private String id;
+
+    private KafkaCluster kafkaCluster;
+}

+ 12 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java

@@ -7,13 +7,15 @@ import org.mapstruct.factory.Mappers;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
-import java.util.*;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 @Component
 @Component
 @RequiredArgsConstructor
 @RequiredArgsConstructor
 public class ClustersStorage {
 public class ClustersStorage {
 
 
-    private final Map<String, KafkaCluster> kafkaClusters = new HashMap<>();
+    private final Map<String, KafkaCluster> kafkaClusters = new ConcurrentHashMap<>();
 
 
     private final ClustersProperties clusterProperties;
     private final ClustersProperties clusterProperties;
 
 
@@ -36,4 +38,12 @@ public class ClustersStorage {
     public KafkaCluster getClusterByName(String clusterName) {
     public KafkaCluster getClusterByName(String clusterName) {
         return kafkaClusters.get(clusterName);
         return kafkaClusters.get(clusterName);
     }
     }
+
+    public void setKafkaCluster(String key, KafkaCluster kafkaCluster) {
+        this.kafkaClusters.put(key, kafkaCluster);
+    }
+
+    public Map<String, KafkaCluster> getKafkaClustersMap() {
+        return kafkaClusters;
+    }
 }
 }

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/Metrics.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Data;
+
+@Data
+public class Metrics {
+
+    private KafkaCluster kafkaCluster;
+
+    private Integer bytesInPerSec;
+
+    private Integer bytesOutPerSec;
+}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
@@ -59,7 +60,7 @@ public class ClusterService {
     public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
     public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
         KafkaCluster cluster = clustersStorage.getClusterByName(name);
         KafkaCluster cluster = clustersStorage.getClusterByName(name);
         if (cluster == null) return null;
         if (cluster == null) return null;
-        return kafkaService.createTopic(cluster.getAdminClient(), cluster, topicFormData);
+        return kafkaService.createTopic(cluster.getAdminClient(), cluster, topicFormData).map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
     }
     }
 
 
     @SneakyThrows
     @SneakyThrows

+ 9 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -1,11 +1,12 @@
 package com.provectus.kafka.ui.cluster.service;
 package com.provectus.kafka.ui.cluster.service;
 
 
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.ClusterWithId;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 
 @Service
 @Service
 @RequiredArgsConstructor
 @RequiredArgsConstructor
@@ -15,9 +16,12 @@ public class MetricsUpdateService {
     private final KafkaService kafkaService;
     private final KafkaService kafkaService;
     private final ZookeeperService zookeeperService;
     private final ZookeeperService zookeeperService;
 
 
-    public void updateMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
-        kafkaService.loadClusterMetrics(kafkaCluster);
-        zookeeperService.checkZookeeperStatus(kafkaCluster);
+    public Mono<ClusterWithId> updateMetrics(ClusterWithId clusterWithId) {
+        log.debug("Start getting metrics for kafkaCluster: {}", clusterWithId.getKafkaCluster());
+        return kafkaService.updateClusterMetrics(clusterWithId)
+                .map(s -> {
+                    zookeeperService.checkZookeeperStatus(s.getKafkaCluster());
+                    return s;
+                });
     }
     }
 }
 }

+ 36 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,6 +1,8 @@
 package com.provectus.kafka.ui.kafka;
 package com.provectus.kafka.ui.kafka;
 
 
+import com.provectus.kafka.ui.cluster.model.ClusterWithId;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.Metrics;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
@@ -9,8 +11,6 @@ import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.*;
 import org.apache.kafka.common.*;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
@@ -26,7 +26,8 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 public class KafkaService {
 public class KafkaService {
 
 
     @SneakyThrows
     @SneakyThrows
-    public void loadClusterMetrics(KafkaCluster kafkaCluster) {
+    public Mono<ClusterWithId> updateClusterMetrics(ClusterWithId clusterWithId) {
+        var kafkaCluster = clusterWithId.getKafkaCluster();
         var isConnected = false;
         var isConnected = false;
         log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
         log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
         if (kafkaCluster.getAdminClient() != null) {
         if (kafkaCluster.getAdminClient() != null) {
@@ -37,17 +38,22 @@ public class KafkaService {
         }
         }
         if (!isConnected) {
         if (!isConnected) {
             kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
             kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
-            return;
+            return Mono.empty();
         }
         }
         kafkaCluster.getCluster().setId(kafkaCluster.getId());
         kafkaCluster.getCluster().setId(kafkaCluster.getId());
         kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
         kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
-        loadMetrics(kafkaCluster);
-        loadTopicsData(kafkaCluster);
+        return loadMetrics(kafkaCluster).map(metrics -> {
+            var clusterId = new ClusterWithId(metrics.getKafkaCluster().getName(), metrics.getKafkaCluster());
+            var kafkaCluster1 = metrics.getKafkaCluster();
+            kafkaCluster1.getCluster().setBytesInPerSec(metrics.getBytesInPerSec());
+            kafkaCluster1.getCluster().setBytesOutPerSec(metrics.getBytesOutPerSec());
+            return clusterId;
+        }).flatMap(this::updateTopicsData);
     }
     }
 
 
 
 
     @SneakyThrows
     @SneakyThrows
-    public Mono<ResponseEntity<Topic>> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+    public Mono<Topic> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
         return topicFormData.flatMap(
         return topicFormData.flatMap(
                 topicData -> {
                 topicData -> {
                     NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
                     NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
@@ -65,7 +71,7 @@ public class KafkaService {
                     collectTopicData(cluster, td))
                     collectTopicData(cluster, td))
                 .map(topic -> {
                 .map(topic -> {
                     cluster.getTopics().add(topic);
                     cluster.getTopics().add(topic);
-                    return new ResponseEntity<>(topic, HttpStatus.CREATED);
+                    return topic;
                 });
                 });
     }
     }
 
 
@@ -106,24 +112,26 @@ public class KafkaService {
     }
     }
 
 
     @SneakyThrows
     @SneakyThrows
-    private void loadTopicsData(KafkaCluster kafkaCluster) {
-        AdminClient adminClient = kafkaCluster.getAdminClient();
+    private Mono<ClusterWithId> updateTopicsData(ClusterWithId clusterWithId) {
+        AdminClient adminClient = clusterWithId.getKafkaCluster().getAdminClient();
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         listTopicsOptions.listInternal(true);
         listTopicsOptions.listInternal(true);
-        ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
+        return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
             .map(tl -> {
             .map(tl -> {
-                kafkaCluster.getCluster().setTopicCount(tl.size());
+                clusterWithId.getKafkaCluster().getCluster().setTopicCount(tl.size());
                 DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
                 DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
                 Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
                 Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
-                resetMetrics(kafkaCluster);
+                resetMetrics(clusterWithId.getKafkaCluster());
                 return topicDescriptionFuturesMap.entrySet();
                 return topicDescriptionFuturesMap.entrySet();
             })
             })
             .flatMapMany(Flux::fromIterable)
             .flatMapMany(Flux::fromIterable)
             .flatMap(s -> ClusterUtil.toMono(s.getValue()))
             .flatMap(s -> ClusterUtil.toMono(s.getValue()))
-            .flatMap(e -> collectTopicData(kafkaCluster, e))
+            .flatMap(e -> collectTopicData(clusterWithId.getKafkaCluster(), e))
             .collectList()
             .collectList()
-            .doOnNext(kafkaCluster::setTopics)
-            .subscribe();
+            .map(s -> {
+                clusterWithId.getKafkaCluster().setTopics(s);
+                return clusterWithId;
+            });
     }
     }
 
 
     private void resetMetrics(KafkaCluster kafkaCluster) {
     private void resetMetrics(KafkaCluster kafkaCluster) {
@@ -204,26 +212,29 @@ public class KafkaService {
                     });
                     });
     }
     }
 
 
-    private void loadMetrics(KafkaCluster kafkaCluster) {
+    private Mono<Metrics> loadMetrics(KafkaCluster kafkaCluster) {
+        Metrics metrics = new Metrics();
         AdminClient adminClient = kafkaCluster.getAdminClient();
         AdminClient adminClient = kafkaCluster.getAdminClient();
-        ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
+        metrics.setKafkaCluster(kafkaCluster);
+        return ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
             var brokerCount = brokers.size();
             var brokerCount = brokers.size();
             kafkaCluster.getCluster().setBrokerCount(brokerCount);
             kafkaCluster.getCluster().setBrokerCount(brokerCount);
             kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
             kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
             return ClusterUtil.toMono(adminClient.describeCluster().controller());
             return ClusterUtil.toMono(adminClient.describeCluster().controller());
-        }).doOnNext(c -> {
+        }).map(c -> {
             kafkaCluster.getBrokersMetrics().setActiveControllers(c != null ? 1 : 0);
             kafkaCluster.getBrokersMetrics().setActiveControllers(c != null ? 1 : 0);
             for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
             for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
                 if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
                 if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
                         && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
                         && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                    kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                    metrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
                 }
                 }
                 if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
                 if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
                         && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
                         && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                    kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                    metrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
                 }
                 }
             }
             }
-        }).subscribe();
+            return metrics;
+        });
     }
     }
 
 
     @SneakyThrows
     @SneakyThrows
@@ -255,11 +266,11 @@ public class KafkaService {
     }
     }
 
 
     @SneakyThrows
     @SneakyThrows
-    private void createTopic(AdminClient adminClient, NewTopic newTopic) {
-        ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
+    private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
+        return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
                 .values()
                 .values()
                 .values()
                 .values()
                 .iterator()
                 .iterator()
-                .next()).subscribe();
+                .next());
     }
     }
 }
 }

+ 10 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -325,6 +325,16 @@ components:
           additionalProperties:
           additionalProperties:
             type: string
             type: string
 
 
+    Metrics:
+      type: object
+      properties:
+        bytesInPerSec:
+          type: integer
+        bytesOutPerSec:
+          type: integer
+        cluster:
+          $ref: '#/components/schemas/Cluster'
+
     Broker:
     Broker:
       type: object
       type: object
       properties:
       properties: