Metrics collection refactoring

* BE Services split:
  * Unrelated logic moved from ClusterService to proper services
  * KafkaCluster existence check moved to controllers level
  * Useless interfaces removed

* ConsumingService merged into MessagesService

* sonar fix

* PR fixes

* methods rename

* checkstyle fix

* wip

* getTopicDetails partitions set returned

* compilation fix

* Disk usage fields fixes

* typo & unused removal

* features retrieval returned

Co-authored-by: Ilya Kuramshin <ikuramshin@provectus.com>
Ilya Kuramshin committed 3 years ago · commit 627885d0ec
17 changed files with 457 additions and 411 deletions
  1. +4 -8   kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
  2. +9 -27   kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  3. +41 -10   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
  4. +13 -0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java
  5. +22 -4   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
  6. +1 -1   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java
  7. +4 -8   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  8. +2 -2   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
  9. +18 -9   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java
  10. +2 -2   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
  11. +3 -2   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  12. +168 -271   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java
  13. +8 -0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
  14. +32 -25   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
  15. +21 -2   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java
  16. +81 -4   kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java
  17. +28 -36   kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -49,20 +49,16 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
       String clusterName, String topicName, ServerWebExchange exchange) {
     return Mono.just(
-        topicsService.getTopicConfigs(getCluster(clusterName), topicName)
-            .map(Flux::fromIterable)
-            .map(ResponseEntity::ok)
-            .orElse(ResponseEntity.notFound().build())
-    );
+        ResponseEntity.ok(
+            Flux.fromIterable(topicsService.getTopicConfigs(getCluster(clusterName), topicName))));
   }
 
   @Override
   public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
       String clusterName, String topicName, ServerWebExchange exchange) {
     return Mono.just(
-        topicsService.getTopicDetails(getCluster(clusterName), topicName)
-            .map(ResponseEntity::ok)
-            .orElse(ResponseEntity.notFound().build())
+        ResponseEntity.ok(
+            topicsService.getTopicDetails(getCluster(clusterName), topicName))
     );
   }
 
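Note: the Optional-based 404 handling disappears here because topic existence is now checked inside the service layer. A minimal sketch of the lookup the service is assumed to perform (the getTopic helper name appears in the TopicsService diff below; its exact body and the mapping of TopicNotFoundException to HTTP 404 are assumptions):

    // Hypothetical helper: resolve a topic from cached cluster state or fail fast.
    // TopicNotFoundException is expected to be translated to an HTTP 404 response
    // by the application's common error handling.
    private InternalTopic getTopic(KafkaCluster cluster, String topicName) {
      var topic = cluster.getMetrics().getTopics().get(topicName);
      if (topic == null) {
        throw new TopicNotFoundException();
      }
      return topic;
    }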

+ 9 - 27
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -15,13 +15,13 @@ import com.provectus.kafka.ui.model.ConnectDTO;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
-import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
+import com.provectus.kafka.ui.model.JmxBrokerMetrics;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.PartitionDTO;
@@ -31,7 +31,6 @@ import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
-import java.math.BigDecimal;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
@@ -48,12 +47,12 @@ import org.mapstruct.Named;
 public interface ClusterMapper {
 
   @Mapping(target = "brokerCount", source = "metrics.brokerCount")
+  @Mapping(target = "status", source = "metrics.status")
+  @Mapping(target = "version", source = "metrics.version")
   @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
   @Mapping(target = "topicCount", source = "metrics.topicCount")
-  @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec",
-      qualifiedByName = "sumMetrics")
-  @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec",
-      qualifiedByName = "sumMetrics")
+  @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
+  @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
   ClusterDTO toCluster(KafkaCluster cluster);
 
   @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath")
@@ -68,7 +67,7 @@ public interface ClusterMapper {
   @Mapping(target = "items", source = "metrics")
   ClusterMetricsDTO toClusterMetrics(InternalClusterMetrics metrics);
 
-  BrokerMetricsDTO toBrokerMetrics(InternalBrokerMetrics metrics);
+  BrokerMetricsDTO toBrokerMetrics(JmxBrokerMetrics metrics);
 
   @Mapping(target = "isSensitive", source = "sensitive")
   @Mapping(target = "isReadOnly", source = "readOnly")
@@ -119,17 +118,6 @@ public interface ClusterMapper {
 
   TopicDetailsDTO toTopicDetails(InternalTopic topic);
 
-  default TopicDetailsDTO toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
-    final TopicDetailsDTO result = toTopicDetails(topic);
-    result.setBytesInPerSec(
-        metrics.getBytesInPerSec().get(topic.getName())
-    );
-    result.setBytesOutPerSec(
-        metrics.getBytesOutPerSec().get(topic.getName())
-    );
-    return result;
-  }
-
   @Mapping(target = "isReadOnly", source = "readOnly")
   @Mapping(target = "isSensitive", source = "sensitive")
   TopicConfigDTO toTopicConfig(InternalTopicConfig topic);
@@ -160,19 +148,13 @@ public interface ClusterMapper {
 
   @Named("mapDiskUsage")
   default List<BrokerDiskUsageDTO> mapDiskUsage(Map<Integer, InternalBrokerDiskUsage> brokers) {
+    if (brokers == null) {
+      return null;
+    }
     return brokers.entrySet().stream().map(e -> this.map(e.getKey(), e.getValue()))
         .collect(Collectors.toList());
   }
 
-  @Named("sumMetrics")
-  default BigDecimal sumMetrics(Map<String, BigDecimal> metrics) {
-    if (metrics != null) {
-      return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
-    } else {
-      return BigDecimal.ZERO;
-    }
-  }
-
   @Named("resolvePath")
   default Path resolvePath(String path) {
     if (path != null) {
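The sumMetrics qualifier became unnecessary because bytesInPerSec/bytesOutPerSec are now reduced to a single cluster-wide value before they ever reach the mapper. The equivalent reduction lives in MetricsService after this commit (see its diff below):

    // Sum per-topic JMX byte rates into one cluster-wide figure; an empty map yields ZERO.
    BigDecimal bytesInPerSec = jmxMetrics.getBytesInPerSec().values().stream()
        .reduce(BigDecimal.ZERO, BigDecimal::add);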

+ 41 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.model;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Data;
 
@@ -10,22 +11,52 @@ import lombok.Data;
 @Data
 @Builder(toBuilder = true)
 public class InternalClusterMetrics {
+
+  public static InternalClusterMetrics empty() {
+    return InternalClusterMetrics.builder()
+        .brokers(List.of())
+        .topics(Map.of())
+        .status(ServerStatusDTO.OFFLINE)
+        .zookeeperStatus(ServerStatusDTO.OFFLINE)
+        .internalBrokerMetrics(Map.of())
+        .metrics(List.of())
+        .version("unknown")
+        .build();
+  }
+
+  private final String version;
+
+  private final ServerStatusDTO status;
+  private final Throwable lastKafkaException;
+
   private final int brokerCount;
-  private final int topicCount;
   private final int activeControllers;
-  private final int uncleanLeaderElectionCount;
-  private final int onlinePartitionCount;
+  private final List<Integer> brokers;
+
+  private final int topicCount;
+  private final Map<String, InternalTopic> topics;
+
+  // zk stats
+  @Deprecated //use 'zookeeperStatus' field with enum type instead
+  private final int zooKeeperStatus;
+  private final ServerStatusDTO zookeeperStatus;
+  private final Throwable lastZookeeperException;
+
+  // partitions stats
   private final int underReplicatedPartitionCount;
+  private final int onlinePartitionCount;
   private final int offlinePartitionCount;
   private final int inSyncReplicasCount;
   private final int outOfSyncReplicasCount;
-  private final Map<String, BigDecimal> bytesInPerSec;
-  private final Map<String, BigDecimal> bytesOutPerSec;
-  private final long segmentCount;
-  private final long segmentSize;
+
+  // log dir stats
+  @Nullable // will be null if log dir collection disabled
   private final Map<Integer, InternalBrokerDiskUsage> internalBrokerDiskUsage;
-  private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
+
+  // metrics from jmx
+  private final BigDecimal bytesInPerSec;
+  private final BigDecimal bytesOutPerSec;
+  private final Map<Integer, JmxBrokerMetrics> internalBrokerMetrics;
   private final List<MetricDTO> metrics;
-  private final int zooKeeperStatus;
-  private final String version;
+
 }
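empty() gives every registered cluster a well-defined OFFLINE baseline instead of null maps, and toBuilder() copies keep the object immutable. An illustrative update (values are examples only, not from the commit):

    // Start from the OFFLINE placeholder, then derive an updated immutable copy.
    InternalClusterMetrics metrics = InternalClusterMetrics.empty()
        .toBuilder()
        .status(ServerStatusDTO.ONLINE)
        .brokerCount(3)
        .build();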

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java

@@ -12,8 +12,21 @@ public class InternalPartition {
   private final List<InternalReplica> replicas;
   private final int inSyncReplicasCount;
   private final int replicasCount;
+
+  // should be updated manually on partitions return
   private final long offsetMin;
   private final long offsetMax;
+
+  // from log dir
   private final long segmentSize;
   private final long segmentCount;
+
+  public InternalPartition withOffsets(long min, long max) {
+    return toBuilder().offsetMin(min).offsetMax(max).build();
+  }
+
+  public InternalPartition withSegmentStats(long segmentSize, long segmentCount) {
+    return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build();
+  }
+
 }
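The withers replace toBuilder() chains at call sites. Illustrative usage, with stats standing for a LongSummaryStatistics of segment sizes as computed in MetricsService (variable names are hypothetical):

    InternalPartition enriched = partition
        .withOffsets(beginningOffset, endOffset)               // manual offset update
        .withSegmentStats(stats.getSum(), stats.getCount());   // from log dir data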

+ 22 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import lombok.Builder;
@@ -9,17 +10,34 @@ import lombok.Data;
 @Builder(toBuilder = true)
 public class InternalTopic {
 
+  // from TopicDescription
   private final String name;
   private final boolean internal;
-  private final Map<Integer, InternalPartition> partitions;
-  private final List<InternalTopicConfig> topicConfigs;
-
-  private final CleanupPolicy cleanUpPolicy;
   private final int replicas;
   private final int partitionCount;
   private final int inSyncReplicas;
   private final int replicationFactor;
   private final int underReplicatedPartitions;
+  private final Map<Integer, InternalPartition> partitions;
+
+  // topic configs
+  private final List<InternalTopicConfig> topicConfigs;
+  private final CleanupPolicy cleanUpPolicy;
+
+  // rates from jmx
+  private final BigDecimal bytesInPerSec;
+  private final BigDecimal bytesOutPerSec;
+
+  // from log dir data
   private final long segmentSize;
   private final long segmentCount;
+
+  public InternalTopic withSegmentStats(long segmentSize, long segmentCount) {
+    return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build();
+  }
+
+  public InternalTopic withIoRates(BigDecimal bytesInPerSec, BigDecimal bytesOutPerSec) {
+    return toBuilder().bytesInPerSec(bytesInPerSec).bytesOutPerSec(bytesOutPerSec).build();
+  }
+
 }
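InternalTopic gets the same copy-on-write helpers; note that withIoRates simply records nulls for a topic with no JMX data. Illustrative usage with hypothetical per-topic rate maps:

    InternalTopic withRates = topic.withIoRates(
        bytesInPerSecByTopic.get(topic.getName()),    // may be null: no JMX data for topic
        bytesOutPerSecByTopic.get(topic.getName()));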

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java

@@ -6,6 +6,6 @@ import lombok.Data;
 
 @Data
 @Builder(toBuilder = true)
-public class InternalBrokerMetrics {
+public class JmxBrokerMetrics {
   private final List<MetricDTO> metrics;
 }

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -26,18 +26,14 @@ public class KafkaCluster {
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;
-  private final ServerStatusDTO status;
-  private final ServerStatusDTO zookeeperStatus;
-  private final InternalClusterMetrics metrics;
-  private final Map<String, InternalTopic> topics;
-  private final List<Integer> brokers;
-  private final Throwable lastKafkaException;
-  private final Throwable lastZookeeperException;
+  private final List<Feature> features;
   private final Path protobufFile;
   private final String protobufMessageName;
   private final Map<String, String> protobufMessageNameByTopic;
   private final Properties properties;
   private final Boolean readOnly;
   private final Boolean disableLogDirsCollection;
-  private final List<Feature> features;
+
+  // state & metrics
+  private final InternalClusterMetrics metrics;
 }
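With runtime state consolidated under metrics, call sites across the services change mechanically; the pattern (visible in the BrokerService, MessagesService and TopicsService diffs below) is:

    cluster.getTopics()    ->  cluster.getMetrics().getTopics()
    cluster.getBrokers()   ->  cluster.getMetrics().getBrokers()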

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -70,7 +70,7 @@ public class BrokerService {
   }
 
   private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
-    if (!cluster.getBrokers().contains(brokerId)) {
+    if (!cluster.getMetrics().getBrokers().contains(brokerId)) {
       return Flux.error(
           new NotFoundException(String.format("Broker with id %s not found", brokerId)));
     }
@@ -139,7 +139,7 @@ public class BrokerService {
       KafkaCluster cluster, List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
-          List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
+          List<Integer> brokers = new ArrayList<>(cluster.getMetrics().getBrokers());
           if (reqBrokers != null && !reqBrokers.isEmpty()) {
             brokers.retainAll(reqBrokers);
           }

+ 18 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.Collection;
@@ -30,9 +31,7 @@ public class ClustersStorage {
       KafkaCluster cluster = clusterMapper.toKafkaCluster(clusterProperties);
       kafkaClusters.put(
           clusterProperties.getName(),
-          cluster.toBuilder()
-              .topics(new HashMap<>())
-              .build()
+          cluster.toBuilder().metrics(InternalClusterMetrics.empty()).build()
       );
     }
   }
@@ -52,22 +51,32 @@ public class ClustersStorage {
 
   public void onTopicDeleted(String clusterName, String topicToDelete) {
     var cluster = kafkaClusters.get(clusterName);
-    var topics = Optional.ofNullable(cluster.getTopics())
+    var topics = Optional.ofNullable(cluster.getMetrics().getTopics())
         .map(HashMap::new)
        .orElseGet(HashMap::new);
     topics.remove(topicToDelete);
-    var updatedCluster = cluster.toBuilder().topics(topics).build();
-    setKafkaCluster(cluster.getName(), updatedCluster);
+    setUpdatedTopics(cluster, topics);
   }
 
   public void onTopicUpdated(String clusterName, InternalTopic updatedTopic) {
     var cluster = kafkaClusters.get(clusterName);
-    var topics = Optional.ofNullable(cluster.getTopics())
+    var topics = Optional.ofNullable(cluster.getMetrics().getTopics())
        .map(HashMap::new)
        .orElseGet(HashMap::new);
     topics.put(updatedTopic.getName(), updatedTopic);
-    var updatedCluster = cluster.toBuilder().topics(topics).build();
-    setKafkaCluster(cluster.getName(), updatedCluster);
+    setUpdatedTopics(cluster, topics);
+  }
+
+  private void setUpdatedTopics(KafkaCluster cluster, Map<String, InternalTopic> topics) {
+    setKafkaCluster(
+        cluster.getName(),
+        cluster.toBuilder()
+            .metrics(
+                cluster.getMetrics().toBuilder()
+                    .topics(topics)
+                    .build())
+            .build()
+    );
   }
 
   public Map<String, KafkaCluster> getKafkaClustersMap() {
   public Map<String, KafkaCluster> getKafkaClustersMap() {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java

@@ -22,7 +22,7 @@ public class FeatureService {
 
   private final BrokerService brokerService;
 
-  public Flux<Feature> getAvailableFeatures(KafkaCluster cluster) {
+  public Mono<List<Feature>> getAvailableFeatures(KafkaCluster cluster) {
     List<Mono<Feature>> features = new ArrayList<>();
 
     if (Optional.ofNullable(cluster.getKafkaConnect())
@@ -44,7 +44,7 @@ public class FeatureService {
             .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
     );
 
-    return Flux.fromIterable(features).flatMap(m -> m);
+    return Flux.fromIterable(features).flatMap(m -> m).collectList();
   }
 
   private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster) {
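Returning Mono<List<Feature>> instead of Flux<Feature> moves the collectList() call out of every consumer and lets callers zip the whole list with other async results. This is how MetricsService.updateClusterMetrics consumes it after this commit (taken from its diff below):

    return getMetrics(cluster)
        .map(m -> cluster.toBuilder().metrics(m).build())
        .zipWith(featureService.getAvailableFeatures(cluster),
            (c, features) -> c.toBuilder().features(features).build());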

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -58,7 +58,7 @@ public class MessagesService {
 
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
-    if (!cluster.getTopics().containsKey(topicName)) {
+    if (!cluster.getMetrics().getTopics().containsKey(topicName)) {
       throw new TopicNotFoundException();
     }
     return offsetsForDeletion(cluster, topicName, partitionsToInclude)
@@ -84,7 +84,8 @@ public class MessagesService {
       throw new ValidationException("Invalid message: both key and value can't be null");
     }
     if (msg.getPartition() != null
-        && msg.getPartition() > cluster.getTopics().get(topic).getPartitionCount() - 1) {
+        && msg.getPartition() > cluster.getMetrics().getTopics()
+          .get(topic).getPartitionCount() - 1) {
       throw new ValidationException("Invalid partition");
     }
     RecordSerDe serde =

+ 168 - 271
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java

@@ -1,30 +1,33 @@
 package com.provectus.kafka.ui.service;
 
+import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
+import static com.provectus.kafka.ui.service.ZookeeperService.ZkStatus;
+import static com.provectus.kafka.ui.util.JmxClusterUtil.JmxMetrics;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.summarizingLong;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
-import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalPartition;
-import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MetricDTO;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
-import com.provectus.kafka.ui.util.JmxMetricsName;
-import com.provectus.kafka.ui.util.JmxMetricsValueName;
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import lombok.Builder;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -44,91 +47,113 @@ public class MetricsService {
 
   /**
    * Updates cluster's metrics and topics structure.
+   *
    * @param cluster to be updated
    * @return cluster with up-to-date metrics and topics structure
    */
   public Mono<KafkaCluster> updateClusterMetrics(KafkaCluster cluster) {
-    return adminClientService.get(cluster)
-        .flatMap(
-            ac -> ac.getClusterVersion().flatMap(
-                version ->
-                    getClusterMetrics(ac)
-                        .flatMap(i -> fillJmxMetrics(i, cluster, ac))
-                        .flatMap(clusterMetrics ->
-                            topicsService.getTopicsData(ac).flatMap(it -> {
-                                  if (cluster.getDisableLogDirsCollection() == null
-                                      || !cluster.getDisableLogDirsCollection()) {
-                                    return updateSegmentMetrics(ac, clusterMetrics, it
-                                    );
-                                  } else {
-                                    return emptySegmentMetrics(clusterMetrics, it);
-                                  }
-                                }
-                            ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
-                        )
-            )
-        ).flatMap(
-            nc -> featureService.getAvailableFeatures(cluster).collectList()
-                .map(f -> nc.toBuilder().features(f).build())
-        ).doOnError(e ->
+    return getMetrics(cluster)
+        .map(m -> cluster.toBuilder().metrics(m).build())
+        .zipWith(featureService.getAvailableFeatures(cluster),
+            (c, features) -> c.toBuilder().features(features).build());
+  }
+
+  private Mono<InternalClusterMetrics> getMetrics(KafkaCluster cluster) {
+    return adminClientService.get(cluster).flatMap(ac ->
+            ac.describeCluster().flatMap(
+                description -> Mono.just(
+                        MetricsCollector.builder()
+                            .clusterDescription(description)
+                            .version(ac.getVersion())
+                            .build()
+                    )
+                    .zipWith(jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
+                        (b, jmx) -> b.toBuilder().jmxMetrics(jmx).build())
+                    .zipWith(zookeeperService.getZkStatus(cluster),
+                        (b, status) -> b.toBuilder().zkStatus(status).build()))
+                    .zipWith(topicsService.getTopicsData(ac),
+                        (b, td) -> b.toBuilder().topicsData(td).build())
+                    .zipWith(getLogDirInfo(cluster, ac),
+                        (b, ld) -> b.toBuilder().logDirResult(ld).build())
+                .map(MetricsCollector::build)
+        )
+        .doOnError(e ->
            log.error("Failed to collect cluster {} info", cluster.getName(), e)
        ).onErrorResume(
-            e -> Mono.just(cluster.toBuilder()
+            e -> Mono.just(cluster.getMetrics().toBuilder()
                .status(ServerStatusDTO.OFFLINE)
                .lastKafkaException(e)
                .build())
        );
   }
 
-  private KafkaCluster buildFromData(KafkaCluster currentCluster,
-                                     String version,
-                                     InternalSegmentSizeDto segmentSizeDto) {
+  @Builder(toBuilder = true)
+  private static class MetricsCollector {
+    String version;
+    ClusterDescription clusterDescription;
+    JmxMetrics jmxMetrics;
+    List<InternalTopic> topicsData;
+    ZkStatus zkStatus;
+    Optional<LogDirInfo> logDirResult; // empty if log dir collection disabled
+
+    InternalClusterMetrics build() {
+      var metricsBuilder = InternalClusterMetrics.builder();
+      metricsBuilder.version(version);
+      metricsBuilder.status(ServerStatusDTO.ONLINE);
+      metricsBuilder.lastKafkaException(null);
 
-    var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
-    var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
-    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());
+      metricsBuilder.zookeeperStatus(zkStatus.getStatus());
+      metricsBuilder.zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zkStatus.getStatus()));
+      metricsBuilder.lastZookeeperException(zkStatus.getError());
 
-    InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
-        brokersMetrics.toBuilder();
+      metricsBuilder.brokers(
+          clusterDescription.getNodes().stream().map(Node::id).collect(toList()));
+      metricsBuilder.brokerCount(clusterDescription.getNodes().size());
+      metricsBuilder.activeControllers(clusterDescription.getController() != null ? 1 : 0);
 
-    InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);
+      fillTopicsMetrics(metricsBuilder, topicsData);
+      fillJmxMetrics(metricsBuilder, jmxMetrics);
 
-    ServerStatusDTO zookeeperStatus = ServerStatusDTO.OFFLINE;
-    Throwable zookeeperException = null;
-    try {
-      zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster)
-          ? ServerStatusDTO.ONLINE
-          : ServerStatusDTO.OFFLINE;
-    } catch (Throwable e) {
-      zookeeperException = e;
+      logDirResult.ifPresent(r -> r.enrichWithLogDirInfo(metricsBuilder));
+
+      return metricsBuilder.build();
     }
+  }
+
+  private static void fillJmxMetrics(
+      InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder,
+      JmxMetrics jmxMetrics) {
+    metricsBuilder.metrics(jmxMetrics.getMetrics());
+    metricsBuilder.internalBrokerMetrics(jmxMetrics.getInternalBrokerMetrics());
+
+    metricsBuilder.bytesInPerSec(
+        jmxMetrics.getBytesInPerSec().values().stream()
+            .reduce(BigDecimal.ZERO, BigDecimal::add));
+
+    metricsBuilder.bytesOutPerSec(
+        jmxMetrics.getBytesOutPerSec().values().stream()
+            .reduce(BigDecimal.ZERO, BigDecimal::add));
+
+    metricsBuilder.topics(
+        metricsBuilder.build().getTopics().values().stream()
+            .map(t ->
+                t.withIoRates(
+                    jmxMetrics.getBytesInPerSec().get(t.getName()),
+                    jmxMetrics.getBytesOutPerSec().get(t.getName()))
+            ).collect(Collectors.toMap(InternalTopic::getName, t -> t))
+    );
+  }
 
-    InternalClusterMetrics clusterMetrics = metricsBuilder
-        .activeControllers(brokersMetrics.getActiveControllers())
-        .topicCount(topicsMetrics.getTopicCount())
-        .brokerCount(brokersMetrics.getBrokerCount())
-        .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
-        .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
-        .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
-        .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
-        .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
-        .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
-        .version(version)
-        .build();
-
-    return currentCluster.toBuilder()
-        .version(version)
-        .status(ServerStatusDTO.ONLINE)
-        .zookeeperStatus(zookeeperStatus)
-        .lastZookeeperException(zookeeperException)
-        .lastKafkaException(null)
-        .metrics(clusterMetrics)
-        .topics(topics)
-        .brokers(brokersIds)
-        .build();
+  private Mono<Optional<LogDirInfo>> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
+    if (cluster.getDisableLogDirsCollection() == null || !cluster.getDisableLogDirsCollection()) {
+      return c.describeLogDirs().map(LogDirInfo::new).map(Optional::of);
+    }
+    return Mono.just(Optional.empty());
   }
 
-  private InternalClusterMetrics collectTopicsMetrics(Map<String, InternalTopic> topics) {
+  private static void fillTopicsMetrics(
+      InternalClusterMetrics.InternalClusterMetricsBuilder builder,
+      List<InternalTopic> topics) {
 
     int underReplicatedPartitions = 0;
     int inSyncReplicasCount = 0;
@@ -136,7 +161,7 @@ public class MetricsService {
     int onlinePartitionCount = 0;
     int offlinePartitionCount = 0;
 
-    for (InternalTopic topic : topics.values()) {
+    for (InternalTopic topic : topics) {
       underReplicatedPartitions += topic.getUnderReplicatedPartitions();
       inSyncReplicasCount += topic.getInSyncReplicas();
       outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
@@ -148,216 +173,88 @@ public class MetricsService {
               .sum();
     }
 
-    return InternalClusterMetrics.builder()
+    builder
         .underReplicatedPartitionCount(underReplicatedPartitions)
         .inSyncReplicasCount(inSyncReplicasCount)
         .outOfSyncReplicasCount(outOfSyncReplicasCount)
         .onlinePartitionCount(onlinePartitionCount)
         .offlinePartitionCount(offlinePartitionCount)
         .topicCount(topics.size())
-        .build();
-  }
-
-  private Mono<InternalClusterMetrics> getClusterMetrics(ReactiveAdminClient client) {
-    return client.describeCluster().map(desc ->
-        InternalClusterMetrics.builder()
-            .brokerCount(desc.getNodes().size())
-            .activeControllers(desc.getController() != null ? 1 : 0)
-            .build()
-    );
-  }
-
-  private InternalTopic mergeWithStats(InternalTopic topic,
-                                       Map<String, LongSummaryStatistics> topics,
-                                       Map<TopicPartition, LongSummaryStatistics> partitions) {
-    final LongSummaryStatistics stats = topics.get(topic.getName());
-
-    return topic.toBuilder()
-        .segmentSize(stats.getSum())
-        .segmentCount(stats.getCount())
-        .partitions(
-            topic.getPartitions().entrySet().stream().map(e ->
-                Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions))
-            ).collect(Collectors.toMap(
-                Tuple2::getT1,
-                Tuple2::getT2
-            ))
-        ).build();
-  }
-
-  private InternalPartition mergeWithStats(String topic, InternalPartition partition,
-                                           Map<TopicPartition, LongSummaryStatistics> partitions) {
-    final LongSummaryStatistics stats =
-        partitions.get(new TopicPartition(topic, partition.getPartition()));
-    return partition.toBuilder()
-        .segmentSize(stats.getSum())
-        .segmentCount(stats.getCount())
-        .build();
+        .topics(topics.stream().collect(Collectors.toMap(InternalTopic::getName, t -> t)));
   }
 
-  private Mono<InternalSegmentSizeDto> emptySegmentMetrics(InternalClusterMetrics clusterMetrics,
-                                                            List<InternalTopic> internalTopics) {
-    return Mono.just(
-        InternalSegmentSizeDto.builder()
-        .clusterMetricsWithSegmentSize(
-            clusterMetrics.toBuilder()
-                .segmentSize(0)
-                .segmentCount(0)
-                .internalBrokerDiskUsage(Collections.emptyMap())
-                .build()
-        )
-        .internalTopicWithSegmentSize(
-            internalTopics.stream().collect(
-                Collectors.toMap(
-                    InternalTopic::getName,
-                    i -> i
-                )
-            )
-        ).build()
-    );
-  }
-
-  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(ReactiveAdminClient ac,
-                                                            InternalClusterMetrics clusterMetrics,
-                                                            List<InternalTopic> internalTopics) {
-    return ac.describeCluster().flatMap(
-        clusterDescription ->
-                ac.describeLogDirs().map(log -> {
-                  final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
-                      log.entrySet().stream().flatMap(b ->
-                          b.getValue().entrySet().stream().flatMap(topicMap ->
-                              topicMap.getValue().replicaInfos.entrySet().stream()
-                                  .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
-                          )
-                      ).collect(Collectors.toList());
-
-                  final Map<TopicPartition, LongSummaryStatistics> partitionStats =
-                      topicPartitions.stream().collect(
-                          Collectors.groupingBy(
-                              Tuple2::getT2,
-                              Collectors.summarizingLong(Tuple3::getT3)
-                          )
-                      );
-
-                  final Map<String, LongSummaryStatistics> topicStats =
-                      topicPartitions.stream().collect(
-                          Collectors.groupingBy(
-                              t -> t.getT2().topic(),
-                              Collectors.summarizingLong(Tuple3::getT3)
-                          )
-                      );
-
-                  final Map<Integer, LongSummaryStatistics> brokerStats =
-                      topicPartitions.stream().collect(
-                          Collectors.groupingBy(
-                              Tuple2::getT1,
-                              Collectors.summarizingLong(Tuple3::getT3)
-                          )
-                      );
-
-
-                  final LongSummaryStatistics summary =
-                      topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
-
-
-                  final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
-                      Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
-                  ).collect(Collectors.toMap(
-                      Tuple2::getT1,
-                      Tuple2::getT2
-                  ));
-
-                  final Map<Integer, InternalBrokerDiskUsage> resultBrokers =
-                      brokerStats.entrySet().stream().map(e ->
-                          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
-                              .segmentSize(e.getValue().getSum())
-                              .segmentCount(e.getValue().getCount())
-                              .build()
-                          )
-                      ).collect(Collectors.toMap(
-                          Tuple2::getT1,
-                          Tuple2::getT2
-                      ));
-
-                  return InternalSegmentSizeDto.builder()
-                      .clusterMetricsWithSegmentSize(
-                          clusterMetrics.toBuilder()
-                              .segmentSize(summary.getSum())
-                              .segmentCount(summary.getCount())
-                              .internalBrokerDiskUsage(resultBrokers)
-                              .build()
-                      )
-                      .internalTopicWithSegmentSize(resultTopics).build();
-                })
-    );
-  }
+  private static class LogDirInfo {
+
+    private final Map<TopicPartition, LongSummaryStatistics> partitionsStats;
+    private final Map<String, LongSummaryStatistics> topicStats;
+    private final Map<Integer, LongSummaryStatistics> brokerStats;
+
+    LogDirInfo(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
+      final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
+          log.entrySet().stream().flatMap(b ->
+              b.getValue().entrySet().stream().flatMap(topicMap ->
+                  topicMap.getValue().replicaInfos.entrySet().stream()
+                      .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+              )
+          ).collect(toList());
+
+      partitionsStats = topicPartitions.stream().collect(
+          groupingBy(
+              Tuple2::getT2,
+              summarizingLong(Tuple3::getT3)));
+
+      topicStats =
+          topicPartitions.stream().collect(
+              groupingBy(
+                  t -> t.getT2().topic(),
+                  summarizingLong(Tuple3::getT3)));
+
+      brokerStats = topicPartitions.stream().collect(
+          groupingBy(
+              Tuple2::getT1,
+              summarizingLong(Tuple3::getT3)));
+    }
 
-  private List<MetricDTO> getJmxMetric(KafkaCluster cluster, Node node) {
-    return Optional.of(cluster)
-        .filter(c -> c.getJmxPort() != null)
-        .filter(c -> c.getJmxPort() > 0)
-        .map(c -> jmxClusterUtil.getJmxMetrics(node.host(), c.getJmxPort(), c.isJmxSsl(),
-                c.getJmxUsername(), c.getJmxPassword()))
-        .orElse(Collections.emptyList());
-  }
+    private InternalTopic enrichTopicWithSegmentStats(InternalTopic topic) {
+      LongSummaryStatistics stats = topicStats.get(topic.getName());
+      return topic.withSegmentStats(stats.getSum(), stats.getCount())
+          .toBuilder()
+          .partitions(
+              topic.getPartitions().entrySet().stream().map(e ->
+                  Tuples.of(e.getKey(),
+                      enrichPartitionWithSegmentsData(topic.getName(), e.getValue()))
+              ).collect(toMap(Tuple2::getT1, Tuple2::getT2))
+          ).build();
    }
 
-  private Mono<InternalClusterMetrics> fillJmxMetrics(InternalClusterMetrics internalClusterMetrics,
-                                                      KafkaCluster cluster,
-                                                      ReactiveAdminClient ac) {
-    return fillBrokerMetrics(internalClusterMetrics, cluster, ac)
-        .map(this::calculateClusterMetrics);
-  }
+    private InternalPartition enrichPartitionWithSegmentsData(String topic,
+                                                              InternalPartition partition) {
+      final LongSummaryStatistics stats =
+          partitionsStats.get(new TopicPartition(topic, partition.getPartition()));
+      return partition.withSegmentStats(stats.getSum(), stats.getCount());
+    }
 
-  private Mono<InternalClusterMetrics> fillBrokerMetrics(
-      InternalClusterMetrics internalClusterMetrics, KafkaCluster cluster, ReactiveAdminClient ac) {
-    return ac.describeCluster()
-        .flatMapIterable(ReactiveAdminClient.ClusterDescription::getNodes)
-        .map(broker ->
-            Map.of(broker.id(), InternalBrokerMetrics.builder()
-                .metrics(getJmxMetric(cluster, broker)).build())
-        )
-        .collectList()
-        .map(s -> internalClusterMetrics.toBuilder()
-            .internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
-  }
+    private Map<Integer, InternalBrokerDiskUsage> getBrokersDiskUsage() {
+      return brokerStats.entrySet().stream().map(e ->
+          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
+              .segmentSize(e.getValue().getSum())
+              .segmentCount(e.getValue().getCount())
+              .build()
+          )
+      ).collect(toMap(Tuple2::getT1, Tuple2::getT2));
+    }
 
-  private InternalClusterMetrics calculateClusterMetrics(
-      InternalClusterMetrics internalClusterMetrics) {
-    final List<MetricDTO> metrics = internalClusterMetrics.getInternalBrokerMetrics().values()
-        .stream()
-        .flatMap(b -> b.getMetrics().stream())
-        .collect(
-            Collectors.groupingBy(
-                MetricDTO::getCanonicalName,
-                Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
-            )
-        ).values().stream()
-        .filter(Optional::isPresent)
-        .map(Optional::get)
-        .collect(Collectors.toList());
-    final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
-        internalClusterMetrics.toBuilder().metrics(metrics);
-    metricsBuilder.bytesInPerSec(findTopicMetrics(
-        metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
-    ));
-    metricsBuilder.bytesOutPerSec(findTopicMetrics(
-        metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
-    ));
-    return metricsBuilder.build();
-  }
+    private Map<String, InternalTopic> enrichTopics(Map<String, InternalTopic> topics) {
+      return topics.values().stream()
+          .map(this::enrichTopicWithSegmentStats)
+          .collect(Collectors.toMap(InternalTopic::getName, t -> t));
+    }
 
-  private Map<String, BigDecimal> findTopicMetrics(List<MetricDTO> metrics,
-                                                   JmxMetricsName metricsName,
-                                                   JmxMetricsValueName valueName) {
-    return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
-        .filter(m -> m.getParams().containsKey("topic"))
-        .filter(m -> m.getValue().containsKey(valueName.name()))
-        .map(m -> Tuples.of(
-            m.getParams().get("topic"),
-            m.getValue().get(valueName.name())
-        )).collect(Collectors.groupingBy(
-            Tuple2::getT1,
-            Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
-        ));
+    public void enrichWithLogDirInfo(
+        InternalClusterMetrics.InternalClusterMetricsBuilder builder) {
+      builder
+          .topics(enrichTopics(builder.build().getTopics()))
+          .internalBrokerDiskUsage(getBrokersDiskUsage());
+    }
   }
 }
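The MetricsCollector above is the commit's central pattern: independently collected Monos are zipped one by one into an immutable accumulator (via toBuilder), and the accumulator is folded into the final value at the end. A minimal self-contained sketch of the same pattern, with hypothetical names:

    import reactor.core.publisher.Mono;

    class CollectorSketch {
      // Immutable accumulator; each zipWith step fills in one more field.
      static final class Acc {
        final String version;
        final Integer brokerCount;

        Acc(String version, Integer brokerCount) {
          this.version = version;
          this.brokerCount = brokerCount;
        }
      }

      Mono<Acc> collect(Mono<String> versionMono, Mono<Integer> brokerCountMono) {
        return versionMono.map(v -> new Acc(v, null))
            .zipWith(brokerCountMono, (acc, n) -> new Acc(acc.version, n));
      }
    }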

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -18,6 +18,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.log4j.Log4j2;
@@ -59,6 +60,7 @@ public class ReactiveAdminClient implements Closeable {
 
   @Value
   public static class ClusterDescription {
+    @Nullable
     Node controller;
     String clusterId;
     Collection<Node> nodes;
@@ -70,6 +72,7 @@ public class ReactiveAdminClient implements Closeable {
         .map(ver ->
             new ReactiveAdminClient(
                 adminClient,
+                ver,
                 Set.of(getSupportedUpdateFeatureForVersion(ver))));
   }
 
@@ -94,6 +97,7 @@ public class ReactiveAdminClient implements Closeable {
   //---------------------------------------------------------------------------------
 
   private final AdminClient client;
+  private final String version;
   private final Set<SupportedFeature> features;
 
   public Mono<Set<String>> listTopics(boolean listInternal) {
@@ -104,6 +108,10 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.deleteTopics(List.of(topicName)).all());
   }
 
+  public String getVersion() {
+    return version;
+  }
+
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
     List<ConfigResource> resources = topicNames.stream()
         .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
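Capturing the version string at client creation turns what used to be a remote call (getClusterVersion() returning a Mono) into a plain getter, which is what allows getMetrics() to read it synchronously while assembling the collector, as in this fragment from the MetricsService diff above:

    MetricsCollector.builder()
        .clusterDescription(description)
        .version(ac.getVersion())   // plain String, resolved once when the client was built
        .build();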

+ 32 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -64,7 +64,7 @@ public class TopicsService {
     Predicate<Integer> positiveInt = i -> i > 0;
     Predicate<Integer> positiveInt = i -> i > 0;
     int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
     int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
     var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
     var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
-    List<InternalTopic> topics = cluster.getTopics().values().stream()
+    List<InternalTopic> topics = cluster.getMetrics().getTopics().values().stream()
         .filter(topic -> !topic.isInternal()
         .filter(topic -> !topic.isInternal()
             || showInternal
             || showInternal
             .map(i -> topic.isInternal() == i)
             .map(i -> topic.isInternal() == i)
@@ -110,11 +110,11 @@ public class TopicsService {
     }
     }
   }
   }
 
 
-  public Optional<TopicDetailsDTO> getTopicDetails(KafkaCluster cluster, String topicName) {
-    return Optional.ofNullable(cluster.getTopics()).map(l -> l.get(topicName)).map(
-        t -> t.toBuilder().partitions(getTopicPartitions(cluster, t)
-        ).build()
-    ).map(t -> clusterMapper.toTopicDetails(t, cluster.getMetrics()));
+  public TopicDetailsDTO getTopicDetails(KafkaCluster cluster, String topicName) {
+    var topic = getTopic(cluster, topicName);
+    var upToDatePartitions = getTopicPartitions(cluster, topic);
+    topic = topic.toBuilder().partitions(upToDatePartitions).build();
+    return clusterMapper.toTopicDetails(topic);
   }
   }
 
 
   @SneakyThrows
   @SneakyThrows
@@ -135,12 +135,11 @@ public class TopicsService {
         .flatMapMany(Flux::fromIterable);
         .flatMapMany(Flux::fromIterable);
   }
   }
 
 
-  public Optional<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster cluster, String topicName) {
-    return Optional.of(cluster)
-        .map(KafkaCluster::getTopics)
-        .map(t -> t.get(topicName))
-        .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig)
-            .collect(Collectors.toList()));
+  public List<TopicConfigDTO> getTopicConfigs(KafkaCluster cluster, String topicName) {
+    var configs =  getTopic(cluster, topicName).getTopicConfigs();
+    return configs.stream()
+        .map(clusterMapper::toTopicConfig)
+        .collect(Collectors.toList());
   }
   }
 
 
 
 
@@ -234,7 +233,7 @@ public class TopicsService {
       ReplicationFactorChangeDTO replicationFactorChange) {
       ReplicationFactorChangeDTO replicationFactorChange) {
     return adminClientService.get(cluster)
     return adminClientService.get(cluster)
         .flatMap(ac -> {
         .flatMap(ac -> {
-          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
+          Integer actual = getTopic(cluster, topicName).getReplicationFactor();
           Integer requested = replicationFactorChange.getTotalReplicationFactor();
           Integer requested = replicationFactorChange.getTotalReplicationFactor();
           Integer brokersCount = cluster.getMetrics().getBrokerCount();
           Integer brokersCount = cluster.getMetrics().getBrokerCount();
 
 
@@ -267,7 +266,7 @@ public class TopicsService {
     Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
     Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
     // Brokers map (Broker id -> count)
     // Brokers map (Broker id -> count)
     Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
     Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
-    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();
+    int currentReplicationFactor = getTopic(cluster, topicName).getReplicationFactor();
 
 
     // If we should to increase Replication factor
     // If we should to increase Replication factor
     if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
     if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
@@ -311,7 +310,7 @@ public class TopicsService {
         // while (partition replicas count != requested replication factor)
         // while (partition replicas count != requested replication factor)
         for (Integer broker : brokersUsageList) {
         for (Integer broker : brokersUsageList) {
           // Check is the broker the leader of partition
           // Check is the broker the leader of partition
-          if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
+          if (!getTopic(cluster, topicName).getPartitions().get(partition).getLeader()
               .equals(broker)) {
               .equals(broker)) {
             brokers.remove(broker);
             brokers.remove(broker);
             brokersUsage.merge(broker, -1, Integer::sum);
             brokersUsage.merge(broker, -1, Integer::sum);
@@ -336,7 +335,7 @@ public class TopicsService {
   }
   }
 
 
   private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
   private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
-    return cluster.getTopics().get(topicName).getPartitions().values().stream()
+    return getTopic(cluster, topicName).getPartitions().values().stream()
         .collect(Collectors.toMap(
         .collect(Collectors.toMap(
             InternalPartition::getPartition,
             InternalPartition::getPartition,
             p -> p.getReplicas().stream()
             p -> p.getReplicas().stream()
@@ -347,7 +346,7 @@ public class TopicsService {
 
 
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                               Map<Integer, List<Integer>> currentAssignment) {
                                               Map<Integer, List<Integer>> currentAssignment) {
-    Map<Integer, Integer> result = cluster.getBrokers().stream()
+    Map<Integer, Integer> result = cluster.getMetrics().getBrokers().stream()
         .collect(Collectors.toMap(
         .collect(Collectors.toMap(
             c -> c,
             c -> c,
             c -> 0
             c -> 0
@@ -364,7 +363,7 @@ public class TopicsService {
       PartitionsIncreaseDTO partitionsIncrease) {
       PartitionsIncreaseDTO partitionsIncrease) {
     return adminClientService.get(cluster)
     return adminClientService.get(cluster)
         .flatMap(ac -> {
         .flatMap(ac -> {
-          Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
+          Integer actualCount = getTopic(cluster, topicName).getPartitionCount();
           Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
           Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
 
 
           if (requestedCount < actualCount) {
           if (requestedCount < actualCount) {
@@ -407,10 +406,11 @@ public class TopicsService {
       final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);
       final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);
 
 
       return tps.stream()
       return tps.stream()
-          .map(tp -> partitions.get(tp.partition()).toBuilder()
-              .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
-              .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
-              .build()
+          .map(tp -> partitions.get(tp.partition())
+              .withOffsets(
+                  earliest.getOrDefault(tp, -1L),
+                  latest.getOrDefault(tp, -1L)
+              )
           ).collect(Collectors.toMap(
           ).collect(Collectors.toMap(
               InternalPartition::getPartition,
               InternalPartition::getPartition,
               tp -> tp
               tp -> tp
@@ -421,8 +421,7 @@ public class TopicsService {
   }
   }
 
 
   public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
   public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
-    var topicDetails = getTopicDetails(cluster, topicName)
-        .orElseThrow(TopicNotFoundException::new);
+    var topicDetails = getTopicDetails(cluster, topicName);
     if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) {
     if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) {
       return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
       return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
           .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster.getName(), topicName));
           .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster.getName(), topicName));
@@ -432,7 +431,7 @@ public class TopicsService {
   }
   }
 
 
   public TopicMessageSchemaDTO getTopicSchema(KafkaCluster cluster, String topicName) {
   public TopicMessageSchemaDTO getTopicSchema(KafkaCluster cluster, String topicName) {
-    if (!cluster.getTopics().containsKey(topicName)) {
+    if (!cluster.getMetrics().getTopics().containsKey(topicName)) {
       throw new TopicNotFoundException();
       throw new TopicNotFoundException();
     }
     }
     return deserializationService
     return deserializationService
@@ -440,4 +439,12 @@ public class TopicsService {
         .getTopicSchema(topicName);
         .getTopicSchema(topicName);
   }
   }
 
 
+  private InternalTopic getTopic(KafkaCluster c, String topicName) {
+    var topic = c.getMetrics().getTopics().get(topicName);
+    if (topic == null) {
+      throw new TopicNotFoundException();
+    }
+    return topic;
+  }
+
 }
 }

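Note on the TopicsService change: the Optional-returning accessors are gone. The new private getTopic helper throws TopicNotFoundException for unknown topics, so the existence check lives in one place and every caller gets a plain value. A hedged sketch of the resulting call-site shape (the wrapper class and wiring below are illustrative, not code from the commit):

    // Sketch only: shows the new contract; a missing topic now surfaces as
    // TopicNotFoundException, which the web layer maps to a single 404 path.
    import com.provectus.kafka.ui.model.KafkaCluster;
    import com.provectus.kafka.ui.model.TopicConfigDTO;
    import com.provectus.kafka.ui.model.TopicDetailsDTO;
    import java.util.List;

    class CallSiteSketch {
      private final TopicsService topicsService;

      CallSiteSketch(TopicsService topicsService) {
        this.topicsService = topicsService;
      }

      TopicDetailsDTO describe(KafkaCluster cluster, String topic) {
        // previously Optional<TopicDetailsDTO> unwrapped at every call site
        return topicsService.getTopicDetails(cluster, topic);
      }

      List<TopicConfigDTO> configs(KafkaCluster cluster, String topic) {
        return topicsService.getTopicConfigs(cluster, topic);
      }
    }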
+ 21 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java

@@ -2,16 +2,19 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.exception.ZooKeeperException;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ServerStatusDTO;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
+import lombok.Value;
 import lombok.extern.log4j.Log4j2;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
-import org.jetbrains.annotations.Nullable;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
 
 @Service
 @RequiredArgsConstructor
@@ -20,7 +23,23 @@ public class ZookeeperService {
 
   private final Map<String, ZooKeeper> cachedZkClient = new ConcurrentHashMap<>();
 
-  public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
+  @Value
+  public static class ZkStatus {
+    ServerStatusDTO status;
+    @Nullable
+    Throwable error;
+  }
+
+  public Mono<ZkStatus> getZkStatus(KafkaCluster kafkaCluster) {
+    return Mono.fromSupplier(() ->
+            new ZkStatus(
+                isZookeeperOnline(kafkaCluster)
+                    ? ServerStatusDTO.ONLINE
+                    : ServerStatusDTO.OFFLINE, null))
+        .onErrorResume(th -> Mono.just(new ZkStatus(ServerStatusDTO.OFFLINE, th)));
+  }
+
+  private boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
     var isConnected = false;
     if (StringUtils.hasText(kafkaCluster.getZookeeper())) {
       var zkClient = getOrCreateZkClient(kafkaCluster);

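Note: getZkStatus wraps the old boolean probe in a Mono that always resolves, carrying the failure cause alongside an OFFLINE status instead of throwing. A hedged consumption sketch (the helper class and logging are illustrative, not from the commit):

    // Sketch only: Lombok's @Value on ZkStatus generates getStatus()/getError().
    import com.provectus.kafka.ui.model.KafkaCluster;
    import com.provectus.kafka.ui.model.ServerStatusDTO;
    import reactor.core.publisher.Mono;

    class ZkStatusSketch {
      static Mono<ServerStatusDTO> resolve(ZookeeperService zookeeperService,
                                           KafkaCluster cluster) {
        return zookeeperService.getZkStatus(cluster)
            .map(zk -> {
              if (zk.getError() != null) {
                // the cause travels with the OFFLINE status instead of being thrown
                System.err.println("ZooKeeper check failed: " + zk.getError());
              }
              return zk.getStatus();
            });
      }
    }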
+ 81 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java

@@ -1,10 +1,17 @@
 package com.provectus.kafka.ui.util;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.reducing;
+import static java.util.stream.Collectors.toList;
+
+import com.provectus.kafka.ui.model.JmxBrokerMetrics;
 import com.provectus.kafka.ui.model.JmxConnectionInfo;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MetricDTO;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -17,12 +24,19 @@ import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
+import lombok.Builder;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+import lombok.Value;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.kafka.common.Node;
 import org.jetbrains.annotations.Nullable;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Component
 @Log4j2
@@ -35,8 +49,34 @@ public class JmxClusterUtil {
   private static final String NAME_METRIC_FIELD = "name";
   private final KeyedObjectPool<JmxConnectionInfo, JMXConnector> pool;
 
+  @Builder
+  @Value
+  public static class JmxMetrics {
+    Map<String, BigDecimal> bytesInPerSec;
+    Map<String, BigDecimal> bytesOutPerSec;
+    Map<Integer, JmxBrokerMetrics> internalBrokerMetrics;
+    List<MetricDTO> metrics;
+  }
+
+  public Mono<JmxMetrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
+    return Flux.fromIterable(nodes)
+        .map(n -> Map.entry(n.id(),
+            JmxBrokerMetrics.builder().metrics(getJmxMetric(cluster, n)).build()))
+        .collectMap(Map.Entry::getKey, Map.Entry::getValue)
+        .map(this::collectMetrics);
+  }
+
+  private List<MetricDTO> getJmxMetric(KafkaCluster cluster, Node node) {
+    return Optional.of(cluster)
+        .filter(c -> c.getJmxPort() != null)
+        .filter(c -> c.getJmxPort() > 0)
+        .map(c -> getJmxMetrics(node.host(), c.getJmxPort(), c.isJmxSsl(),
+            c.getJmxUsername(), c.getJmxPassword()))
+        .orElse(Collections.emptyList());
+  }
+
   @SneakyThrows
-  public List<MetricDTO> getJmxMetrics(String host, int port, boolean jmxSsl,
+  private List<MetricDTO> getJmxMetrics(String host, int port, boolean jmxSsl,
                                     @Nullable String username, @Nullable String password) {
     String jmxUrl = JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE;
     final var connectionInfo = JmxConnectionInfo.builder()
@@ -65,7 +105,7 @@ public class JmxClusterUtil {
         metric.setName(params.get(NAME_METRIC_FIELD));
         metric.setCanonicalName(jmxMetric.getCanonicalName());
         metric.setParams(params);
-        metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc));
+        metric.setValue(getJmxMetrics(jmxMetric.getCanonicalName(), msc));
         result.add(metric);
       }
       pool.returnObject(connectionInfo, srv);
@@ -76,9 +116,8 @@ public class JmxClusterUtil {
     return result;
   }
 
-
   @SneakyThrows
-  private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc) {
+  private Map<String, BigDecimal> getJmxMetrics(String canonicalName, MBeanServerConnection msc) {
     Map<String, BigDecimal> resultAttr = new HashMap<>();
     ObjectName name = new ObjectName(canonicalName);
     var attrNames = msc.getMBeanInfo(name).getAttributes();
@@ -91,6 +130,44 @@ public class JmxClusterUtil {
     return resultAttr;
   }
 
+  private JmxMetrics collectMetrics(Map<Integer, JmxBrokerMetrics> perBrokerJmxMetrics) {
+    final List<MetricDTO> metrics = perBrokerJmxMetrics.values()
+        .stream()
+        .flatMap(b -> b.getMetrics().stream())
+        .collect(
+            groupingBy(
+                MetricDTO::getCanonicalName,
+                reducing(this::reduceJmxMetrics)
+            )
+        ).values().stream()
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(toList());
+    return JmxMetrics.builder()
+        .metrics(metrics)
+        .internalBrokerMetrics(perBrokerJmxMetrics)
+        .bytesInPerSec(findTopicMetrics(
+            metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate))
+        .bytesOutPerSec(findTopicMetrics(
+            metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate))
+        .build();
+  }
+
+  private Map<String, BigDecimal> findTopicMetrics(List<MetricDTO> metrics,
+                                                   JmxMetricsName metricsName,
+                                                   JmxMetricsValueName valueName) {
+    return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
+        .filter(m -> m.getParams().containsKey("topic"))
+        .filter(m -> m.getValue().containsKey(valueName.name()))
+        .map(m -> Tuples.of(
+            m.getParams().get("topic"),
+            m.getValue().get(valueName.name())
+        )).collect(groupingBy(
+            Tuple2::getT1,
+            reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
+        ));
+  }
+
   private void closeConnectionExceptionally(String url, JMXConnector srv) {
     try {
       pool.invalidateObject(new JmxConnectionInfo(url), srv);

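Note: collectMetrics merges per-broker samples by canonical MBean name (via reduceJmxMetrics, defined elsewhere in this class), and findTopicMetrics then folds the per-topic byte rates with groupingBy + reducing. A hedged, self-contained sketch of that fold, with invented sample data:

    // Sketch only: demonstrates the groupingBy/reducing aggregation used above.
    import static java.util.stream.Collectors.groupingBy;
    import static java.util.stream.Collectors.reducing;

    import java.math.BigDecimal;
    import java.util.List;
    import java.util.Map;
    import reactor.util.function.Tuple2;
    import reactor.util.function.Tuples;

    class FoldSketch {
      static Map<String, BigDecimal> sumPerTopic(List<Tuple2<String, BigDecimal>> samples) {
        // [("orders", 10), ("orders", 5), ("payments", 7)] -> {orders=15, payments=7}
        return samples.stream().collect(groupingBy(
            Tuple2::getT1,
            reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)));
      }

      public static void main(String[] args) {
        System.out.println(sumPerTopic(List.of(
            Tuples.of("orders", BigDecimal.TEN),
            Tuples.of("orders", BigDecimal.valueOf(5)),
            Tuples.of("payments", BigDecimal.valueOf(7)))));
      }
    }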
+ 28 - 36
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -43,16 +44,14 @@ class TopicsServiceTest {
 
   @Test
   public void shouldListFirst25Topics() {
-    final KafkaCluster cluster = KafkaCluster.builder()
-        .topics(
+    final KafkaCluster cluster = clusterWithTopics(
             IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
                     .partitions(Map.of())
                     .name(e)
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster,
         Optional.empty(), Optional.empty(), Optional.empty(),
@@ -64,16 +63,14 @@ class TopicsServiceTest {
 
   @Test
   public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
             IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
                     .partitions(Map.of())
                     .name(e)
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster, Optional.of(4), Optional.of(33),
         Optional.empty(), Optional.empty(), Optional.empty());
@@ -84,17 +81,14 @@ class TopicsServiceTest {
 
   @Test
   public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
            IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
                     .partitions(Map.of())
                     .name(e)
                     .build()))
-        )
-        .build();
-
+        );
 
     var topics = topicsService.getTopics(cluster, Optional.of(0), Optional.of(-1),
         Optional.empty(), Optional.empty(), Optional.empty());
@@ -105,8 +99,7 @@ class TopicsServiceTest {
 
   @Test
   public void shouldListBotInternalAndNonInternalTopics() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
             IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
@@ -114,8 +107,7 @@ class TopicsServiceTest {
                     .name(e)
                     .internal(Integer.parseInt(e) % 10 == 0)
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster,
         Optional.empty(), Optional.empty(), Optional.of(true),
@@ -128,8 +120,7 @@ class TopicsServiceTest {
 
   @Test
   public void shouldListOnlyNonInternalTopics() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
             IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
@@ -137,8 +128,7 @@ class TopicsServiceTest {
                     .name(e)
                     .internal(Integer.parseInt(e) % 10 == 0)
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster,
         Optional.empty(), Optional.empty(), Optional.of(true),
@@ -151,16 +141,14 @@ class TopicsServiceTest {
 
   @Test
   public void shouldListOnlyTopicsContainingOne() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
            IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
                     .partitions(Map.of())
                     .name(e)
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster,
         Optional.empty(), Optional.empty(), Optional.empty(),
@@ -172,8 +160,7 @@ class TopicsServiceTest {
 
   @Test
   public void shouldListTopicsOrderedByPartitionsCount() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
            IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
@@ -181,8 +168,7 @@ class TopicsServiceTest {
                     .name(e)
                     .partitionCount(100 - Integer.parseInt(e))
                     .build()))
-        )
-        .build();
+        );
 
     var topics = topicsService.getTopics(cluster,
         Optional.empty(), Optional.empty(), Optional.empty(),
@@ -194,8 +180,7 @@ class TopicsServiceTest {
 
   @Test
   public void shouldRetrieveTopicConfigs() {
-    var cluster = KafkaCluster.builder()
-        .topics(
+    var cluster = clusterWithTopics(
            IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
                 .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
@@ -213,13 +198,12 @@ class TopicsServiceTest {
                         )
                     )
                     .build()))
-        )
-        .build();
+        );
 
-    var configs = topicsService.getTopicConfigs(cluster, "1");
-    var topicConfig = configs.isPresent() ? configs.get().get(0) : null;
+    var topicConfigs = topicsService.getTopicConfigs(cluster, "1");
+    assertThat(topicConfigs).hasSize(1);
 
-    assertThat(configs.isPresent()).isTrue();
+    var topicConfig = topicConfigs.get(0);
     assertThat(topicConfig.getName()).isEqualTo("testName");
     assertThat(topicConfig.getValue()).isEqualTo("testValue");
     assertThat(topicConfig.getDefaultValue()).isEqualTo("testDefaultValue");
@@ -230,4 +214,12 @@ class TopicsServiceTest {
     assertThat(topicConfig.getIsSensitive()).isTrue();
   }
 
+  private KafkaCluster clusterWithTopics(Map<String, InternalTopic> topics) {
+    return KafkaCluster.builder()
+        .metrics(InternalClusterMetrics.builder()
+            .topics(topics)
+            .build())
+        .build();
+  }
+
 }
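Note: with fixtures now built through clusterWithTopics, a follow-up test for the new throwing lookup could look like the hedged sketch below. It is not part of the commit; it assumes AssertJ's assertThatThrownBy and that TopicNotFoundException lives in the project's exception package:

    // Sketch only: verifies the not-found contract introduced in TopicsService.
    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    import com.provectus.kafka.ui.exception.TopicNotFoundException;
    import java.util.Map;
    import org.junit.jupiter.api.Test;

    @Test
    public void shouldThrowForUnknownTopic() {
      var cluster = clusterWithTopics(Map.of());
      assertThatThrownBy(() -> topicsService.getTopicConfigs(cluster, "absent"))
          .isInstanceOf(TopicNotFoundException.class);
    }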