Explorar el Código

Fixed metrics and stats endpoints (#115)

German Osin hace 4 años
padre
commit
3c196587a3

+ 10 - 2
docker/kafka-ui.yaml

@@ -17,10 +17,12 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
+      KAFKA_CLUSTERS_0_JMXPORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_1_NAME: secondLocal
       KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
       KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
+      KAFKA_CLUSTERS_1_JMXPORT: 9998
 
   zookeeper0:
     image: confluentinc/cp-zookeeper:5.1.0
@@ -32,6 +34,9 @@ services:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - zookeeper0
+    ports:
+      - 9092:9092
+      - 9997:9997
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
@@ -52,6 +57,9 @@ services:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - zookeeper1
+    ports:
+      - 9093:9093
+      - 9998:9998
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
@@ -59,8 +67,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      JMX_PORT: 9997
-      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997
+      JMX_PORT: 9998
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
 
   schemaregistry0:
     image: confluentinc/cp-schema-registry:5.1.0

+ 0 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.cluster.config;
 
-import com.provectus.kafka.ui.cluster.util.JmxMetricsNames;
 import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -10,9 +9,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.jmx.export.MBeanExporter;
 
 import javax.management.remote.JMXConnector;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 @Configuration
 public class Config {
@@ -39,9 +35,4 @@ public class Config {
         exporter.setExcludedBeans("pool");
         return exporter;
     }
-
-    @Bean
-    public List<String> jmxMetricsNames() {
-        return Stream.of(JmxMetricsNames.values()).map(Enum::name).collect(Collectors.toList());
-    }
 }

+ 35 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -6,6 +6,8 @@ import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
+import java.math.BigDecimal;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -15,10 +17,14 @@ public interface ClusterMapper {
     @Mapping(target = "brokerCount", source = "metrics.brokerCount")
     @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
     @Mapping(target = "topicCount", source = "metrics.topicCount")
-    @Mapping(target = "metrics", source = "metrics.metrics")
+    @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec", qualifiedByName = "sumMetrics")
+    @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
     Cluster toCluster(KafkaCluster cluster);
 
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+    @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
+    ClusterStats toClusterStats(InternalClusterMetrics metrics);
+    @Mapping(target = "items", source = "metrics")
     ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
     BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
     Topic toTopic(InternalTopic topic);
@@ -27,8 +33,35 @@ public interface ClusterMapper {
     TopicConfig toTopicConfig(InternalTopicConfig topic);
     Replica toReplica(InternalReplica replica);
 
-     default java.util.List<Partition> map(Map<Integer, InternalPartition> map) {
+    default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
+        final TopicDetails result = toTopicDetails(topic);
+        result.setBytesInPerSec(
+                metrics.getBytesInPerSec().get(topic.getName())
+        );
+        result.setBytesOutPerSec(
+                metrics.getBytesOutPerSec().get(topic.getName())
+        );
+        return result;
+    }
+
+     default List<Partition> map(Map<Integer, InternalPartition> map) {
          return map.values().stream().map(this::toPartition).collect(Collectors.toList());
      }
 
+     default List<BrokerDiskUsage> mapDiskUsage(Map<Integer, InternalBrokerDiskUsage> brokers) {
+         return brokers.entrySet().stream().map(e -> this.map(e.getKey(), e.getValue())).collect(Collectors.toList());
+     }
+
+     default BrokerDiskUsage map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {
+         final BrokerDiskUsage brokerDiskUsage = new BrokerDiskUsage();
+         brokerDiskUsage.setBrokerId(id);
+         brokerDiskUsage.segmentCount((int)internalBrokerDiskUsage.getSegmentCount());
+         brokerDiskUsage.segmentSize(internalBrokerDiskUsage.getSegmentSize());
+         return brokerDiskUsage;
+     }
+
+     default BigDecimal sumMetrics(Map<String, BigDecimal> metrics) {
+         return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
+     }
+
 }

+ 11 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerDiskUsage.java

@@ -0,0 +1,11 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalBrokerDiskUsage {
+    private final long segmentCount;
+    private final long segmentSize;
+}

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java

@@ -9,6 +9,5 @@ import java.util.List;
 @Data
 @Builder(toBuilder = true)
 public class InternalBrokerMetrics {
-    private final Long segmentSize;
     private final List<Metric> metrics;
 }

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
@@ -20,10 +21,11 @@ public class InternalClusterMetrics {
     private final int offlinePartitionCount;
     private final int inSyncReplicasCount;
     private final int outOfSyncReplicasCount;
-    private final Map<String, Number> bytesInPerSec;
-    private final Map<String, Number> bytesOutPerSec;
+    private final Map<String, BigDecimal> bytesInPerSec;
+    private final Map<String, BigDecimal> bytesOutPerSec;
     private final long segmentCount;
     private final long segmentSize;
+    private final Map<Integer, InternalBrokerDiskUsage> internalBrokerDiskUsage;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
     private final List<Metric> metrics;
     private final int zooKeeperStatus;

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

@@ -23,5 +23,4 @@ public class InternalTopic {
     private final int underReplicatedPartitions;
     private final long segmentSize;
     private final long segmentCount;
-//    private final Map<TopicPartition, Long> partitionSegmentSize;
 }

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java

@@ -4,11 +4,13 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 import java.math.BigDecimal;
+import java.util.Map;
 
 @Getter
 @AllArgsConstructor
 public class MetricDto {
     private String canonicalName;
     private String metricName;
+    private Map<String,String> params;
     private BigDecimal value;
 }

+ 9 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -46,6 +46,14 @@ public class ClusterService {
                 .map(clusterMapper::toBrokerMetrics));
     }
 
+    public Mono<ClusterStats> getClusterStats(String name) {
+        return Mono.justOrEmpty(
+                clustersStorage.getClusterByName(name)
+                        .map(KafkaCluster::getMetrics)
+                        .map(clusterMapper::toClusterStats)
+        );
+    }
+
     public Mono<ClusterMetrics> getClusterMetrics(String name) {
         return Mono.justOrEmpty(
                 clustersStorage.getClusterByName(name)
@@ -73,7 +81,7 @@ public class ClusterService {
                           t -> t.toBuilder().partitions(
                                   kafkaService.getTopicPartitions(c, t)
                           ).build()
-                        ).map(clusterMapper::toTopicDetails)
+                        ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
                 );
     }
                                                                            

+ 25 - 33
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,7 +1,5 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.MetricDto;
 import com.provectus.kafka.ui.model.Metric;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -13,12 +11,9 @@ import javax.management.remote.JMXConnector;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
+import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Component
 @Slf4j
@@ -26,12 +21,11 @@ import java.util.stream.Collectors;
 public class JmxClusterUtil {
 
     private final KeyedObjectPool<String, JMXConnector> pool;
-    private final List<String> jmxMetricsNames;
 
     private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
     private static final String JMX_SERVICE_TYPE = "jmxrmi";
     private static final String KAFKA_SERVER_PARAM = "kafka.server";
-    private static final String NAME_METRIC_FIELD = "name=";
+    private static final String NAME_METRIC_FIELD = "name";
 
     public List<Metric> getJmxMetrics(int jmxPort, String jmxHost) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
@@ -42,11 +36,14 @@ public class JmxClusterUtil {
             MBeanServerConnection msc = srv.getMBeanServerConnection();
             var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList());
             for (ObjectName jmxMetric : jmxMetrics) {
+                final Hashtable<String, String> params = jmxMetric.getKeyPropertyList();
                 Metric metric = new Metric();
+                metric.setName(params.get(NAME_METRIC_FIELD));
                 metric.setCanonicalName(jmxMetric.getCanonicalName());
+                metric.setParams(params);
                 metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl));
                 result.add(metric);
-            };
+            }
             pool.returnObject(jmxUrl, srv);
         } catch (IOException ioe) {
             log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
@@ -58,6 +55,8 @@ public class JmxClusterUtil {
         return result;
     }
 
+
+
     private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc, JMXConnector srv, String jmxUrl) {
         Map<String, BigDecimal> resultAttr = new HashMap<>();
         try {
@@ -97,34 +96,27 @@ public class JmxClusterUtil {
         }
     }
 
-    public List<MetricDto> convertToMetricDto(InternalClusterMetrics internalClusterMetrics) {
-        return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
-                .map(c ->
-                        c.getMetrics().stream()
-                                .filter(j -> isSameMetric(j.getCanonicalName()))
-                                .map(j -> j.getValue().entrySet().stream()
-                                        .map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))
-                .flatMap(Function.identity()).flatMap(Function.identity()).collect(Collectors.toList());
-    }
-
     public Metric reduceJmxMetrics (Metric metric1, Metric metric2) {
         var result = new Metric();
-        Map<String, BigDecimal> jmx1 = new HashMap<>(metric1.getValue());
-        Map<String, BigDecimal> jmx2 = new HashMap<>(metric2.getValue());
-        jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add));
+        Map<String, BigDecimal> value = Stream.concat(
+                metric1.getValue().entrySet().stream(),
+                metric2.getValue().entrySet().stream()
+        ).collect(Collectors.groupingBy(
+                Map.Entry::getKey,
+                Collectors.reducing(BigDecimal.ZERO, Map.Entry::getValue, BigDecimal::add)
+        ));
+        result.setName(metric1.getName());
         result.setCanonicalName(metric1.getCanonicalName());
-        result.setValue(jmx2);
+        result.setParams(metric1.getParams());
+        result.setValue(value);
         return result;
     }
 
-    private boolean isSameMetric (String metric) {
-        if (metric.contains(NAME_METRIC_FIELD)) {
-            int beginIndex = metric.indexOf(NAME_METRIC_FIELD);
-            int endIndex = metric.indexOf(',', beginIndex);
-            endIndex = endIndex < 0 ? metric.length() - 1 : endIndex;
-            return jmxMetricsNames.contains(metric.substring(beginIndex + 5, endIndex));
-        } else {
-            return false;
-        }
+    private boolean isWellKnownMetric(Metric metric) {
+        final Optional<String> param = Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p ->
+                Arrays.stream(JmxMetricsName.values()).map(Enum::name)
+                        .anyMatch(n -> n.equals(p))
+        );
+        return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent();
     }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsName.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.cluster.util;
 
-public enum JmxMetricsNames {
+public enum JmxMetricsName {
     MessagesInPerSec,
     BytesInPerSec,
     ReplicationBytesInPerSec,

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsValueName.java

@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.cluster.util;
+
+public enum  JmxMetricsValueName {
+    Count,
+    OneMinuteRate,
+    FifteenMinuteRate,
+    FiveMinuteRate,
+    MeanRate
+}

+ 101 - 56
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.kafka;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
+import com.provectus.kafka.ui.cluster.util.JmxMetricsName;
+import com.provectus.kafka.ui.cluster.util.JmxMetricsValueName;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.Metric;
 import com.provectus.kafka.ui.model.ServerStatus;
@@ -27,6 +29,7 @@ import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
+import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -319,50 +322,71 @@ public class KafkaService {
     private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, List<InternalTopic> internalTopics) {
         List<String> names = internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
         return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
-            ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
-                .map(log -> {
-
-                    final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
-                            log.entrySet().stream().flatMap(b ->
-                                    b.getValue().entrySet().stream().flatMap(topicMap ->
-                                            topicMap.getValue().replicaInfos.entrySet().stream()
-                                                    .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
-                                    )
-                    ).collect(Collectors.toList());
-
-                    final Map<TopicPartition, LongSummaryStatistics> partitionStats = topicPartitions.stream().collect(
-                            Collectors.groupingBy(
-                                    Tuple2::getT2,
-                                    Collectors.summarizingLong(Tuple3::getT3)
-                            )
-                    );
-
-                    final Map<String, LongSummaryStatistics> topicStats = topicPartitions.stream().collect(
-                            Collectors.groupingBy(
-                                    t -> t.getT2().topic(),
-                                    Collectors.summarizingLong(Tuple3::getT3)
-                            )
-                    );
-
-                    final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
-
-
-                    final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
-                            Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
-                    ).collect(Collectors.toMap(
-                            Tuple2::getT1,
-                            Tuple2::getT2
-                    ));
-
-                    return InternalSegmentSizeDto.builder()
-                            .clusterMetricsWithSegmentSize(
-                                    clusterMetrics.toBuilder()
-                                            .segmentSize(summary.getSum())
-                                            .segmentCount(summary.getCount())
-                                            .build()
-                            )
-                            .internalTopicWithSegmentSize(resultTopics).build();
-                })
+            ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap( nodes ->
+                    ClusterUtil.toMono(ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all())
+                            .map(log -> {
+                                final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
+                                        log.entrySet().stream().flatMap(b ->
+                                                b.getValue().entrySet().stream().flatMap(topicMap ->
+                                                        topicMap.getValue().replicaInfos.entrySet().stream()
+                                                                .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+                                                )
+                                        ).collect(Collectors.toList());
+
+                                final Map<TopicPartition, LongSummaryStatistics> partitionStats = topicPartitions.stream().collect(
+                                        Collectors.groupingBy(
+                                                Tuple2::getT2,
+                                                Collectors.summarizingLong(Tuple3::getT3)
+                                        )
+                                );
+
+                                final Map<String, LongSummaryStatistics> topicStats = topicPartitions.stream().collect(
+                                        Collectors.groupingBy(
+                                                t -> t.getT2().topic(),
+                                                Collectors.summarizingLong(Tuple3::getT3)
+                                        )
+                                );
+
+                                final Map<Integer, LongSummaryStatistics> brokerStats = topicPartitions.stream().collect(
+                                        Collectors.groupingBy(
+                                                t -> t.getT1(),
+                                                Collectors.summarizingLong(Tuple3::getT3)
+                                        )
+                                );
+
+
+                                final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
+
+
+                                final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
+                                        Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
+                                ).collect(Collectors.toMap(
+                                        Tuple2::getT1,
+                                        Tuple2::getT2
+                                ));
+
+                                final Map<Integer, InternalBrokerDiskUsage> resultBrokers = brokerStats.entrySet().stream().map(e ->
+                                        Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
+                                                .segmentSize(e.getValue().getSum())
+                                                .segmentCount(e.getValue().getCount())
+                                                .build()
+                                        )
+                                ).collect(Collectors.toMap(
+                                        Tuple2::getT1,
+                                        Tuple2::getT2
+                                ));
+
+                                return InternalSegmentSizeDto.builder()
+                                        .clusterMetricsWithSegmentSize(
+                                                clusterMetrics.toBuilder()
+                                                        .segmentSize(summary.getSum())
+                                                        .segmentCount(summary.getCount())
+                                                        .internalBrokerDiskUsage(resultBrokers)
+                                                        .build()
+                                        )
+                                        .internalTopicWithSegmentSize(resultTopics).build();
+                            })
+            )
         );
     }
 
@@ -387,18 +411,39 @@ public class KafkaService {
     }
 
     private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics internalClusterMetrics) {
-        return internalClusterMetrics.toBuilder().metrics(
-                    jmxClusterUtil.convertToMetricDto(internalClusterMetrics)
-                            .stream().map(c -> {
-                        Metric jmx = new Metric();
-                        jmx.setCanonicalName(c.getCanonicalName());
-                        jmx.setValue(Map.of(c.getMetricName(), c.getValue()));
-                        return jmx;
-                    }).collect(Collectors.groupingBy(Metric::getCanonicalName, Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)))
-                    .values().stream()
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .collect(Collectors.toList())).build();
+        final List<Metric> metrics = internalClusterMetrics.getInternalBrokerMetrics().values().stream()
+                .flatMap(b -> b.getMetrics().stream())
+                .collect(
+                        Collectors.groupingBy(
+                                Metric::getCanonicalName,
+                                Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
+                        )
+                ).values().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toList());
+        final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
+                internalClusterMetrics.toBuilder().metrics(metrics);
+        metricsBuilder.bytesInPerSec(findTopicMetrics(
+                metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
+        ));
+        metricsBuilder.bytesOutPerSec(findTopicMetrics(
+                metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
+        ));
+        return metricsBuilder.build();
+    }
+
+    private Map<String, BigDecimal> findTopicMetrics(List<Metric> metrics, JmxMetricsName metricsName, JmxMetricsValueName valueName) {
+        return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
+                .filter(m -> m.getParams().containsKey("topic"))
+                .filter(m -> m.getValue().containsKey(valueName.name()))
+                .map(m -> Tuples.of(
+                        m.getParams().get("topic"),
+                        m.getValue().get(valueName.name())
+                )).collect(Collectors.groupingBy(
+                    Tuple2::getT1,
+                    Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
+                ));
     }
 
     public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic )  {

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -43,6 +43,13 @@ public class MetricsRestController implements ApiClustersApi {
                 .onErrorReturn(ResponseEntity.notFound().build());
     }
 
+    @Override
+    public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getClusterStats(clusterName)
+                .map(ResponseEntity::ok)
+                .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));

+ 0 - 5
kafka-ui-api/src/main/resources/application-local.yml

@@ -12,11 +12,6 @@ kafka:
       bootstrapServers: localhost:9093
       zookeeper: localhost:2182
       jmxPort: 9998
-    -
-      name: localReplica
-      bootstrapServers: localhost:9094
-      zookeeper: localhost:2181
-      jmxPort: 9999
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 48 - 7
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -72,7 +72,27 @@ paths:
               schema:
                 $ref: '#/components/schemas/ClusterMetrics'
 
-  /api/clusters/{clusterName}/metrics/broker/{id}:
+  /api/clusters/{clusterName}/stats:
+    get:
+      tags:
+        - /api/clusters
+      summary: getClusterStats
+      operationId: getClusterStats
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ClusterStats'
+
+  /api/clusters/{clusterName}/brokers/{id}/metrics:
     get:
       tags:
         - /api/clusters
@@ -332,10 +352,10 @@ components:
           type: integer
         topicCount:
           type: integer
-        metrics:
-          type: array
-          items:
-            $ref: '#/components/schemas/Metric'
+        bytesInPerSec:
+          type: number
+        bytesOutPerSec:
+          type: number
       required:
         - id
         - name
@@ -348,6 +368,14 @@ components:
         - offline
 
     ClusterMetrics:
+      type: object
+      properties:
+        items:
+          type: array
+          items:
+            $ref: '#/components/schemas/Metric'
+
+    ClusterStats:
       type: object
       properties:
         brokerCount:
@@ -379,11 +407,16 @@ components:
         segmentSize:
           type: integer
           format: int64
+        segmentCount:
+          type: integer
 
     BrokerMetrics:
       type: object
       properties:
-        segmentZise:
+        segmentSize:
+          type: integer
+          format: int64
+        segmentCount:
           type: integer
         metrics:
           type: array
@@ -446,7 +479,9 @@ components:
         inSyncReplicas:
           type: integer
         bytesInPerSec:
-          type: integer
+          type: number
+        bytesOutPerSec:
+          type: number
         segmentSize:
           type: integer
         segmentCount:
@@ -589,8 +624,14 @@ components:
     Metric:
       type: object
       properties:
+        name:
+          type: string
         canonicalName:
           type: string
+        params:
+          type: string
+          additionalProperties:
+            type: string
         value:
           type: string
           additionalProperties: