diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml
index e538c16959..b62dfd37da 100644
--- a/docker/kafka-ui.yaml
+++ b/docker/kafka-ui.yaml
@@ -17,10 +17,12 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
+      KAFKA_CLUSTERS_0_JMXPORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_1_NAME: secondLocal
       KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
       KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
+      KAFKA_CLUSTERS_1_JMXPORT: 9998
 
   zookeeper0:
     image: confluentinc/cp-zookeeper:5.1.0
@@ -32,6 +34,9 @@ services:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - zookeeper0
+    ports:
+      - 9092:9092
+      - 9997:9997
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
@@ -52,6 +57,9 @@ services:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - zookeeper1
+    ports:
+      - 9093:9093
+      - 9998:9998
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
@@ -59,8 +67,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      JMX_PORT: 9997
-      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997
+      JMX_PORT: 9998
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
 
   schemaregistry0:
     image: confluentinc/cp-schema-registry:5.1.0
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java
index 5b6a623b00..83c86553c9 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java
@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.cluster.config;
 
-import com.provectus.kafka.ui.cluster.util.JmxMetricsNames;
 import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -10,9 +9,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.jmx.export.MBeanExporter;
 
 import javax.management.remote.JMXConnector;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 @Configuration
 public class Config {
@@ -39,9 +35,4 @@ public class Config {
         exporter.setExcludedBeans("pool");
         return exporter;
     }
-
-    @Bean
-    public List jmxMetricsNames() {
-        return Stream.of(JmxMetricsNames.values()).map(Enum::name).collect(Collectors.toList());
-    }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
index 85256c08ef..e85690ac95 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
@@ -6,6 +6,8 @@ import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
+import java.math.BigDecimal;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -15,10 +17,14 @@ public interface ClusterMapper {
     @Mapping(target = "brokerCount", source = "metrics.brokerCount")
     @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
     @Mapping(target = "topicCount", source = "metrics.topicCount")
-    @Mapping(target = "metrics", source = "metrics.metrics")
+    @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec", qualifiedByName = "sumMetrics")
+    @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
     Cluster toCluster(KafkaCluster cluster);
 
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+    @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
+    ClusterStats toClusterStats(InternalClusterMetrics metrics);
+    @Mapping(target = "items", source = "metrics")
     ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
     BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
     Topic toTopic(InternalTopic topic);
@@ -27,8 +33,35 @@ public interface ClusterMapper {
     TopicConfig toTopicConfig(InternalTopicConfig topic);
     Replica toReplica(InternalReplica replica);
 
-    default java.util.List map(Map map) {
+    default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
+        final TopicDetails result = toTopicDetails(topic);
+        result.setBytesInPerSec(
+                metrics.getBytesInPerSec().get(topic.getName())
+        );
+        result.setBytesOutPerSec(
+                metrics.getBytesOutPerSec().get(topic.getName())
+        );
+        return result;
+    }
+
+    default List<Partition> map(Map<Integer, InternalPartition> map) {
         return map.values().stream().map(this::toPartition).collect(Collectors.toList());
     }
 
+    default List<BrokerDiskUsage> mapDiskUsage(Map<Integer, InternalBrokerDiskUsage> brokers) {
+        return brokers.entrySet().stream().map(e -> this.map(e.getKey(), e.getValue())).collect(Collectors.toList());
+    }
+
+    default BrokerDiskUsage map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {
+        final BrokerDiskUsage brokerDiskUsage = new BrokerDiskUsage();
+        brokerDiskUsage.setBrokerId(id);
+        brokerDiskUsage.segmentCount((int)internalBrokerDiskUsage.getSegmentCount());
+        brokerDiskUsage.segmentSize(internalBrokerDiskUsage.getSegmentSize());
+        return brokerDiskUsage;
+    }
+
+    default BigDecimal sumMetrics(Map<String, BigDecimal> metrics) {
+        return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
+    }
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerDiskUsage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerDiskUsage.java
new file mode 100644
index 0000000000..42d57faba7
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerDiskUsage.java
@@ -0,0 +1,11 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalBrokerDiskUsage {
+    private final long segmentCount;
+    private final long segmentSize;
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java
index 662cf0ddf5..a7a872bcdc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java
@@ -9,6 +9,5 @@ import java.util.List;
 @Data
 @Builder(toBuilder = true)
 public class InternalBrokerMetrics {
-    private final Long segmentSize;
     private final List<Metric> metrics;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
index 128316cda9..a411dbbac1 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
@@ -20,10 +21,11 @@ public class InternalClusterMetrics {
     private final int offlinePartitionCount;
     private final int inSyncReplicasCount;
     private final int outOfSyncReplicasCount;
-    private final Map bytesInPerSec;
-    private final Map bytesOutPerSec;
+    private final Map<String, BigDecimal> bytesInPerSec;
+    private final Map<String, BigDecimal> bytesOutPerSec;
     private final long segmentCount;
     private final long segmentSize;
+    private final Map<Integer, InternalBrokerDiskUsage> internalBrokerDiskUsage;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
     private final List<Metric> metrics;
     private final int zooKeeperStatus;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
index 36c0741b20..f027bcabf8 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
@@ -23,5 +23,4 @@ public class InternalTopic {
     private final int underReplicatedPartitions;
     private final long segmentSize;
     private final long segmentCount;
-//    private final Map partitionSegmentSize;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java
index 44b5b896e8..801ae2d47b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java
@@ -4,11 +4,13 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 import java.math.BigDecimal;
+import java.util.Map;
 
 @Getter
 @AllArgsConstructor
 public class MetricDto {
     private String canonicalName;
     private String metricName;
+    private Map<String, String> params;
     private BigDecimal value;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
index 32e95bfe1b..5cbb824ad0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
@@ -46,6 +46,14 @@ public class ClusterService {
                 .map(clusterMapper::toBrokerMetrics));
     }
 
+    public Mono<ClusterStats> getClusterStats(String name) {
+        return Mono.justOrEmpty(
+                clustersStorage.getClusterByName(name)
+                        .map(KafkaCluster::getMetrics)
+                        .map(clusterMapper::toClusterStats)
+        );
+    }
+
     public Mono<ClusterMetrics> getClusterMetrics(String name) {
         return Mono.justOrEmpty(
                 clustersStorage.getClusterByName(name)
@@ -73,7 +81,7 @@ public class ClusterService {
                         t -> t.toBuilder().partitions(
                                 kafkaService.getTopicPartitions(c, t)
                         ).build()
-                ).map(clusterMapper::toTopicDetails)
+                ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
         );
     }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java
index f45f968853..b4090d9b92 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java
@@ -1,7 +1,5 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.MetricDto;
 import com.provectus.kafka.ui.model.Metric;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -13,12 +11,9 @@ import javax.management.remote.JMXConnector;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
+import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Component
 @Slf4j
@@ -26,12 +21,11 @@ import java.util.stream.Collectors;
 public class JmxClusterUtil {
 
     private final KeyedObjectPool<String, JMXConnector> pool;
-    private final List jmxMetricsNames;
 
     private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
     private static final String JMX_SERVICE_TYPE = "jmxrmi";
     private static final String KAFKA_SERVER_PARAM = "kafka.server";
-    private static final String NAME_METRIC_FIELD = "name=";
+    private static final String NAME_METRIC_FIELD = "name";
 
     public List<Metric> getJmxMetrics(int jmxPort, String jmxHost) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
@@ -42,11 +36,14 @@ public class JmxClusterUtil {
             MBeanServerConnection msc = srv.getMBeanServerConnection();
             var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList());
             for (ObjectName jmxMetric : jmxMetrics) {
+                final Hashtable<String, String> params = jmxMetric.getKeyPropertyList();
                 Metric metric = new Metric();
+                metric.setName(params.get(NAME_METRIC_FIELD));
                 metric.setCanonicalName(jmxMetric.getCanonicalName());
+                metric.setParams(params);
                 metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl));
                 result.add(metric);
-            };
+            }
             pool.returnObject(jmxUrl, srv);
         } catch (IOException ioe) {
             log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
@@ -58,6 +55,8 @@ public class JmxClusterUtil {
         return result;
     }
 
+
+
     private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc, JMXConnector srv, String jmxUrl) {
         Map<String, BigDecimal> resultAttr = new HashMap<>();
         try {
@@ -97,34 +96,27 @@ public class JmxClusterUtil {
         }
     }
 
-    public List convertToMetricDto(InternalClusterMetrics internalClusterMetrics) {
-        return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
-                .map(c ->
-                        c.getMetrics().stream()
-                                .filter(j -> isSameMetric(j.getCanonicalName()))
-                                .map(j -> j.getValue().entrySet().stream()
-                                        .map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))
-                .flatMap(Function.identity()).flatMap(Function.identity()).collect(Collectors.toList());
-    }
-
     public Metric reduceJmxMetrics (Metric metric1, Metric metric2) {
         var result = new Metric();
-        Map jmx1 = new HashMap<>(metric1.getValue());
-        Map jmx2 = new HashMap<>(metric2.getValue());
-        jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add));
+        Map<String, BigDecimal> value = Stream.concat(
+                metric1.getValue().entrySet().stream(),
+                metric2.getValue().entrySet().stream()
+        ).collect(Collectors.groupingBy(
+                Map.Entry::getKey,
+                Collectors.reducing(BigDecimal.ZERO, Map.Entry::getValue, BigDecimal::add)
+        ));
+        result.setName(metric1.getName());
         result.setCanonicalName(metric1.getCanonicalName());
-        result.setValue(jmx2);
+        result.setParams(metric1.getParams());
+        result.setValue(value);
         return result;
     }
 
-    private boolean isSameMetric (String metric) {
-        if (metric.contains(NAME_METRIC_FIELD)) {
-            int beginIndex = metric.indexOf(NAME_METRIC_FIELD);
-            int endIndex = metric.indexOf(',', beginIndex);
-            endIndex = endIndex < 0 ? metric.length() - 1 : endIndex;
-            return jmxMetricsNames.contains(metric.substring(beginIndex + 5, endIndex));
-        } else {
-            return false;
-        }
+    private boolean isWellKnownMetric(Metric metric) {
+        final Optional<String> param = Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p ->
+                Arrays.stream(JmxMetricsName.values()).map(Enum::name)
+                        .anyMatch(n -> n.equals(p))
+        );
+        return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent();
     }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsName.java
similarity index 96%
rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java
rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsName.java
index 62197dc60c..842c9d5790 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsName.java
@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.cluster.util;
 
-public enum JmxMetricsNames {
+public enum JmxMetricsName {
     MessagesInPerSec,
     BytesInPerSec,
     ReplicationBytesInPerSec,
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsValueName.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsValueName.java
new file mode 100644
index 0000000000..182d144a36
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsValueName.java
@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.cluster.util;
+
+public enum JmxMetricsValueName {
+    Count,
+    OneMinuteRate,
+    FifteenMinuteRate,
+    FiveMinuteRate,
+    MeanRate
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
index 38b69c9dcd..49f8281b39 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.kafka;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
+import com.provectus.kafka.ui.cluster.util.JmxMetricsName;
+import com.provectus.kafka.ui.cluster.util.JmxMetricsValueName;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.Metric;
 import com.provectus.kafka.ui.model.ServerStatus;
@@ -27,6 +29,7 @@ import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
+import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -319,50 +322,71 @@
     private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, List<InternalTopic> internalTopics) {
         List<String> names = internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
         return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
-            ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
-                .map(log -> {
+            ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap( nodes ->
+                ClusterUtil.toMono(ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all())
+                    .map(log -> {
+                        final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
+                            log.entrySet().stream().flatMap(b ->
+                                b.getValue().entrySet().stream().flatMap(topicMap ->
+                                    topicMap.getValue().replicaInfos.entrySet().stream()
+                                        .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+                                )
+                            ).collect(Collectors.toList());
 
-                    final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
-                        log.entrySet().stream().flatMap(b ->
-                            b.getValue().entrySet().stream().flatMap(topicMap ->
-                                topicMap.getValue().replicaInfos.entrySet().stream()
-                                    .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
-                            )
-                        ).collect(Collectors.toList());
+                        final Map<TopicPartition, LongSummaryStatistics> partitionStats = topicPartitions.stream().collect(
+                            Collectors.groupingBy(
+                                Tuple2::getT2,
+                                Collectors.summarizingLong(Tuple3::getT3)
+                            )
+                        );
 
-                    final Map<TopicPartition, LongSummaryStatistics> partitionStats = topicPartitions.stream().collect(
-                        Collectors.groupingBy(
-                            Tuple2::getT2,
-                            Collectors.summarizingLong(Tuple3::getT3)
-                        )
-                    );
+                        final Map<String, LongSummaryStatistics> topicStats = topicPartitions.stream().collect(
+                            Collectors.groupingBy(
+                                t -> t.getT2().topic(),
+                                Collectors.summarizingLong(Tuple3::getT3)
+                            )
+                        );
 
-                    final Map<String, LongSummaryStatistics> topicStats = topicPartitions.stream().collect(
-                        Collectors.groupingBy(
-                            t -> t.getT2().topic(),
-                            Collectors.summarizingLong(Tuple3::getT3)
-                        )
-                    );
-
-                    final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
+                        final Map<Integer, LongSummaryStatistics> brokerStats = topicPartitions.stream().collect(
+                            Collectors.groupingBy(
+                                t -> t.getT1(),
+                                Collectors.summarizingLong(Tuple3::getT3)
+                            )
+                        );
 
-                    final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
-                        Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
-                    ).collect(Collectors.toMap(
-                        Tuple2::getT1,
-                        Tuple2::getT2
-                    ));
+                        final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
 
-                    return InternalSegmentSizeDto.builder()
-                        .clusterMetricsWithSegmentSize(
-                            clusterMetrics.toBuilder()
-                                .segmentSize(summary.getSum())
-                                .segmentCount(summary.getCount())
-                                .build()
-                        )
-                        .internalTopicWithSegmentSize(resultTopics).build();
-                })
+
+                        final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
+                            Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
+                        ).collect(Collectors.toMap(
+                            Tuple2::getT1,
+                            Tuple2::getT2
+                        ));
+
+                        final Map<Integer, InternalBrokerDiskUsage> resultBrokers = brokerStats.entrySet().stream().map(e ->
+                            Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
+                                .segmentSize(e.getValue().getSum())
+                                .segmentCount(e.getValue().getCount())
+                                .build()
+                            )
+                        ).collect(Collectors.toMap(
+                            Tuple2::getT1,
+                            Tuple2::getT2
+                        ));
+
+                        return InternalSegmentSizeDto.builder()
+                            .clusterMetricsWithSegmentSize(
+                                clusterMetrics.toBuilder()
+                                    .segmentSize(summary.getSum())
+                                    .segmentCount(summary.getCount())
+                                    .internalBrokerDiskUsage(resultBrokers)
+                                    .build()
+                            )
+                            .internalTopicWithSegmentSize(resultTopics).build();
+                    })
+            )
         );
     }
 
@@ -387,18 +411,39 @@
     }
 
     private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics internalClusterMetrics) {
-        return internalClusterMetrics.toBuilder().metrics(
-                jmxClusterUtil.convertToMetricDto(internalClusterMetrics)
-                        .stream().map(c -> {
-                            Metric jmx = new Metric();
-                            jmx.setCanonicalName(c.getCanonicalName());
-                            jmx.setValue(Map.of(c.getMetricName(), c.getValue()));
-                            return jmx;
-                        }).collect(Collectors.groupingBy(Metric::getCanonicalName, Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)))
-                        .values().stream()
-                        .filter(Optional::isPresent)
-                        .map(Optional::get)
-                        .collect(Collectors.toList())).build();
+        final List<Metric> metrics = internalClusterMetrics.getInternalBrokerMetrics().values().stream()
+                .flatMap(b -> b.getMetrics().stream())
+                .collect(
+                        Collectors.groupingBy(
+                                Metric::getCanonicalName,
+                                Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
+                        )
+                ).values().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toList());
+        final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
+                internalClusterMetrics.toBuilder().metrics(metrics);
+        metricsBuilder.bytesInPerSec(findTopicMetrics(
+                metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
+        ));
+        metricsBuilder.bytesOutPerSec(findTopicMetrics(
+                metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
+        ));
+        return metricsBuilder.build();
+    }
+
+    private Map<String, BigDecimal> findTopicMetrics(List<Metric> metrics, JmxMetricsName metricsName, JmxMetricsValueName valueName) {
+        return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
+                .filter(m -> m.getParams().containsKey("topic"))
+                .filter(m -> m.getValue().containsKey(valueName.name()))
+                .map(m -> Tuples.of(
+                        m.getParams().get("topic"),
+                        m.getValue().get(valueName.name())
+                )).collect(Collectors.groupingBy(
+                        Tuple2::getT1,
+                        Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
+                ));
     }
 
     public Map getTopicPartitions(KafkaCluster c, InternalTopic topic ) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
index ce65342b28..e5885cc79c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
@@ -43,6 +43,13 @@ public class MetricsRestController implements ApiClustersApi {
                 .onErrorReturn(ResponseEntity.notFound().build());
     }
 
+    @Override
+    public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getClusterStats(clusterName)
+                .map(ResponseEntity::ok)
+                .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml
index 765596e496..e1f9351d51 100644
--- a/kafka-ui-api/src/main/resources/application-local.yml
+++ b/kafka-ui-api/src/main/resources/application-local.yml
@@ -12,11 +12,6 @@ kafka:
       bootstrapServers: localhost:9093
       zookeeper: localhost:2182
       jmxPort: 9998
-    -
-      name: localReplica
-      bootstrapServers: localhost:9094
-      zookeeper: localhost:2181
-      jmxPort: 9999
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 2cc6d8a830..f07812ca8a 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -72,7 +72,27 @@ paths:
             schema:
               $ref: '#/components/schemas/ClusterMetrics'
 
-  /api/clusters/{clusterName}/metrics/broker/{id}:
+  /api/clusters/{clusterName}/stats:
+    get:
+      tags:
+        - /api/clusters
+      summary: getClusterStats
+      operationId: getClusterStats
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ClusterStats'
+
+  /api/clusters/{clusterName}/brokers/{id}/metrics:
     get:
       tags:
         - /api/clusters
@@ -332,10 +352,10 @@ components:
           type: integer
         topicCount:
           type: integer
-        metrics:
-          type: array
-          items:
-            $ref: '#/components/schemas/Metric'
+        bytesInPerSec:
+          type: number
+        bytesOutPerSec:
+          type: number
       required:
         - id
        - name
@@ -348,6 +368,14 @@ components:
           - offline
 
     ClusterMetrics:
+      type: object
+      properties:
+        items:
+          type: array
+          items:
+            $ref: '#/components/schemas/Metric'
+
+    ClusterStats:
       type: object
       properties:
         brokerCount:
@@ -379,11 +407,16 @@ components:
         segmentSize:
           type: integer
           format: int64
+        segmentCount:
+          type: integer
 
     BrokerMetrics:
       type: object
      properties:
-        segmentZise:
+        segmentSize:
+          type: integer
+          format: int64
+        segmentCount:
           type: integer
         metrics:
           type: array
@@ -446,7 +479,9 @@ components:
         inSyncReplicas:
           type: integer
         bytesInPerSec:
-          type: integer
+          type: number
+        bytesOutPerSec:
+          type: number
         segmentSize:
           type: integer
         segmentCount:
@@ -589,8 +624,14 @@ components:
     Metric:
       type: object
      properties:
+        name:
+          type: string
         canonicalName:
           type: string
+        params:
+          type: string
+          additionalProperties:
+            type: string
         value:
           type: string
           additionalProperties: