iliax 2 years ago
parent
commit
5474a34936
27 changed files with 297 additions and 302 deletions
  1. 1 1
      documentation/compose/kafka-ui-arm64.yaml
  2. 17 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java
  3. 0 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  4. 3 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
  5. 0 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerDiskUsage.java
  6. 0 55
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
  7. 9 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
  8. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
  9. 8 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
  10. 0 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSegmentSizeDto.java
  11. 19 15
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
  12. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
  13. 9 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
  14. 6 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
  15. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
  16. 2 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java
  17. 10 18
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java
  18. 0 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
  19. 30 24
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
  20. 12 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
  21. 45 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java
  22. 3 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java
  23. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java
  24. 15 20
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java
  25. 8 8
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
  26. 52 39
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
  27. 44 23
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScannerTest.java

+ 1 - 1
documentation/compose/kafka-ui-arm64.yaml

@@ -15,7 +15,7 @@ services:
     environment:
     environment:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
-      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+#      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083

+ 17 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java

@@ -9,6 +9,7 @@ import io.prometheus.client.exporter.common.TextFormat;
 import java.io.StringWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
@@ -58,21 +59,29 @@ public class PrometheusExposeController extends AbstractController implements Pr
         .flatMap(c -> statisticsCache.get(c)
         .flatMap(c -> statisticsCache.get(c)
             .getMetrics()
             .getMetrics()
             .getSummarizedMetrics()
             .getSummarizedMetrics()
-            .map(mfs -> appendClusterLbl(mfs, c.getName())));
+            .map(mfs -> appendLbl(mfs, "cluster", c.getName())))
+        // merging MFS with same name
+        .collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs, PrometheusExposeController::merge))
+        .values()
+        .stream();
   }
   }
 
 
-  private static MetricFamilySamples appendClusterLbl(MetricFamilySamples mfs, String clusterName) {
+  private static MetricFamilySamples merge(MetricFamilySamples mfs1, MetricFamilySamples mfs2) {
     return new MetricFamilySamples(
     return new MetricFamilySamples(
-        mfs.name,
-        mfs.unit,
-        mfs.type,
-        mfs.help,
+        mfs1.name, mfs1.unit, mfs1.type, mfs1.help,
+        Stream.concat(mfs1.samples.stream(), mfs2.samples.stream()).toList()
+    );
+  }
+
+  private static MetricFamilySamples appendLbl(MetricFamilySamples mfs, String lblName, String lblVal) {
+    return new MetricFamilySamples(
+        mfs.name, mfs.unit, mfs.type, mfs.help,
         mfs.samples.stream()
         mfs.samples.stream()
             .map(sample ->
             .map(sample ->
                 new MetricFamilySamples.Sample(
                 new MetricFamilySamples.Sample(
                     sample.name,
                     sample.name,
-                    prependToList(sample.labelNames, "cluster"),
-                    prependToList(sample.labelValues, clusterName),
+                    prependToList(sample.labelNames, lblName),
+                    prependToList(sample.labelValues, lblVal),
                     sample.value
                     sample.value
                 )).toList()
                 )).toList()
     );
     );

+ 0 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -16,7 +16,6 @@ import com.provectus.kafka.ui.model.ConfigSynonymDTO;
 import com.provectus.kafka.ui.model.ConnectDTO;
 import com.provectus.kafka.ui.model.ConnectDTO;
 import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
-import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalClusterState;
 import com.provectus.kafka.ui.model.InternalClusterState;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalReplica;
@@ -121,14 +120,6 @@ public interface ClusterMapper {
     return map.values().stream().map(this::toPartition).collect(Collectors.toList());
     return map.values().stream().map(this::toPartition).collect(Collectors.toList());
   }
   }
 
 
-  default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {
-    final BrokerDiskUsageDTO brokerDiskUsage = new BrokerDiskUsageDTO();
-    brokerDiskUsage.setBrokerId(id);
-    brokerDiskUsage.segmentCount((int) internalBrokerDiskUsage.getSegmentCount());
-    brokerDiskUsage.segmentSize(internalBrokerDiskUsage.getSegmentSize());
-    return brokerDiskUsage;
-  }
-
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
     return switch (operation) {
     return switch (operation) {
       case ALL -> KafkaAclDTO.OperationEnum.ALL;
       case ALL -> KafkaAclDTO.OperationEnum.ALL;

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java

@@ -21,12 +21,12 @@ public class InternalBroker {
 
 
   public InternalBroker(Node node,
   public InternalBroker(Node node,
                         PartitionDistributionStats partitionDistribution,
                         PartitionDistributionStats partitionDistribution,
-                        Statistics statistics) {
+                        Metrics metrics) {
     this.id = node.id();
     this.id = node.id();
     this.host = node.host();
     this.host = node.host();
     this.port = node.port();
     this.port = node.port();
-    this.bytesInPerSec = null; //statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
-    this.bytesOutPerSec = null;//statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
+    this.bytesInPerSec = metrics.getIoRates().brokerBytesInPerSec().get(node.id());
+    this.bytesOutPerSec = metrics.getIoRates().brokerBytesOutPerSec().get(node.id());
     this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
     this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
     this.partitions = partitionDistribution.getPartitionsCount().get(node);
     this.partitions = partitionDistribution.getPartitionsCount().get(node);
     this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
     this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);

+ 0 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerDiskUsage.java

@@ -1,11 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalBrokerDiskUsage {
-  private final long segmentCount;
-  private final long segmentSize;
-}

+ 0 - 55
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java

@@ -1,55 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import lombok.Builder;
-import lombok.Data;
-
-
-@Data
-@Builder(toBuilder = true)
-public class InternalClusterMetrics {
-
-  public static InternalClusterMetrics empty() {
-    return InternalClusterMetrics.builder()
-        .brokers(List.of())
-        .topics(Map.of())
-        .status(ServerStatusDTO.OFFLINE)
-        .internalBrokerMetrics(Map.of())
-        .metrics(List.of())
-        .version("unknown")
-        .build();
-  }
-
-  private final String version;
-
-  private final ServerStatusDTO status;
-  private final Throwable lastKafkaException;
-
-  private final int brokerCount;
-  private final int activeControllers;
-  private final List<Integer> brokers;
-
-  private final int topicCount;
-  private final Map<String, InternalTopic> topics;
-
-  // partitions stats
-  private final int underReplicatedPartitionCount;
-  private final int onlinePartitionCount;
-  private final int offlinePartitionCount;
-  private final int inSyncReplicasCount;
-  private final int outOfSyncReplicasCount;
-
-  // log dir stats
-  @Nullable // will be null if log dir collection disabled
-  private final Map<Integer, InternalBrokerDiskUsage> internalBrokerDiskUsage;
-
-  // metrics from metrics collector
-  private final BigDecimal bytesInPerSec;
-  private final BigDecimal bytesOutPerSec;
-  private final Map<Integer, BrokerMetrics> internalBrokerMetrics;
-  private final List<MetricDTO> metrics;
-
-}

+ 9 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java

@@ -36,21 +36,20 @@ public class InternalClusterState {
             .message(e.getMessage())
             .message(e.getMessage())
             .stackTrace(Throwables.getStackTraceAsString(e)))
             .stackTrace(Throwables.getStackTraceAsString(e)))
         .orElse(null);
         .orElse(null);
-    topicCount = statistics.getTopicDescriptions().size();
+    topicCount = (int) statistics.topicDescriptions().count();
     brokerCount = statistics.getClusterDescription().getNodes().size();
     brokerCount = statistics.getClusterDescription().getNodes().size();
     activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
     activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
         .map(Node::id)
         .map(Node::id)
         .orElse(null);
         .orElse(null);
     version = statistics.getVersion();
     version = statistics.getVersion();
 
 
-    if (statistics.getLogDirInfo() != null) {
-      diskUsage = statistics.getLogDirInfo().getBrokerStats().entrySet().stream()
-          .map(e -> new BrokerDiskUsageDTO()
-              .brokerId(e.getKey())
-              .segmentSize(e.getValue().getSegmentSize())
-              .segmentCount(e.getValue().getSegmentsCount()))
-          .collect(Collectors.toList());
-    }
+    diskUsage = statistics.getClusterState().getNodesStates().values().stream()
+        .filter(n -> n.segmentStats() != null)
+        .map(n -> new BrokerDiskUsageDTO()
+            .brokerId(n.id())
+            .segmentSize(n.segmentStats().getSegmentSize())
+            .segmentCount(n.segmentStats().getSegmentsCount()))
+        .collect(Collectors.toList());
 
 
     features = statistics.getFeatures();
     features = statistics.getFeatures();
 
 
@@ -72,7 +71,7 @@ public class InternalClusterState {
         .reduce(BigDecimal::add)
         .reduce(BigDecimal::add)
         .orElse(null);
         .orElse(null);
 
 
-    var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
+    var partitionsStats = new PartitionsStats(statistics.topicDescriptions().toList());
     onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
     onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
     offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
     offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
     inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();
     inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java

@@ -25,7 +25,7 @@ public class InternalLogDirStats {
     Long segmentSize;
     Long segmentSize;
     Integer segmentsCount;
     Integer segmentsCount;
 
 
-    public SegmentStats(LongSummaryStatistics s) {
+    private SegmentStats(LongSummaryStatistics s) {
       segmentSize = s.getSum();
       segmentSize = s.getSum();
       segmentsCount = (int) s.getCount();
       segmentsCount = (int) s.getCount();
     }
     }

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java

@@ -4,6 +4,7 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import com.google.common.collect.Table;
 import java.util.Map;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import lombok.Value;
 import lombok.Value;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
 
 
@@ -30,4 +31,11 @@ public class InternalPartitionsOffsets {
     return Optional.ofNullable(offsets.get(topic, partition));
     return Optional.ofNullable(offsets.get(topic, partition));
   }
   }
 
 
+  public Map<Integer, Long> topicOffsets(String topic, boolean earliest) {
+    return offsets.row(topic)
+        .entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> earliest ? e.getValue().earliest : e.getValue().getLatest()));
+  }
+
 }
 }

+ 0 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSegmentSizeDto.java

@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalSegmentSizeDto {
-
-  private final Map<String, InternalTopic> internalTopicWithSegmentSize;
-  private final InternalClusterMetrics clusterMetricsWithSegmentSize;
-}

+ 19 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,8 +1,12 @@
 package com.provectus.kafka.ui.model;
 package com.provectus.kafka.ui.model;
 
 
+import static com.provectus.kafka.ui.model.InternalLogDirStats.*;
+
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.math.BigDecimal;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Builder;
@@ -41,7 +45,8 @@ public class InternalTopic {
                                    List<ConfigEntry> configs,
                                    List<ConfigEntry> configs,
                                    InternalPartitionsOffsets partitionsOffsets,
                                    InternalPartitionsOffsets partitionsOffsets,
                                    Metrics metrics,
                                    Metrics metrics,
-                                   InternalLogDirStats logDirInfo,
+                                   @Nullable SegmentStats segmentStats,
+                                   @Nullable Map<Integer, SegmentStats> partitionsSegmentStats,
                                    @Nullable String internalTopicPrefix) {
                                    @Nullable String internalTopicPrefix) {
     var topic = InternalTopic.builder();
     var topic = InternalTopic.builder();
 
 
@@ -78,13 +83,12 @@ public class InternalTopic {
                 partitionDto.offsetMax(offsets.getLatest());
                 partitionDto.offsetMax(offsets.getLatest());
               });
               });
 
 
-          var segmentStats =
-              logDirInfo.getPartitionsStats().get(
-                  new TopicPartition(topicDescription.name(), partition.partition()));
-          if (segmentStats != null) {
-            partitionDto.segmentCount(segmentStats.getSegmentsCount());
-            partitionDto.segmentSize(segmentStats.getSegmentSize());
-          }
+          Optional.ofNullable(partitionsSegmentStats)
+              .flatMap(s -> Optional.ofNullable(s.get(partition.partition())))
+              .ifPresent(stats -> {
+                partitionDto.segmentCount(stats.getSegmentsCount());
+                partitionDto.segmentSize(stats.getSegmentSize());
+              });
 
 
           return partitionDto.build();
           return partitionDto.build();
         })
         })
@@ -105,14 +109,14 @@ public class InternalTopic {
             : topicDescription.partitions().get(0).replicas().size()
             : topicDescription.partitions().get(0).replicas().size()
     );
     );
 
 
-    var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
-    if (segmentStats != null) {
-      topic.segmentCount(segmentStats.getSegmentsCount());
-      topic.segmentSize(segmentStats.getSegmentSize());
-    }
+    Optional.ofNullable(segmentStats)
+        .ifPresent(stats -> {
+          topic.segmentCount(stats.getSegmentsCount());
+          topic.segmentSize(stats.getSegmentSize());
+        });
 
 
-//    topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
-//    topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
+    topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
 
 
     topic.topicConfigs(
     topic.topicConfigs(
         configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
         configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -49,7 +49,7 @@ public class Metrics {
 
 
   public Stream<MetricFamilySamples> getSummarizedMetrics() {
   public Stream<MetricFamilySamples> getSummarizedMetrics() {
     return Streams.concat(
     return Streams.concat(
-        inferredMetrics.asList().stream(),
+        inferredMetrics.asStream(),
         perBrokerScrapedMetrics
         perBrokerScrapedMetrics
             .values()
             .values()
             .stream()
             .stream()

+ 9 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java

@@ -1,14 +1,17 @@
 package com.provectus.kafka.ui.model;
 package com.provectus.kafka.ui.model;
 
 
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.math.BigDecimal;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.math.RoundingMode;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionInfo;
@@ -29,15 +32,18 @@ public class PartitionDistributionStats {
   private final boolean skewCanBeCalculated;
   private final boolean skewCanBeCalculated;
 
 
   public static PartitionDistributionStats create(Statistics stats) {
   public static PartitionDistributionStats create(Statistics stats) {
-    return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
+    return create(
+        stats.topicDescriptions().toList(),
+        MIN_PARTITIONS_FOR_SKEW_CALCULATION
+    );
   }
   }
 
 
-  static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
+  static PartitionDistributionStats create(List<TopicDescription> topicDescriptions, int minPartitionsForSkewCalculation) {
     var partitionLeaders = new HashMap<Node, Integer>();
     var partitionLeaders = new HashMap<Node, Integer>();
     var partitionsReplicated = new HashMap<Node, Integer>();
     var partitionsReplicated = new HashMap<Node, Integer>();
     var isr = new HashMap<Node, Integer>();
     var isr = new HashMap<Node, Integer>();
     int partitionsCnt = 0;
     int partitionsCnt = 0;
-    for (TopicDescription td : stats.getTopicDescriptions().values()) {
+    for (TopicDescription td : topicDescriptions) {
       for (TopicPartitionInfo tp : td.partitions()) {
       for (TopicPartitionInfo tp : td.partitions()) {
         partitionsCnt++;
         partitionsCnt++;
         tp.replicas().forEach(r -> incr(partitionsReplicated, r));
         tp.replicas().forEach(r -> incr(partitionsReplicated, r));

+ 6 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.stream.Stream;
 import lombok.Builder;
 import lombok.Builder;
 import lombok.Value;
 import lombok.Value;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -21,11 +22,6 @@ public class Statistics {
   Metrics metrics;
   Metrics metrics;
   ScrapedClusterState clusterState;
   ScrapedClusterState clusterState;
 
 
-  //TODO: to be removed -->>
-  InternalLogDirStats logDirInfo;
-  Map<String, TopicDescription> topicDescriptions;
-  Map<String, List<ConfigEntry>> topicConfigs;
-
   public static Statistics empty() {
   public static Statistics empty() {
     return builder()
     return builder()
         .status(ServerStatusDTO.OFFLINE)
         .status(ServerStatusDTO.OFFLINE)
@@ -34,10 +30,12 @@ public class Statistics {
         .clusterDescription(
         .clusterDescription(
             new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
             new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
         .metrics(Metrics.empty())
         .metrics(Metrics.empty())
-        .logDirInfo(InternalLogDirStats.empty())
-        .topicDescriptions(Map.of())
-        .topicConfigs(Map.of())
         .clusterState(ScrapedClusterState.empty())
         .clusterState(ScrapedClusterState.empty())
         .build();
         .build();
   }
   }
+
+  public Stream<TopicDescription> topicDescriptions(){
+    return clusterState.getTopicStates().values().stream().map(ScrapedClusterState.TopicState::description);
+  }
+
 }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -72,7 +72,7 @@ public class BrokerService {
         .get(cluster)
         .get(cluster)
         .flatMap(ReactiveAdminClient::describeCluster)
         .flatMap(ReactiveAdminClient::describeCluster)
         .map(description -> description.getNodes().stream()
         .map(description -> description.getNodes().stream()
-            .map(node -> new InternalBroker(node, partitionsDistribution, stats))
+            .map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
             .collect(Collectors.toList()))
             .collect(Collectors.toList()))
         .flatMapMany(Flux::fromIterable);
         .flatMapMany(Flux::fromIterable);
   }
   }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java

@@ -22,9 +22,9 @@ public class ClustersStatisticsScheduler {
         .parallel()
         .parallel()
         .runOn(Schedulers.parallel())
         .runOn(Schedulers.parallel())
         .flatMap(cluster -> {
         .flatMap(cluster -> {
-          log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
+          log.debug("Start collection statistics for cluster: {}", cluster.getName());
           return statisticsService.updateCache(cluster)
           return statisticsService.updateCache(cluster)
-              .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
+              .doOnSuccess(m -> log.debug("Statistics updated for cluster: {}", cluster.getName()));
         })
         })
         .then()
         .then()
         .block();
         .block();

+ 10 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 package com.provectus.kafka.ui.service;
 
 
+import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.model.Statistics;
@@ -28,38 +29,29 @@ public class StatisticsCache {
 
 
   public synchronized void update(KafkaCluster c,
   public synchronized void update(KafkaCluster c,
                                   Map<String, TopicDescription> descriptions,
                                   Map<String, TopicDescription> descriptions,
-                                  Map<String, List<ConfigEntry>> configs) {
-    var metrics = get(c);
-    var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
-    updatedDescriptions.putAll(descriptions);
-    var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
-    updatedConfigs.putAll(configs);
+                                  Map<String, List<ConfigEntry>> configs,
+                                  InternalPartitionsOffsets partitionsOffsets) {
+    var stats = get(c);
     replace(
     replace(
         c,
         c,
-        metrics.toBuilder()
-            .topicDescriptions(updatedDescriptions)
-            .topicConfigs(updatedConfigs)
+        stats.toBuilder()
+            .clusterState(stats.getClusterState().updateTopics(descriptions, configs, partitionsOffsets))
             .build()
             .build()
     );
     );
   }
   }
 
 
   public synchronized void onTopicDelete(KafkaCluster c, String topic) {
   public synchronized void onTopicDelete(KafkaCluster c, String topic) {
-    var metrics = get(c);
-    var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
-    updatedDescriptions.remove(topic);
-    var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
-    updatedConfigs.remove(topic);
+    var stats = get(c);
     replace(
     replace(
         c,
         c,
-        metrics.toBuilder()
-            .topicDescriptions(updatedDescriptions)
-            .topicConfigs(updatedConfigs)
+        stats.toBuilder()
+            .clusterState(stats.getClusterState().topicDeleted(topic))
             .build()
             .build()
     );
     );
   }
   }
 
 
   public Statistics get(KafkaCluster c) {
   public Statistics get(KafkaCluster c) {
-    return Objects.requireNonNull(cache.get(c.getName()), "Unknown cluster metrics requested");
+    return Objects.requireNonNull(cache.get(c.getName()), "Statistics for unknown cluster requested");
   }
   }
 
 
 }
 }

+ 0 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java

@@ -47,16 +47,6 @@ public class StatisticsService {
                                             .metrics(metrics)
                                             .metrics(metrics)
                                             .features(featuresAndState.getT1())
                                             .features(featuresAndState.getT1())
                                             .clusterState(featuresAndState.getT2())
                                             .clusterState(featuresAndState.getT2())
-                                            //TODO: RM ->>>
-                                            .topicDescriptions(
-                                                featuresAndState.getT2().getTopicStates().entrySet().stream()
-                                                    .collect(Collectors.toMap(
-                                                        Map.Entry::getKey, e -> e.getValue().description())))
-                                            .topicConfigs(
-                                                featuresAndState.getT2().getTopicStates().entrySet().stream()
-                                                    .collect(Collectors.toMap(
-                                                        Map.Entry::getKey, e -> e.getValue().configs())))
-                                            .logDirInfo(InternalLogDirStats.empty())
                                             .build())))))
                                             .build())))))
         .doOnError(e ->
         .doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e))
             log.error("Failed to collect cluster {} info", cluster.getName(), e))

+ 30 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 package com.provectus.kafka.ui.service;
 
 
+import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.*;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toMap;
 
 
@@ -25,6 +26,7 @@ import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.time.Duration;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
@@ -71,20 +73,19 @@ public class TopicsService {
     return adminClientService.get(c)
     return adminClientService.get(c)
         .flatMap(ac ->
         .flatMap(ac ->
             ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false),
             ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false),
-                (descriptions, configs) -> {
-                  statisticsCache.update(c, descriptions, configs);
-                  return getPartitionOffsets(descriptions, ac).map(offsets -> {
-                    var metrics = statisticsCache.get(c);
-                    return createList(
-                        topics,
-                        descriptions,
-                        configs,
-                        offsets,
-                        metrics.getMetrics(),
-                        metrics.getLogDirInfo()
-                    );
-                  });
-                })).flatMap(Function.identity());
+                (descriptions, configs) ->
+                    getPartitionOffsets(descriptions, ac).map(offsets -> {
+                      statisticsCache.update(c, descriptions, configs, offsets);
+                      var stats = statisticsCache.get(c);
+                      return createList(
+                          topics,
+                          descriptions,
+                          configs,
+                          offsets,
+                          stats.getMetrics(),
+                          stats.getClusterState()
+                      );
+                    }))).flatMap(Function.identity());
   }
   }
 
 
   private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
   private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
@@ -95,8 +96,8 @@ public class TopicsService {
   }
   }
 
 
   /**
   /**
-   *  After creation topic can be invisible via API for some time.
-   *  To workaround this, we retyring topic loading until it becomes visible.
+   * After creation topic can be invisible via API for some time.
+   * To workaround this, we retyring topic loading until it becomes visible.
    */
    */
   private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
   private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
     return loadTopic(c, topicName)
     return loadTopic(c, topicName)
@@ -122,7 +123,7 @@ public class TopicsService {
                                          Map<String, List<ConfigEntry>> configs,
                                          Map<String, List<ConfigEntry>> configs,
                                          InternalPartitionsOffsets partitionsOffsets,
                                          InternalPartitionsOffsets partitionsOffsets,
                                          Metrics metrics,
                                          Metrics metrics,
-                                         InternalLogDirStats logDirInfo) {
+                                         ScrapedClusterState clusterState) {
     return orderedNames.stream()
     return orderedNames.stream()
         .filter(descriptions::containsKey)
         .filter(descriptions::containsKey)
         .map(t -> InternalTopic.from(
         .map(t -> InternalTopic.from(
@@ -130,7 +131,8 @@ public class TopicsService {
             configs.getOrDefault(t, List.of()),
             configs.getOrDefault(t, List.of()),
             partitionsOffsets,
             partitionsOffsets,
             metrics,
             metrics,
-            logDirInfo,
+            Optional.ofNullable(clusterState.getTopicStates().get(t)).map(s -> s.segmentStats()).orElse(null),
+            Optional.ofNullable(clusterState.getTopicStates().get(t)).map(s -> s.partitionsSegmentStats()).orElse(null),
             clustersProperties.getInternalTopicPrefix()
             clustersProperties.getInternalTopicPrefix()
         ))
         ))
         .collect(toList());
         .collect(toList());
@@ -228,7 +230,7 @@ public class TopicsService {
   }
   }
 
 
   public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
   public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
-                                    Mono<TopicUpdateDTO> topicUpdate) {
+                                         Mono<TopicUpdateDTO> topicUpdate) {
     return topicUpdate
     return topicUpdate
         .flatMap(t -> updateTopic(cl, topicName, t));
         .flatMap(t -> updateTopic(cl, topicName, t));
   }
   }
@@ -447,17 +449,21 @@ public class TopicsService {
 
 
   public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
   public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
     Statistics stats = statisticsCache.get(cluster);
     Statistics stats = statisticsCache.get(cluster);
-    return filterExisting(cluster, stats.getTopicDescriptions().keySet())
+    Map<String, TopicState> topicStates = stats.getClusterState().getTopicStates();
+    return filterExisting(cluster, topicStates.keySet())
         .map(lst -> lst.stream()
         .map(lst -> lst.stream()
             .map(topicName ->
             .map(topicName ->
                 InternalTopic.from(
                 InternalTopic.from(
-                    stats.getTopicDescriptions().get(topicName),
-                    stats.getTopicConfigs().getOrDefault(topicName, List.of()),
+                    topicStates.get(topicName).description(),
+                    topicStates.get(topicName).configs(),
                     InternalPartitionsOffsets.empty(),
                     InternalPartitionsOffsets.empty(),
                     stats.getMetrics(),
                     stats.getMetrics(),
-                    stats.getLogDirInfo(),
+                    Optional.ofNullable(topicStates.get(topicName))
+                        .map(TopicState::segmentStats).orElse(null),
+                    Optional.ofNullable(topicStates.get(topicName))
+                        .map(TopicState::partitionsSegmentStats).orElse(null),
                     clustersProperties.getInternalTopicPrefix()
                     clustersProperties.getInternalTopicPrefix()
-                    ))
+                ))
             .collect(toList())
             .collect(toList())
         );
         );
   }
   }

+ 12 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java

@@ -1,8 +1,9 @@
 package com.provectus.kafka.ui.service.integration.odd;
 package com.provectus.kafka.ui.service.integration.odd;
 
 
+import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.TopicState;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
 import java.net.URI;
 import java.net.URI;
@@ -34,10 +35,10 @@ class TopicsExporter {
 
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
   Flux<DataEntityList> export(KafkaCluster cluster) {
     String clusterOddrn = Oddrn.clusterOddrn(cluster);
     String clusterOddrn = Oddrn.clusterOddrn(cluster);
-    Statistics stats = statisticsCache.get(cluster);
-    return Flux.fromIterable(stats.getTopicDescriptions().keySet())
+    var clusterState = statisticsCache.get(cluster).getClusterState();
+    return Flux.fromIterable(clusterState.getTopicStates().keySet())
         .filter(topicFilter)
         .filter(topicFilter)
-        .flatMap(topic -> createTopicDataEntity(cluster, topic, stats))
+        .flatMap(topic -> createTopicDataEntity(cluster, topic, clusterState.getTopicStates().get(topic)))
         .buffer(100)
         .buffer(100)
         .map(topicsEntities ->
         .map(topicsEntities ->
             new DataEntityList()
             new DataEntityList()
@@ -45,7 +46,7 @@ class TopicsExporter {
                 .items(topicsEntities));
                 .items(topicsEntities));
   }
   }
 
 
-  private Mono<DataEntity> createTopicDataEntity(KafkaCluster cluster, String topic, Statistics stats) {
+  private Mono<DataEntity> createTopicDataEntity(KafkaCluster cluster, String topic, TopicState topicState) {
     KafkaPath topicOddrnPath = Oddrn.topicOddrnPath(cluster, topic);
     KafkaPath topicOddrnPath = Oddrn.topicOddrnPath(cluster, topic);
     return
     return
         Mono.zip(
         Mono.zip(
@@ -65,13 +66,13 @@ class TopicsExporter {
                       .addMetadataItem(
                       .addMetadataItem(
                           new MetadataExtension()
                           new MetadataExtension()
                               .schemaUrl(URI.create("wontbeused.oops"))
                               .schemaUrl(URI.create("wontbeused.oops"))
-                              .metadata(getTopicMetadata(topic, stats)));
+                              .metadata(getTopicMetadata(topicState)));
                 }
                 }
             );
             );
   }
   }
 
 
-  private Map<String, Object> getNonDefaultConfigs(String topic, Statistics stats) {
-    List<ConfigEntry> config = stats.getTopicConfigs().get(topic);
+  private Map<String, Object> getNonDefaultConfigs(TopicState topicState) {
+    List<ConfigEntry> config = topicState.configs();
     if (config == null) {
     if (config == null) {
       return Map.of();
       return Map.of();
     }
     }
@@ -80,12 +81,12 @@ class TopicsExporter {
         .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
         .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
   }
   }
 
 
-  private Map<String, Object> getTopicMetadata(String topic, Statistics stats) {
-    TopicDescription topicDescription = stats.getTopicDescriptions().get(topic);
+  private Map<String, Object> getTopicMetadata(TopicState topicState) {
+    TopicDescription topicDescription = topicState.description();
     return ImmutableMap.<String, Object>builder()
     return ImmutableMap.<String, Object>builder()
         .put("partitions", topicDescription.partitions().size())
         .put("partitions", topicDescription.partitions().size())
         .put("replication_factor", topicDescription.partitions().get(0).replicas().size())
         .put("replication_factor", topicDescription.partitions().get(0).replicas().size())
-        .putAll(getNonDefaultConfigs(topic, stats))
+        .putAll(getNonDefaultConfigs(topicState))
         .build();
         .build();
   }
   }
 
 

+ 45 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java

@@ -3,8 +3,10 @@ package com.provectus.kafka.ui.service.metrics.scrape;
 import static com.provectus.kafka.ui.model.InternalLogDirStats.*;
 import static com.provectus.kafka.ui.model.InternalLogDirStats.*;
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
 
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Table;
 import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
+import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import jakarta.annotation.Nullable;
 import jakarta.annotation.Nullable;
 import java.time.Instant;
 import java.time.Instant;
@@ -24,7 +26,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
-@Builder
+@Builder(toBuilder = true)
 @Value
 @Value
 public class ScrapedClusterState {
 public class ScrapedClusterState {
 
 
@@ -64,6 +66,44 @@ public class ScrapedClusterState {
         .build();
         .build();
   }
   }
 
 
+
+  public ScrapedClusterState updateTopics(Map<String, TopicDescription> descriptions,
+                                          Map<String, List<ConfigEntry>> configs,
+                                          InternalPartitionsOffsets partitionsOffsets) {
+    var updatedTopicStates = new HashMap<>(topicStates);
+    descriptions.forEach((topic, description) -> {
+      SegmentStats segmentStats = null;
+      Map<Integer, SegmentStats> partitionsSegmentStats = null;
+      if (topicStates.containsKey(topic)) {
+        segmentStats = topicStates.get(topic).segmentStats();
+        partitionsSegmentStats = topicStates.get(topic).partitionsSegmentStats();
+      }
+      updatedTopicStates.put(
+          topic,
+          new TopicState(
+              topic,
+              description,
+              configs.getOrDefault(topic, List.of()),
+              partitionsOffsets.topicOffsets(topic, true),
+              partitionsOffsets.topicOffsets(topic, false),
+              segmentStats,
+              partitionsSegmentStats
+          )
+      );
+    });
+    return toBuilder()
+        .topicStates(ImmutableMap.copyOf(updatedTopicStates))
+        .build();
+  }
+
+  public ScrapedClusterState topicDeleted(String topic) {
+    var newTopicStates = new HashMap<>(topicStates);
+    newTopicStates.remove(topic);
+    return toBuilder()
+        .topicStates(ImmutableMap.copyOf(newTopicStates))
+        .build();
+  }
+
   public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
   public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
                                                  ReactiveAdminClient ac) {
                                                  ReactiveAdminClient ac) {
     return Mono.zip(
     return Mono.zip(
@@ -109,11 +149,11 @@ public class ScrapedClusterState {
                 name,
                 name,
                 desc,
                 desc,
                 topicConfigs.getOrDefault(name, List.of()),
                 topicConfigs.getOrDefault(name, List.of()),
-                cutTopic(name, earliestOffsets),
-                cutTopic(name, latestOffsets),
+                filterTopic(name, earliestOffsets),
+                filterTopic(name, latestOffsets),
                 segmentStats.getTopicStats().get(name),
                 segmentStats.getTopicStats().get(name),
                 Optional.ofNullable(segmentStats.getPartitionsStats())
                 Optional.ofNullable(segmentStats.getPartitionsStats())
-                    .map(topicForFilter -> cutTopic(name, topicForFilter))
+                    .map(topicForFilter -> filterTopic(name, topicForFilter))
                     .orElse(null)
                     .orElse(null)
             )));
             )));
 
 
@@ -146,7 +186,7 @@ public class ScrapedClusterState {
     );
     );
   }
   }
 
 
-  private static <T> Map<Integer, T> cutTopic(String topicForFilter, Map<TopicPartition, T> tpMap) {
+  private static <T> Map<Integer, T> filterTopic(String topicForFilter, Map<TopicPartition, T> tpMap) {
     return tpMap.entrySet()
     return tpMap.entrySet()
         .stream()
         .stream()
         .filter(tp -> tp.getKey().topic().equals(topicForFilter))
         .filter(tp -> tp.getKey().topic().equals(topicForFilter))

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service.metrics.scrape.inferred;
 import static io.prometheus.client.Collector.MetricFamilySamples;
 import static io.prometheus.client.Collector.MetricFamilySamples;
 
 
 import java.util.List;
 import java.util.List;
+import java.util.stream.Stream;
 
 
 //TODO: maybe rename to state-based metrics?
 //TODO: maybe rename to state-based metrics?
 public class InferredMetrics {
 public class InferredMetrics {
@@ -17,8 +18,8 @@ public class InferredMetrics {
     this.metrics = metrics;
     this.metrics = metrics;
   }
   }
 
 
-  public List<MetricFamilySamples> asList() {
-    return metrics;
+  public Stream<MetricFamilySamples> asStream() {
+    return metrics.stream();
   }
   }
 
 
 }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java

@@ -24,7 +24,7 @@ public class PrometheusEndpointMetricsParser {
   private static final Pattern PATTERN = Pattern.compile(
   private static final Pattern PATTERN = Pattern.compile(
       "(?<metricName>^\\w+)([ \t]*\\{*(?<properties>.*)}*)[ \\t]+(?<value>[\\d]+\\.?[\\d]+)?");
       "(?<metricName>^\\w+)([ \t]*\\{*(?<properties>.*)}*)[ \\t]+(?<value>[\\d]+\\.?[\\d]+)?");
 
 
-  static Optional<RawMetric> parse(String s) {
+  public static Optional<RawMetric> parse(String s) {
     Matcher matcher = PATTERN.matcher(s);
     Matcher matcher = PATTERN.matcher(s);
     if (matcher.matches()) {
     if (matcher.matches()) {
       String value = matcher.group("value");
       String value = matcher.group("value");

+ 15 - 20
kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java

@@ -23,28 +23,23 @@ class PartitionDistributionStatsTest {
     Node n4 = new Node(4, "n4", 9092);
     Node n4 = new Node(4, "n4", 9092);
 
 
     var stats = PartitionDistributionStats.create(
     var stats = PartitionDistributionStats.create(
-        Statistics.builder()
-            .clusterDescription(
-                new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
-            .topicDescriptions(
-                Map.of(
-                    "t1", new TopicDescription(
-                        "t1", false,
-                        List.of(
-                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
-                            new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
-                        )
-                    ),
-                    "t2", new TopicDescription(
-                        "t2", false,
-                        List.of(
-                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
-                            new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
-                        )
-                    )
+        List.of(
+            new TopicDescription(
+                "t1", false,
+                List.of(
+                    new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                    new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
+                )
+            ),
+            new TopicDescription(
+                "t2", false,
+                List.of(
+                    new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                    new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
                 )
                 )
             )
             )
-            .build(), 4
+        ),
+        4
     );
     );
 
 
     assertThat(stats.getPartitionLeaders())
     assertThat(stats.getPartitionLeaders())

+ 8 - 8
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -69,7 +69,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -95,7 +95,7 @@ class TopicsServicePaginationTest {
         .map(Objects::toString)
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
         .map(name -> new TopicDescription(name, false, List.of()))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-            Metrics.empty(), InternalLogDirStats.empty(), "_"))
+            Metrics.empty(), null, null, "_"))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
     init(internalTopics);
     init(internalTopics);
 
 
@@ -122,7 +122,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -141,7 +141,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -160,7 +160,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -181,7 +181,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -202,7 +202,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty(), "_"))
+                Metrics.empty(), null, null, "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
     );
 
 
@@ -224,7 +224,7 @@ class TopicsServicePaginationTest {
                     new TopicPartitionInfo(p, null, List.of(), List.of()))
                     new TopicPartitionInfo(p, null, List.of(), List.of()))
                 .collect(Collectors.toList())))
                 .collect(Collectors.toList())))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
-            Metrics.empty(), InternalLogDirStats.empty(), "_"))
+            Metrics.empty(), null, null, "_"))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
 
 
     init(internalTopics);
     init(internalTopics);

+ 52 - 39
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service.integration.odd;
 package com.provectus.kafka.ui.service.integration.odd;
 
 
+import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.*;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
@@ -8,6 +9,7 @@ import static org.mockito.Mockito.when;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.StatisticsCache;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
 import com.provectus.kafka.ui.sr.model.SchemaType;
@@ -57,15 +59,22 @@ class TopicsExporterTest {
 
 
     stats = Statistics.empty()
     stats = Statistics.empty()
         .toBuilder()
         .toBuilder()
-        .topicDescriptions(
-            Map.of(
-                "_hidden", new TopicDescription("_hidden", false, List.of(
-                    new TopicPartitionInfo(0, null, List.of(), List.of())
-                )),
-                "visible", new TopicDescription("visible", false, List.of(
-                    new TopicPartitionInfo(0, null, List.of(), List.of())
-                ))
-            )
+        .clusterState(
+            empty().toBuilder().topicStates(
+                Map.of(
+                    "_hidden",
+                    new TopicState(
+                        "_hidden",
+                        new TopicDescription("_hidden", false, List.of(
+                            new TopicPartitionInfo(0, null, List.of(), List.of())
+                        )), null, null, null, null, null),
+                    "visible",
+                    new TopicState("visible",
+                        new TopicDescription("visible", false, List.of(
+                            new TopicPartitionInfo(0, null, List.of(), List.of())
+                        )), null, null, null, null, null)
+                )
+            ).build()
         )
         )
         .build();
         .build();
 
 
@@ -99,40 +108,44 @@ class TopicsExporterTest {
 
 
     stats = Statistics.empty()
     stats = Statistics.empty()
         .toBuilder()
         .toBuilder()
-        .topicDescriptions(
-            Map.of(
-                "testTopic",
-                new TopicDescription(
-                    "testTopic",
-                    false,
-                    List.of(
-                        new TopicPartitionInfo(
-                            0,
-                            null,
+        .clusterState(
+            ScrapedClusterState.empty().toBuilder()
+                .topicStates(
+                    Map.of(
+                        "testTopic",
+                        new TopicState(
+                            "testTopic",
+                            new TopicDescription(
+                                "testTopic",
+                                false,
+                                List.of(
+                                    new TopicPartitionInfo(
+                                        0,
+                                        null,
+                                        List.of(
+                                            new Node(1, "host1", 9092),
+                                            new Node(2, "host2", 9092)
+                                        ),
+                                        List.of())
+                                )
+                            ),
                             List.of(
                             List.of(
-                                new Node(1, "host1", 9092),
-                                new Node(2, "host2", 9092)
+                                new ConfigEntry(
+                                    "custom.config",
+                                    "100500",
+                                    ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG,
+                                    false,
+                                    false,
+                                    List.of(),
+                                    ConfigEntry.ConfigType.INT,
+                                    null
+                                )
                             ),
                             ),
-                            List.of())
-                    ))
-            )
-        )
-        .topicConfigs(
-            Map.of(
-                "testTopic", List.of(
-                    new ConfigEntry(
-                        "custom.config",
-                        "100500",
-                        ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG,
-                        false,
-                        false,
-                        List.of(),
-                        ConfigEntry.ConfigType.INT,
-                        null
+                            null, null, null, null
+                        )
                     )
                     )
                 )
                 )
-            )
-        )
+                .build())
         .build();
         .build();
 
 
     StepVerifier.create(topicsExporter.export(cluster))
     StepVerifier.create(topicsExporter.export(cluster))

+ 44 - 23
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScannerTest.java

@@ -1,8 +1,16 @@
 package com.provectus.kafka.ui.service.metrics.scrape;
 package com.provectus.kafka.ui.service.metrics.scrape;
 
 
+import static io.prometheus.client.Collector.MetricFamilySamples;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.math.BigDecimal;
 import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Node;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
 
 
@@ -13,13 +21,15 @@ class IoRatesMetricsScannerTest {
   @Test
   @Test
   void bytesIoTopicMetricsPopulated() {
   void bytesIoTopicMetricsPopulated() {
     populateWith(
     populateWith(
-        new Node(0, "host", 123),
-        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesInPerSec\",topic=\"test-topic\",} 1.0",
-        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesOutPerSec\",topic=\"test-topic\",} 2.0",
-        "kafka_server_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",topic=\"test-topic\",} 1.0",
-        "kafka_server_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",topic=\"test-topic\",} 2.0",
-        "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",topic=\"test-topic\",} 1.0",
-        "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",topic=\"test-topic\",} 2.0"
+        nodeMetrics(
+            new Node(0, "host", 123),
+            "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesInPerSec\",topic=\"test-topic\",} 1.0",
+            "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesOutPerSec\",topic=\"test-topic\",} 2.0",
+            "kafka_server_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",topic=\"test-topic\",} 1.0",
+            "kafka_server_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",topic=\"test-topic\",} 2.0",
+            "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",topic=\"test-topic\",} 1.0",
+            "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",topic=\"test-topic\",} 2.0"
+        )
     );
     );
     assertThat(ioRatesMetricsScanner.bytesInFifteenMinuteRate)
     assertThat(ioRatesMetricsScanner.bytesInFifteenMinuteRate)
         .containsEntry("test-topic", new BigDecimal("3.0"));
         .containsEntry("test-topic", new BigDecimal("3.0"));
@@ -30,14 +40,16 @@ class IoRatesMetricsScannerTest {
   @Test
   @Test
   void bytesIoBrokerMetricsPopulated() {
   void bytesIoBrokerMetricsPopulated() {
     populateWith(
     populateWith(
-        new Node(1, "host1", 123),
-        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesInPerSec\",} 1.0",
-        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesOutPerSec\",} 2.0"
-    );
-    populateWith(
-        new Node(2, "host2", 345),
-        "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",} 10.0",
-        "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",} 20.0"
+        nodeMetrics(
+            new Node(1, "host1", 123),
+            "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesInPerSec\",} 1.0",
+            "kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name=\"BytesOutPerSec\",} 2.0"
+        ),
+        nodeMetrics(
+            new Node(2, "host2", 345),
+            "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesinpersec\",} 10.0",
+            "some_unknown_prefix_brokertopicmetrics_fifteenminuterate{name=\"bytesoutpersec\",} 20.0"
+        )
     );
     );
 
 
     assertThat(ioRatesMetricsScanner.brokerBytesInFifteenMinuteRate)
     assertThat(ioRatesMetricsScanner.brokerBytesInFifteenMinuteRate)
@@ -51,14 +63,23 @@ class IoRatesMetricsScannerTest {
         .containsEntry(2, new BigDecimal("20.0"));
         .containsEntry(2, new BigDecimal("20.0"));
   }
   }
 
 
-  private void populateWith(Node n, String... prometheusMetric) {
-    //TODO: uncomment
-//    wellKnownMetrics = new WellKnownMetrics(
-//        Arrays.stream(prometheusMetric)
-//        .map(PrometheusEndpointMetricsParser::parse)
-//        .filter(Optional::isPresent)
-//        .map(Optional::get)
-//    );
+  private void populateWith(Map.Entry<Integer, List<MetricFamilySamples>>... entries) {
+    ioRatesMetricsScanner = new IoRatesMetricsScanner(
+        Arrays.stream(entries).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+    );
+  }
+
+  private Map.Entry<Integer, List<MetricFamilySamples>> nodeMetrics(Node n, String... prometheusMetrics) {
+    return Map.entry(
+        n.id(),
+        RawMetric.groupIntoMFS(
+            Arrays.stream(prometheusMetrics)
+                .map(PrometheusEndpointMetricsParser::parse)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .toList()
+        ).toList()
+    );
   }
   }
 
 
 }
 }