iliax · 2 years ago · commit 70b08498d9
34 changed files with 616 additions and 316 deletions
  1. + 2 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
  2. + 22 - 9   kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  3. + 11 - 11  kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java
  4. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
  5. + 8 - 4    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
  6. + 44 - 8   kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
  7. + 2 - 5    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
  8. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  9. + 51 - 17  kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
 10. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
 11. + 6 - 5    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
 12. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java
 13. + 3 - 12   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
 14. + 11 - 20  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
 15. + 19 - 24  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
 16. + 9 - 9    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
 17. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/PerBrokerScrapedMetrics.java
 18. + 157 - 0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java
 19. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/WellKnownMetrics.java
 20. + 2 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java
 21. + 234 - 0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java
 22. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsFormatter.java
 23. + 1 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsRetriever.java
 24. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsScraper.java
 25. + 1 - 1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxSslSocketFactory.java
 26. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java
 27. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetriever.java
 28. + 2 - 2    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusScraper.java
 29. + 0 - 93   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java
 30. + 0 - 30   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java
 31. + 9 - 40   kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/WellKnownMetricsTest.java
 32. + 2 - 2    kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsFormatterTest.java
 33. + 2 - 2    kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParserTest.java
 34. + 3 - 3    kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetrieverTest.java

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.service.BrokerService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import io.prometheus.client.Collector;
 import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -50,7 +51,7 @@ public class BrokersController extends AbstractController implements BrokersApi
 
     return validateAccess.then(
         brokerService.getBrokerMetrics(getCluster(clusterName), id)
-            .map(clusterMapper::toBrokerMetrics)
+            .map(metrics -> clusterMapper.toBrokerMetrics(metrics.stream()))
             .map(ResponseEntity::ok)
             .onErrorReturn(ResponseEntity.notFound().build())
     );

+ 22 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.mapper;
 
+import static io.prometheus.client.Collector.*;
+
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.model.BrokerConfigDTO;
 import com.provectus.kafka.ui.model.BrokerDTO;
@@ -31,9 +33,14 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
+import io.prometheus.client.Collector;
+import java.math.BigDecimal;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
@@ -55,19 +62,25 @@ public interface ClusterMapper {
   @Deprecated
   default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
     return new ClusterMetricsDTO()
-        .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
+        .items(convert(metrics.getSummarizedBrokersMetrics()).toList());
   }
 
-  private MetricDTO convert(RawMetric rawMetric) {
-    return new MetricDTO()
-        .name(rawMetric.name())
-        .labels(rawMetric.labels())
-        .value(rawMetric.value());
+  private Stream<MetricDTO> convert(Stream<MetricFamilySamples> metrics) {
+    return metrics
+        .flatMap(m -> m.samples.stream())
+        .map(s ->
+            new MetricDTO()
+                .name(s.name)
+                .labels(IntStream.range(0, s.labelNames.size())
+                    .boxed()
+                    .collect(Collectors.toMap(s.labelNames::get, s.labelValues::get)))
+                .value(BigDecimal.valueOf(s.value))
+        );
   }
 
-  default BrokerMetricsDTO toBrokerMetrics(List<RawMetric> metrics) {
-    return new BrokerMetricsDTO()
-        .metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
+  @Deprecated
+  default BrokerMetricsDTO toBrokerMetrics(Stream<MetricFamilySamples> metrics) {
+    return new BrokerMetricsDTO().metrics(convert(metrics).toList());
   }
 
   @Mapping(target = "isSensitive", source = "sensitive")

+ 11 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java

@@ -7,6 +7,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
@@ -16,7 +18,7 @@ import org.springframework.stereotype.Component;
 public class DescribeLogDirsMapper {
 
   public List<BrokersLogdirsDTO> toBrokerLogDirsList(
-      Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirsInfo) {
+      Map<Integer, Map<String, LogDirDescription>> logDirsInfo) {
 
     return logDirsInfo.entrySet().stream().map(
         mapEntry -> mapEntry.getValue().entrySet().stream()
@@ -26,13 +28,13 @@ public class DescribeLogDirsMapper {
   }
 
   private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
-                                            DescribeLogDirsResponse.LogDirInfo logDirInfo) {
+                                            LogDirDescription logDirInfo) {
     BrokersLogdirsDTO result = new BrokersLogdirsDTO();
     result.setName(dirName);
-    if (logDirInfo.error != null && logDirInfo.error != Errors.NONE) {
-      result.setError(logDirInfo.error.message());
+    if (logDirInfo.error() != null) {
+      result.setError(logDirInfo.error().getMessage());
     }
-    var topics = logDirInfo.replicaInfos.entrySet().stream()
+    var topics = logDirInfo.replicaInfos().entrySet().stream()
         .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
         .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
         .collect(Collectors.toList());
@@ -41,8 +43,7 @@ public class DescribeLogDirsMapper {
   }
 
   private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
-                                               List<Map.Entry<TopicPartition,
-                                                   DescribeLogDirsResponse.ReplicaInfo>> partitions) {
+                                               List<Map.Entry<TopicPartition, ReplicaInfo>> partitions) {
     BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
     topic.setName(name);
     topic.setPartitions(
@@ -54,13 +55,12 @@ public class DescribeLogDirsMapper {
   }
 
   private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
-                                                             DescribeLogDirsResponse.ReplicaInfo
-                                                                 replicaInfo) {
+                                                             ReplicaInfo replicaInfo) {
     BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
     logDir.setBroker(broker);
     logDir.setPartition(partition);
-    logDir.setSize(replicaInfo.size);
-    logDir.setOffsetLag(replicaInfo.offsetLag);
+    logDir.setSize(replicaInfo.size());
+    logDir.setOffsetLag(replicaInfo.offsetLag());
     return logDir;
   }
 }
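
Note: this change migrates from the broker-internal DescribeLogDirsResponse.LogDirInfo (public fields, Errors enum) to the public admin-client type LogDirDescription (accessor methods, ApiException). The rough correspondence, for reference:

    logDirInfo.error (Errors, NONE when healthy)  ->  logDirDescription.error() (ApiException, null when healthy)
    logDirInfo.replicaInfos (field access)        ->  logDirDescription.replicaInfos()
    replicaInfo.size / replicaInfo.offsetLag      ->  replicaInfo.size() / replicaInfo.offsetLag()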

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java

@@ -25,8 +25,8 @@ public class InternalBroker {
     this.id = node.id();
     this.host = node.host();
     this.port = node.port();
-    this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
-    this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
+    this.bytesInPerSec = statistics.getMetrics().getIoRates().brokerBytesInPerSec().get(node.id());
+    this.bytesOutPerSec = statistics.getMetrics().getIoRates().brokerBytesOutPerSec().get(node.id());
     this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
     this.partitions = partitionDistribution.getPartitionsCount().get(node);
     this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);

+ 8 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java

@@ -56,15 +56,19 @@ public class InternalClusterState {
 
     bytesInPerSec = statistics
         .getMetrics()
-        .getBrokerBytesInPerSec()
-        .values().stream()
+        .getIoRates()
+        .brokerBytesInPerSec()
+        .values()
+        .stream()
         .reduce(BigDecimal::add)
         .orElse(null);
 
     bytesOutPerSec = statistics
         .getMetrics()
-        .getBrokerBytesOutPerSec()
-        .values().stream()
+        .getIoRates()
+        .brokerBytesOutPerSec()
+        .values()
+        .stream()
         .reduce(BigDecimal::add)
         .orElse(null);
 

+ 44 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java

@@ -3,14 +3,16 @@ package com.provectus.kafka.ui.model;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.summarizingLong;
-import static java.util.stream.Collectors.toList;
 
+import jakarta.annotation.Nullable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.Value;
+import org.apache.kafka.clients.admin.LogDirDescription;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
@@ -20,8 +22,8 @@ public class InternalLogDirStats {
 
   @Value
   public static class SegmentStats {
-    long segmentSize;
-    int segmentsCount;
+    Long segmentSize;
+    Integer segmentsCount;
 
     public SegmentStats(LongSummaryStatistics s) {
       segmentSize = s.getSum();
@@ -29,22 +31,29 @@ public class InternalLogDirStats {
     }
   }
 
+  public record LogDirSpaceStats(@Nullable Long totalBytes,
+                                 @Nullable Long usableBytes,
+                                 Map<String, Long> totalPerDir,
+                                 Map<String, Long> usablePerDir) {
+  }
+
   Map<TopicPartition, SegmentStats> partitionsStats;
   Map<String, SegmentStats> topicStats;
   Map<Integer, SegmentStats> brokerStats;
+  Map<Integer, LogDirSpaceStats> brokerDirsStats;
 
   public static InternalLogDirStats empty() {
     return new InternalLogDirStats(Map.of());
   }
 
-  public InternalLogDirStats(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
+  public InternalLogDirStats(Map<Integer, Map<String, LogDirDescription>> log) {
     final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
         log.entrySet().stream().flatMap(b ->
             b.getValue().entrySet().stream().flatMap(topicMap ->
-                topicMap.getValue().replicaInfos.entrySet().stream()
-                    .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+                topicMap.getValue().replicaInfos().entrySet().stream()
+                    .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size()))
             )
-        ).collect(toList());
+        ).toList();
 
     partitionsStats = topicPartitions.stream().collect(
         groupingBy(
@@ -64,5 +73,32 @@ public class InternalLogDirStats {
             Tuple2::getT1,
             collectingAndThen(
                 summarizingLong(Tuple3::getT3), SegmentStats::new)));
+
+    brokerDirsStats = calculateSpaceStats(log);
+  }
+
+  private static Map<Integer, LogDirSpaceStats> calculateSpaceStats(Map<Integer, Map<String, LogDirDescription>> log) {
+    var stats = new HashMap<Integer, LogDirSpaceStats>();
+    log.forEach((brokerId, logDirStats) -> {
+      Map<String, Long> totalBytes = new HashMap<>();
+      Map<String, Long> usableBytes = new HashMap<>();
+      logDirStats.forEach((logDir, descr) -> {
+        if (descr.error() != null) {
+          return; // skip log dirs that reported an error - only healthy dirs contribute capacity stats
+        }
+        descr.totalBytes().ifPresent(b -> totalBytes.merge(logDir, b, Long::sum));
+        descr.usableBytes().ifPresent(b -> usableBytes.merge(logDir, b, Long::sum));
+      });
+      stats.put(
+          brokerId,
+          new LogDirSpaceStats(
+              totalBytes.isEmpty() ? null : totalBytes.values().stream().mapToLong(i -> i).sum(),
+              usableBytes.isEmpty() ? null : usableBytes.values().stream().mapToLong(i -> i).sum(),
+              totalBytes,
+              usableBytes
+          )
+      );
+    });
+    return stats;
   }
 }
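
Note: a small illustration (not part of this commit) of what calculateSpaceStats produces for a single broker with two healthy log dirs; paths and numbers are made up:

    Map<String, Long> totalPerDir  = Map.of("/data-0", 100L, "/data-1", 200L);
    Map<String, Long> usablePerDir = Map.of("/data-0", 40L, "/data-1", 160L);
    var spaceStats = new InternalLogDirStats.LogDirSpaceStats(
        totalPerDir.values().stream().mapToLong(Long::longValue).sum(),   // totalBytes  = 300
        usablePerDir.values().stream().mapToLong(Long::longValue).sum(),  // usableBytes = 200
        totalPerDir,
        usablePerDir);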

+ 2 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.config.ClustersProperties;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
@@ -16,8 +15,6 @@ import org.apache.kafka.common.TopicPartition;
 @Builder(toBuilder = true)
 public class InternalTopic {
 
-  ClustersProperties clustersProperties;
-
   // from TopicDescription
   private final String name;
   private final boolean internal;
@@ -114,8 +111,8 @@ public class InternalTopic {
       topic.segmentSize(segmentStats.getSegmentSize());
     }
 
-    topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
-    topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
+    topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
 
     topic.topicConfigs(
         configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -5,7 +5,7 @@ import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping;
+import com.provectus.kafka.ui.service.metrics.scrape.MetricsScrapping;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.Map;

+ 51 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -3,9 +3,9 @@ package com.provectus.kafka.ui.model;
 import static io.prometheus.client.Collector.*;
 import static java.util.stream.Collectors.toMap;
 
-import com.provectus.kafka.ui.service.metrics.RawMetric;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics;
-import io.prometheus.client.Collector;
+import com.google.common.collect.Streams;
+import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
+import groovy.lang.Tuple;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
@@ -19,9 +19,13 @@ import lombok.Value;
 @Value
 public class Metrics {
 
+  IoRates ioRates;
+  InferredMetrics inferredMetrics;
+  Map<Integer, List<MetricFamilySamples>> perBrokerScrapedMetrics;
+
   public static Metrics empty() {
     return Metrics.builder()
-        .ioRates(null) //TODO: empty
+        .ioRates(IoRates.empty())
         .perBrokerScrapedMetrics(Map.of())
         .inferredMetrics(InferredMetrics.empty())
         .build();
@@ -32,22 +36,52 @@ public class Metrics {
                         Map<Integer, BigDecimal> brokerBytesOutPerSec,
                         Map<String, BigDecimal> topicBytesInPerSec,
                         Map<String, BigDecimal> topicBytesOutPerSec) {
+
+    public static IoRates empty() {
+      return IoRates.builder()
+          .brokerBytesOutPerSec(Map.of())
+          .brokerBytesInPerSec(Map.of())
+          .topicBytesOutPerSec(Map.of())
+          .topicBytesInPerSec(Map.of())
+          .build();
+    }
   }
 
-  IoRates ioRates;
-  InferredMetrics inferredMetrics;
-  Map<Integer, List<MetricFamilySamples>> perBrokerScrapedMetrics;
+  public Stream<MetricFamilySamples> getSummarizedBrokersMetrics() {
+    return Streams.concat(
+        inferredMetrics.asList().stream(),
+        perBrokerScrapedMetrics
+            .values()
+            .stream()
+            .flatMap(Collection::stream)
+            .collect(toMap(mfs -> mfs.name, mfs -> mfs, Metrics::summarizeMfs))
+            .values()
+            .stream()
+    );
+  }
 
-  @Deprecated
-  public Stream<RawMetric> getSummarizedMetrics() {
-    return perBrokerScrapedMetrics
-        .values()
-        .stream()
-        .flatMap(Collection::stream)
-        .flatMap(RawMetric::create)
-        .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value()))))
-        .values()
-        .stream();
+  private static MetricFamilySamples summarizeMfs(MetricFamilySamples mfs1, MetricFamilySamples mfs2) {
+    return new MetricFamilySamples(
+        mfs1.name,
+        mfs1.type,
+        mfs1.help,
+        Stream.concat(mfs1.samples.stream(), mfs2.samples.stream())
+            .collect(
+                toMap(
+                    s -> Tuple.tuple(s.labelNames, s.labelValues),
+                    s -> s,
+                    (s1, s2) -> new MetricFamilySamples.Sample(
+                        s1.name,
+                        s1.labelNames,
+                        s1.labelValues,
+                        s1.value + s2.value
+                    )
+                )
+            )
+            .values()
+            .stream()
+            .toList()
+    );
   }
 
 }
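
Note: getSummarizedBrokersMetrics() concatenates the inferred metrics with the per-broker families merged by name, and summarizeMfs sums samples whose (labelNames, labelValues) pairs match exactly. A minimal sketch, assuming two brokers report the same gauge (names and values are illustrative):

    var broker1 = new MetricFamilySamples("bytes_in", Type.GAUGE, "help", List.of(
        new MetricFamilySamples.Sample("bytes_in", List.of("topic"), List.of("orders"), 5.0)));
    var broker2 = new MetricFamilySamples("bytes_in", Type.GAUGE, "help", List.of(
        new MetricFamilySamples.Sample("bytes_in", List.of("topic"), List.of("orders"), 7.0)));
    // summarizeMfs(broker1, broker2) -> one sample {topic="orders"} with value 12.0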

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.model;
 
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;

+ 6 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.service;
 
+import static io.prometheus.client.Collector.*;
+
 import com.provectus.kafka.ui.exception.InvalidRequestApiException;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.NotFoundException;
@@ -11,7 +13,6 @@ import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.PartitionDistributionStats;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -20,13 +21,13 @@ import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.LogDirDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -109,7 +110,7 @@ public class BrokerService {
         .doOnError(e -> log.error("Unexpected error", e));
   }
 
-  private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
+  private Mono<Map<Integer, Map<String, LogDirDescription>>> getClusterLogDirs(
       KafkaCluster cluster, List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
@@ -138,8 +139,8 @@ public class BrokerService {
     return getBrokersConfig(cluster, brokerId);
   }
 
-  public Mono<List<RawMetric>> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
-    return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerMetrics().get(brokerId));
+  public Mono<List<MetricFamilySamples>> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
+    return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerScrapedMetrics().get(brokerId));
   }
 
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -10,8 +10,8 @@ import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping;
+import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
+import com.provectus.kafka.ui.service.metrics.scrape.MetricsScrapping;
 import com.provectus.kafka.ui.sr.ApiClient;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.KafkaServicesValidation;

+ 3 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -12,9 +12,7 @@ import com.google.common.collect.Table;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.util.KafkaVersion;
-import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
 import java.time.Duration;
@@ -53,6 +51,7 @@ import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.LogDirDescription;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -81,7 +80,6 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -379,15 +377,8 @@ public class ReactiveAdminClient implements Closeable {
     );
   }
 
-  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
-    return describeCluster()
-        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
-        .flatMap(this::describeLogDirs);
-  }
-
-  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
-      Collection<Integer> brokerIds) {
-    return toMono(client.describeLogDirs(brokerIds).all())
+  public Mono<Map<Integer, Map<String, LogDirDescription>>> describeLogDirs(Collection<Integer> brokerIds) {
+    return toMono(client.describeLogDirs(brokerIds).allDescriptions())
         .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
         .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
         .onErrorResume(th -> true, th -> {
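
Note: allDescriptions() is the public replacement accessor on DescribeLogDirsResult (available since Kafka 2.7); unlike the deprecated all(), it yields LogDirDescription instead of the internal LogDirInfo. A sketch, assuming a plain AdminClient in scope:

    KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> future =
        adminClient.describeLogDirs(List.of(1, 2, 3)).allDescriptions();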

+ 11 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java

@@ -2,24 +2,18 @@ package com.provectus.kafka.ui.service;
 
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
 
-import com.provectus.kafka.ui.model.ClusterFeature;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.model.Statistics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
-import java.util.List;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.Map;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.Node;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 @Service
 @RequiredArgsConstructor
@@ -53,6 +47,16 @@ public class StatisticsService {
                                             .metrics(metrics)
                                             .features(featuresAndState.getT1())
                                             .clusterState(featuresAndState.getT2())
+                                            //TODO: RM ->>>
+                                            .topicDescriptions(
+                                                featuresAndState.getT2().getTopicStates().entrySet().stream()
+                                                    .collect(Collectors.toMap(
+                                                        Map.Entry::getKey, e -> e.getValue().description())))
+                                            .topicConfigs(
+                                                featuresAndState.getT2().getTopicStates().entrySet().stream()
+                                                    .collect(Collectors.toMap(
+                                                        Map.Entry::getKey, e -> e.getValue().configs())))
+                                            .logDirInfo(InternalLogDirStats.empty())
                                             .build())))))
         .doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e))
@@ -60,19 +64,6 @@ public class StatisticsService {
             e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
   }
 
-  private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
-    var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
-    return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
-  }
-
-  private Mono<Map<String, TopicDescription>> describeTopics(KafkaCluster c) {
-    return adminClientService.get(c).flatMap(ReactiveAdminClient::describeTopics);
-  }
-
-  private Mono<Map<String, List<ConfigEntry>>> loadTopicConfigs(KafkaCluster c) {
-    return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig);
-  }
-
   private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
                                                      ReactiveAdminClient ac) {
     return ScrapedClusterState.scrape(clusterDescription, ac);

+ 19 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java

@@ -2,16 +2,15 @@ package com.provectus.kafka.ui.service.metrics;
 
 import static io.prometheus.client.Collector.*;
 
-import io.prometheus.client.Collector;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 
 public interface RawMetric {
 
@@ -21,24 +20,27 @@ public interface RawMetric {
 
   BigDecimal value();
 
-  // Key, that can be used for metrics reductions
-  default Object identityKey() {
-    return name() + "_" + labels();
-  }
-
-  RawMetric copyWithValue(BigDecimal newValue);
-
   //--------------------------------------------------
 
-  static Stream<MetricFamilySamples> groupIntoMFS(Collection<RawMetric> lst) {
-    //TODO: impl
-    return null;
-  }
-
   static RawMetric create(String name, Map<String, String> labels, BigDecimal value) {
     return new SimpleMetric(name, labels, value);
   }
 
+  static Stream<MetricFamilySamples> groupIntoMFS(Collection<RawMetric> lst) {
+    Map<String, MetricFamilySamples> map = new LinkedHashMap<>();
+    for (RawMetric m : lst) {
+      var mfs = map.get(m.name());
+      if (mfs == null) {
+        mfs = new MetricFamilySamples(m.name(), Type.GAUGE, m.name(), new ArrayList<>());
+        map.put(m.name(), mfs);
+      }
+      List<String> lbls = m.labels().keySet().stream().toList();
+      List<String> lblVals = lbls.stream().map(l -> m.labels().get(l)).toList();
+      mfs.samples.add(new MetricFamilySamples.Sample(m.name(), lbls, lblVals, m.value().doubleValue()));
+    }
+    return map.values().stream();
+  }
+
   static Stream<RawMetric> create(MetricFamilySamples samples) {
     return samples.samples.stream()
         .map(s -> create(
@@ -51,14 +53,7 @@ public interface RawMetric {
         );
   }
 
-  record SimpleMetric(String name,
-                      Map<String, String> labels,
-                      BigDecimal value) implements RawMetric {
-
-    @Override
-    public RawMetric copyWithValue(BigDecimal newValue) {
-      return new SimpleMetric(name, labels, newValue);
-    }
+  record SimpleMetric(String name, Map<String, String> labels, BigDecimal value) implements RawMetric {
   }
 
 }
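
Note: a hypothetical usage (not part of this commit) of the newly implemented groupIntoMFS(), which folds flat raw metrics into one gauge family per metric name:

    var metrics = List.of(
        RawMetric.create("requests_total", Map.of("verb", "GET"), BigDecimal.valueOf(10)),
        RawMetric.create("requests_total", Map.of("verb", "PUT"), BigDecimal.valueOf(3)));
    List<MetricFamilySamples> families = RawMetric.groupIntoMFS(metrics).toList();
    // -> a single "requests_total" family holding two samples, one per label set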

+ 9 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java

@@ -1,17 +1,18 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
+package com.provectus.kafka.ui.service.metrics.scrape;
 
 import static com.provectus.kafka.ui.config.ClustersProperties.*;
 import static com.provectus.kafka.ui.model.MetricsScrapeProperties.*;
 
 import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetricsScraper;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsScraper;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusScraper;
+import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
+import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetricsScraper;
+import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
+import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsScraper;
+import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusScraper;
 import jakarta.annotation.Nullable;
 import java.util.Collection;
+import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.Node;
 import reactor.core.publisher.Mono;
@@ -29,7 +30,6 @@ public class MetricsScrapping {
 
   public static MetricsScrapping create(Cluster cluster,
                                         JmxMetricsRetriever jmxMetricsRetriever) {
-    InferredMetricsScraper inferredMetricsScraper = new InferredMetricsScraper();
     JmxMetricsScraper jmxMetricsScraper = null;
     PrometheusScraper prometheusScraper = null;
 
@@ -42,14 +42,14 @@ public class MetricsScrapping {
         prometheusScraper = new PrometheusScraper(scrapeProperties);
       }
     }
-    return new MetricsScrapping(inferredMetricsScraper, jmxMetricsScraper, prometheusScraper);
+    return new MetricsScrapping(new InferredMetricsScraper(), jmxMetricsScraper, prometheusScraper);
   }
 
   private static MetricsScrapeProperties createScrapeProps(Cluster cluster) {
     var metrics = cluster.getMetrics();
     return MetricsScrapeProperties.builder()
         .port(metrics.getPort())
-        .ssl(metrics.getSsl())
+        .ssl(Optional.ofNullable(metrics.getSsl()).orElse(false))
         .username(metrics.getUsername())
         .password(metrics.getPassword())
         .truststoreConfig(cluster.getSsl())

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/PerBrokerScrapedMetrics.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
+package com.provectus.kafka.ui.service.metrics.scrape;
 
 import com.provectus.kafka.ui.model.Metrics;
 import io.prometheus.client.Collector;

+ 157 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java

@@ -0,0 +1,157 @@
+package com.provectus.kafka.ui.service.metrics.scrape;
+
+import static com.provectus.kafka.ui.model.InternalLogDirStats.*;
+import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
+
+import com.google.common.collect.Table;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import jakarta.annotation.Nullable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+@Builder
+@Value
+public class ScrapedClusterState {
+
+  public record NodeState(int id,
+                          Node node,
+                          @Nullable SegmentStats segmentStats,
+                          @Nullable LogDirSpaceStats logDirSpaceStats) {
+  }
+
+  public record TopicState(
+      String name,
+      TopicDescription description,
+      List<ConfigEntry> configs,
+      Map<Integer, Long> startOffsets,
+      Map<Integer, Long> endOffsets,
+      @Nullable SegmentStats segmentStats,
+      @Nullable Map<Integer, SegmentStats> partitionsSegmentStats) {
+  }
+
+  public record ConsumerGroupState(
+      String group,
+      ConsumerGroupDescription description,
+      Map<TopicPartition, Long> committedOffsets) {
+  }
+
+  Instant scrapeStartTime;
+  Map<Integer, NodeState> nodesStates;
+  Map<String, TopicState> topicStates;
+  Map<String, ConsumerGroupState> consumerGroupsStates;
+
+  public static ScrapedClusterState empty() {
+    return ScrapedClusterState.builder()
+        .scrapeStartTime(Instant.now())
+        .nodesStates(Map.of())
+        .topicStates(Map.of())
+        .consumerGroupsStates(Map.of())
+        .build();
+  }
+
+  public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
+                                                 ReactiveAdminClient ac) {
+    return Mono.zip(
+        ac.describeLogDirs(clusterDescription.getNodes().stream().map(Node::id).toList())
+            .map(InternalLogDirStats::new),
+        ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList()),
+        ac.describeTopics(),
+        ac.getTopicsConfig()
+    ).flatMap(phase1 ->
+        Mono.zip(
+            ac.listOffsets(phase1.getT3().values(), OffsetSpec.latest()),
+            ac.listOffsets(phase1.getT3().values(), OffsetSpec.earliest()),
+            ac.describeConsumerGroups(phase1.getT2()),
+            ac.listConsumerGroupOffsets(phase1.getT2(), null)
+        ).map(phase2 ->
+            create(
+                clusterDescription,
+                phase1.getT1(),
+                phase1.getT3(),
+                phase1.getT4(),
+                phase2.getT1(),
+                phase2.getT2(),
+                phase2.getT3(),
+                phase2.getT4()
+            )));
+  }
+
+  private static ScrapedClusterState create(ClusterDescription clusterDescription,
+                                            InternalLogDirStats segmentStats,
+                                            Map<String, TopicDescription> topicDescriptions,
+                                            Map<String, List<ConfigEntry>> topicConfigs,
+                                            Map<TopicPartition, Long> latestOffsets,
+                                            Map<TopicPartition, Long> earliestOffsets,
+                                            Map<String, ConsumerGroupDescription> consumerDescriptions,
+                                            Table<String, TopicPartition, Long> consumerOffsets) {
+
+    Map<String, TopicState> topicStates = new HashMap<>();
+    topicDescriptions.forEach((name, desc) ->
+        topicStates.put(
+            name,
+            new TopicState(
+                name,
+                desc,
+                topicConfigs.getOrDefault(name, List.of()),
+                cutTopic(name, earliestOffsets),
+                cutTopic(name, latestOffsets),
+                segmentStats.getTopicStats().get(name),
+                Optional.ofNullable(segmentStats.getPartitionsStats())
+                    .map(partitionsStats -> cutTopic(name, partitionsStats))
+                    .orElse(null)
+            )));
+
+    Map<String, ConsumerGroupState> consumerGroupsStates = new HashMap<>();
+    consumerDescriptions.forEach((name, desc) ->
+        consumerGroupsStates.put(
+            name,
+            new ConsumerGroupState(
+                name,
+                desc,
+                consumerOffsets.row(name)
+            )));
+
+    Map<Integer, NodeState> nodesStates = new HashMap<>();
+    clusterDescription.getNodes().forEach(node ->
+        nodesStates.put(
+            node.id(),
+            new NodeState(
+                node.id(),
+                node,
+                segmentStats.getBrokerStats().get(node.id()),
+                segmentStats.getBrokerDirsStats().get(node.id())
+            )));
+
+    return new ScrapedClusterState(
+        Instant.now(),
+        Map.copyOf(nodesStates),
+        Map.copyOf(topicStates),
+        Map.copyOf(consumerGroupsStates)
+    );
+  }
+
+  private static <T> Map<Integer, T> cutTopic(String topicForFilter, Map<TopicPartition, T> tpMap) {
+    return tpMap.entrySet()
+        .stream()
+        .filter(tp -> tp.getKey().topic().equals(topicForFilter))
+        .collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue));
+  }
+
+}
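
Note: scrape() runs two zipped phases: phase 1 loads log-dir stats, consumer-group ids, topic descriptions and topic configs; phase 2 uses those results to fetch start/end offsets and committed group offsets before create() assembles the immutable snapshot. A usage sketch, assuming a ReactiveAdminClient and ClusterDescription are already in scope:

    Mono<ScrapedClusterState> state = ScrapedClusterState.scrape(clusterDescription, adminClient);
    state.subscribe(s -> log.info("scraped {} topics, {} consumer groups",
        s.getTopicStates().size(), s.getConsumerGroupsStates().size()));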

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/WellKnownMetrics.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
+package com.provectus.kafka.ui.service.metrics.scrape;
 
 import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
 import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java

@@ -1,9 +1,10 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
+package com.provectus.kafka.ui.service.metrics.scrape.inferred;
 
 import static io.prometheus.client.Collector.MetricFamilySamples;
 
 import java.util.List;
 
+//TODO: maybe rename to state-based metrics?
 public class InferredMetrics {
 
   private final List<MetricFamilySamples> metrics;

+ 234 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java

@@ -0,0 +1,234 @@
+package com.provectus.kafka.ui.service.metrics.scrape.inferred;
+
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.GaugeMetricFamily;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@RequiredArgsConstructor
+public class InferredMetricsScraper {
+
+  private ScrapedClusterState prevState = null;
+
+  public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
+    if (prevState == null) {
+      prevState = newState;
+      return Mono.just(InferredMetrics.empty());
+    }
+    var inferred = infer(prevState, newState);
+    prevState = newState;
+    return Mono.just(inferred);
+  }
+
+  private static InferredMetrics infer(ScrapedClusterState prevState,
+                                       ScrapedClusterState newState) {
+
+    log.debug("Scraped cluster state: {}", newState); //TODO: rm
+    var registry = new MetricsRegistry();
+    fillNodesMetrics(registry, newState);
+    fillTopicMetrics(registry, newState);
+    fillConsumerGroupsMetrics(registry, newState);
+    List<MetricFamilySamples> metrics = registry.metrics.values().stream().toList();
+    log.debug("{} metrics inferred from cluster state", metrics.size());
+    return new InferredMetrics(metrics);
+  }
+
+  private static class MetricsRegistry {
+
+    final Map<String, MetricFamilySamples> metrics = new LinkedHashMap<>();
+
+    void gauge(String name,
+               String help,
+               List<String> lbls,
+               List<String> lblVals,
+               Number value) {
+      var found = metrics.get(name);
+      GaugeMetricFamily gauge;
+      if (found != null) {
+        gauge = (GaugeMetricFamily) found;
+      } else {
+        gauge = new GaugeMetricFamily(name, help, lbls);
+        metrics.put(name, gauge);
+      }
+      gauge.addMetric(lblVals, value.doubleValue());
+    }
+  }
+
+  private static void fillNodesMetrics(MetricsRegistry registry, ScrapedClusterState newState) {
+    registry.gauge(
+        "broker_count",
+        "Number of brokers in the Kafka cluster",
+        List.of(),
+        List.of(),
+        newState.getNodesStates().size()
+    );
+
+    newState.getNodesStates().forEach((nodeId, state) -> {
+      if (state.segmentStats() != null) {
+        registry.gauge(
+            "broker_bytes_disk",
+            "Written disk size in bytes of a broker",
+            List.of("node_id"),
+            List.of(nodeId.toString()),
+            state.segmentStats().getSegmentSize()
+        );
+      }
+      if (state.logDirSpaceStats() != null) {
+        if (state.logDirSpaceStats().usableBytes() != null) {
+          registry.gauge(
+              "broker_bytes_usable",
+              "Usable disk size in bytes of a broker",
+              List.of("node_id"),
+              List.of(nodeId.toString()),
+              state.logDirSpaceStats().usableBytes()
+          );
+        }
+        if (state.logDirSpaceStats().totalBytes() != null) {
+          registry.gauge(
+              "broker_bytes_total",
+              "Total disk size in bytes of a broker",
+              List.of("node_id"),
+              List.of(nodeId.toString()),
+              state.logDirSpaceStats().totalBytes()
+          );
+        }
+        //TODO: maybe add per-directory stats also
+      }
+    });
+  }
+
+  private static void fillTopicMetrics(MetricsRegistry registry, ScrapedClusterState clusterState) {
+    registry.gauge(
+        "topic_count",
+        "Number of topics in the Kafka cluster",
+        List.of(),
+        List.of(),
+        clusterState.getTopicStates().size()
+    );
+
+    clusterState.getTopicStates().forEach((topicName, state) -> {
+      registry.gauge(
+          "kafka_topic_partitions",
+          "Number of partitions for this Topic",
+          List.of("topic"),
+          List.of(topicName),
+          state.description().partitions().size()
+      );
+      state.endOffsets().forEach((partition, endOffset) -> {
+        registry.gauge(
+            "kafka_topic_partition_current_offset",
+            "Current Offset of a Broker at Topic/Partition",
+            List.of("topic", "partition"),
+            List.of(topicName, String.valueOf(partition)),
+            endOffset
+        );
+      });
+      state.startOffsets().forEach((partition, startOffset) -> {
+        registry.gauge(
+            "kafka_topic_partition_oldest_offset",
+            "Oldest Offset of a Broker at Topic/Partition",
+            List.of("topic", "partition"),
+            List.of(topicName, String.valueOf(partition)),
+            startOffset
+        );
+      });
+      state.description().partitions().forEach(p -> {
+        registry.gauge(
+            "kafka_topic_partition_in_sync_replica",
+            "Number of In-Sync Replicas for this Topic/Partition",
+            List.of("topic", "partition"),
+            List.of(topicName, String.valueOf(p.partition())),
+            p.isr().size()
+        );
+        registry.gauge(
+            "kafka_topic_partition_replicas",
+            "Number of Replicas for this Topic/Partition",
+            List.of("topic", "partition"),
+            List.of(topicName, String.valueOf(p.partition())),
+            p.replicas().size()
+        );
+        registry.gauge(
+            "kafka_topic_partition_leader",
+            "Leader Broker ID of this Topic/Partition (-1, if no leader)",
+            List.of("topic", "partition"),
+            List.of(topicName, String.valueOf(p.partition())),
+            Optional.ofNullable(p.leader()).map(Node::id).orElse(-1)
+        );
+      });
+      if (state.segmentStats() != null) {
+        registry.gauge(
+            "topic_bytes_disk",
+            "Disk size in bytes of a topic",
+            List.of("topic"),
+            List.of(topicName),
+            state.segmentStats().getSegmentSize()
+        );
+      }
+    });
+  }
+
+  private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedClusterState clusterState) {
+    registry.gauge(
+        "group_count",
+        "Number of consumer groups in the Kafka cluster",
+        List.of(),
+        List.of(),
+        clusterState.getConsumerGroupsStates().size()
+    );
+
+    clusterState.getConsumerGroupsStates().forEach((groupName, state) -> {
+      registry.gauge(
+          "group_state",
+          "State of the consumer group, value = ordinal of org.apache.kafka.common.ConsumerGroupState",
+          List.of("group"),
+          List.of(groupName),
+          state.description().state().ordinal()
+      );
+      registry.gauge(
+          "group_member_count",
+          "Number of member assignments in the consumer group.",
+          List.of("group"),
+          List.of(groupName),
+          state.description().members().size()
+      );
+      registry.gauge(
+          "group_host_count",
+          "Number of distinct hosts in the consumer group.",
+          List.of("group"),
+          List.of(groupName),
+          state.description().members().stream().map(MemberDescription::host).distinct().count()
+      );
+
+      state.committedOffsets().forEach((tp, committedOffset) -> {
+        registry.gauge(
+            "kafka_consumergroup_current_offset",
+            "Current Offset of a ConsumerGroup at Topic/Partition",
+            List.of("consumergroup", "topic", "partition"),
+            List.of(groupName, tp.topic(), String.valueOf(tp.partition())),
+            committedOffset
+        );
+
+        Optional.ofNullable(clusterState.getTopicStates().get(tp.topic()))
+            .flatMap(s -> Optional.ofNullable(s.endOffsets().get(tp.partition())))
+            .ifPresent(endOffset ->
+                registry.gauge(
+                    "kafka_consumergroup_lag",
+                    "Current Approximate Lag of a ConsumerGroup at Topic/Partition",
+                    List.of("consumergroup", "topic", "partition"),
+                    List.of(groupName, tp.topic(), String.valueOf(tp.partition())),
+                    endOffset - committedOffset //TODO: check +-1
+                ));
+
+      });
+    });
+  }
+}
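
Note: the scraper is stateful: the first scrape() call only seeds prevState and returns InferredMetrics.empty(); later calls infer gauges from the newest snapshot (prevState is retained so delta/rate metrics can be derived from it later). A usage sketch with hypothetical snapshot variables:

    var scraper = new InferredMetricsScraper();
    scraper.scrape(firstSnapshot).block();                    // empty on the first pass
    InferredMetrics inferred = scraper.scrape(nextSnapshot).block();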

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsFormatter.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
+package com.provectus.kafka.ui.service.metrics.scrape.jmx;
 
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import io.prometheus.client.Collector;

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsRetriever.java

@@ -1,6 +1,5 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
+package com.provectus.kafka.ui.service.metrics.scrape.jmx;
 
-import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.io.Closeable;

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsScraper.java

@@ -1,10 +1,10 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
+package com.provectus.kafka.ui.service.metrics.scrape.jmx;
 
 import static io.prometheus.client.Collector.*;
 
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics;
+import com.provectus.kafka.ui.service.metrics.scrape.PerBrokerScrapedMetrics;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxSslSocketFactory.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
+package com.provectus.kafka.ui.service.metrics.scrape.jmx;
 
 import com.google.common.base.Preconditions;
 import java.io.FileInputStream;

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.math.BigDecimal;
@@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.math.NumberUtils;
 
 @Slf4j
-class PrometheusEndpointMetricsParser {
+public class PrometheusEndpointMetricsParser {
 
   /**
    * Matches openmetrics format. For example, string:

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetriever.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import static io.prometheus.client.Collector.*;
 
@@ -26,7 +26,7 @@ class PrometheusMetricsRetriever {
   private static final String METRICS_ENDPOINT_PATH = "/metrics";
   private static final int DEFAULT_EXPORTER_PORT = 11001;
 
-  public Mono<List<MetricFamilySamples>> retrieve(MetricsScrapeProperties metricsConfig, Node node) {
+  Mono<List<MetricFamilySamples>> retrieve(MetricsScrapeProperties metricsConfig, Node node) {
     log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), metricsConfig.getPort());
 
     var webClient = new WebClientConfigurator()

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusScraper.java

@@ -1,7 +1,7 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics;
+import com.provectus.kafka.ui.service.metrics.scrape.PerBrokerScrapedMetrics;
 import io.prometheus.client.Collector;
 import java.util.Collection;
 import java.util.List;

+ 0 - 93
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java

@@ -1,93 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
-
-import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
-
-import com.google.common.collect.Table;
-import com.provectus.kafka.ui.model.InternalLogDirStats;
-import com.provectus.kafka.ui.service.ReactiveAdminClient;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import lombok.Builder;
-import lombok.Value;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.ConsumerGroupListing;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.ConsumerGroupState;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
-import reactor.core.publisher.Mono;
-
-@Builder
-@Value
-public class ScrapedClusterState {
-
-  record NodeState(SegmentStats segmentStats) {
-  }
-
-  record TopicState(
-      Instant scrapeTime,
-      String name,
-      List<ConfigEntry> configs,
-      TopicDescription description,
-      Map<Integer, Long> endOffsets,
-      SegmentStats segmentStats,
-      Map<Integer, SegmentStats> partitionsSegmentStats) {
-  }
-
-  record ConsumerGroupState(
-      Instant scrapeTime,
-      String group,
-      org.apache.kafka.common.ConsumerGroupState state,
-      ConsumerGroupDescription description,
-      Table<String, Integer, Long> committedOffsets,
-      Map<String, Instant> lastTopicActivity) {
-  }
-
-  record SegmentStats(long segmentSize,
-                      int segmentsCount) {
-  }
-
-  Instant scrapeStartTime;
-  Map<Integer, NodeState> nodesStates;
-  Map<String, TopicState> topicStates;
-  Map<String, ConsumerGroupState> consumerGroupsStates;
-
-  public static ScrapedClusterState empty() {
-    return ScrapedClusterState.builder()
-        .scrapeStartTime(Instant.now())
-        .nodesStates(Map.of())
-        .topicStates(Map.of())
-        .consumerGroupsStates(Map.of())
-        .build();
-  }
-
-  public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
-                                                 ReactiveAdminClient ac) {
-
-    Mono<InternalLogDirStats> segmentStatsMono = ac.describeLogDirs().map(InternalLogDirStats::new);
-    Mono<List<String>> cgListingsMono = ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList());
-    Mono<Map<String, TopicDescription>> topicDescriptionsMono = ac.describeTopics();
-    Mono<Map<String, List<ConfigEntry>>> topicConfigsMono = ac.getTopicsConfig();
-
-    Mono.zip(
-        segmentStatsMono,
-        cgListingsMono,
-        topicDescriptionsMono,
-        topicConfigsMono
-    ).flatMap(tuple -> {
-      InternalLogDirStats segmentStats = tuple.getT1();
-      List<String> consumerGroups = tuple.getT2();
-      Map<String, TopicDescription> topicDescriptions = tuple.getT3();
-      Map<String, List<ConfigEntry>> topicConfigs = tuple.getT4();
-
-      Mono<>
-    })
-
-    return null;//TODO impl
-  }
-
-}

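Note: this draft is deleted with scrape() unfinished — the dangling Mono<> and the null return — and its 157-line replacement lives under ...service.metrics.scrape (see the file list). For reference, a minimal way the zip could have been completed using only the record shapes declared above; end offsets, consumer-group state and segment stats would still need extra admin-client work, so those fields are left empty here (requires a java.util.stream.Collectors import):

  return Mono.zip(segmentStatsMono, cgListingsMono, topicDescriptionsMono, topicConfigsMono)
      .map(tuple -> {
        Map<String, TopicDescription> descriptions = tuple.getT3();
        Map<String, List<ConfigEntry>> configs = tuple.getT4();
        Instant now = Instant.now();
        Map<String, TopicState> topicStates = descriptions.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> new TopicState(
                    now,                                          // scrapeTime
                    e.getKey(),                                   // name
                    configs.getOrDefault(e.getKey(), List.of()),  // configs
                    e.getValue(),                                 // description
                    Map.of(),                                     // endOffsets: needs a listOffsets() call
                    null,                                         // segmentStats: derivable from tuple.getT1()
                    Map.of())));                                  // partitionsSegmentStats
        return ScrapedClusterState.builder()
            .scrapeStartTime(now)
            .nodesStates(Map.of())          // likewise derivable from tuple.getT1()
            .topicStates(topicStates)
            .consumerGroupsStates(Map.of()) // would use tuple.getT2() plus describeConsumerGroups()
            .build();
      });
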
+ 0 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java

@@ -1,30 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
-
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import reactor.core.publisher.Mono;
-
-@RequiredArgsConstructor
-public class InferredMetricsScraper {
-
-  private ScrapedClusterState prevState = null;
-
-  public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
-    if (prevState == null) {
-      prevState = newState;
-      return Mono.just(InferredMetrics.empty());
-    }
-    var inferred = infer(prevState, newState);
-    prevState = newState;
-    return Mono.just(inferred);
-  }
-
-  private static InferredMetrics infer(ScrapedClusterState prevState,
-                                       ScrapedClusterState newState) {
-
-    //TODO: impl
-    return new InferredMetrics(List.of());
-  }
-
-}

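Note: the stub is deleted in favour of the 234-line implementation under ...scrape.inferred (see the file list); the synchronized prev/new-snapshot pattern carries over. To illustrate the idea, a toy infer(...) that derives one metric family from two consecutive snapshots — GaugeMetricFamily as the element type accepted by InferredMetrics is an assumption, not confirmed by this hunk:

  import io.prometheus.client.GaugeMetricFamily;

  private static InferredMetrics infer(ScrapedClusterState prev, ScrapedClusterState cur) {
    // End-offset growth per topic between the two scrapes.
    var growth = new GaugeMetricFamily(
        "topic_offset_growth", "end-offset delta between scrapes", List.of("topic"));
    cur.getTopicStates().forEach((topic, curState) -> {
      var prevState = prev.getTopicStates().get(topic);
      if (prevState != null) {
        growth.addMetric(List.of(topic), sum(curState.endOffsets()) - sum(prevState.endOffsets()));
      }
    });
    return new InferredMetrics(List.of(growth));
  }

  private static long sum(Map<Integer, Long> endOffsets) {
    return endOffsets.values().stream().mapToLong(Long::longValue).sum();
  }
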
+ 9 - 40
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/WellKnownMetricsTest.java

@@ -1,20 +1,14 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.scrape;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.model.Metrics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.WellKnownMetrics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.kafka.common.Node;
 import org.junit.jupiter.api.Test;
 
 class WellKnownMetricsTest {
 
-  private final WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
+  private WellKnownMetrics wellKnownMetrics;
 
   @Test
   void bytesIoTopicMetricsPopulated() {
@@ -57,39 +51,14 @@ class WellKnownMetricsTest {
         .containsEntry(2, new BigDecimal("20.0"));
   }
 
-  @Test
-  void appliesInnerStateToMetricsBuilder() {
-    //filling per topic io rates
-    wellKnownMetrics.bytesInFifteenMinuteRate.put("topic", new BigDecimal(1));
-    wellKnownMetrics.bytesOutFifteenMinuteRate.put("topic", new BigDecimal(2));
-
-    //filling per broker io rates
-    wellKnownMetrics.brokerBytesInFifteenMinuteRate.put(1, new BigDecimal(1));
-    wellKnownMetrics.brokerBytesOutFifteenMinuteRate.put(1, new BigDecimal(2));
-    wellKnownMetrics.brokerBytesInFifteenMinuteRate.put(2, new BigDecimal(10));
-    wellKnownMetrics.brokerBytesOutFifteenMinuteRate.put(2, new BigDecimal(20));
-
-    Metrics.MetricsBuilder builder = Metrics.builder();
-    wellKnownMetrics.ioRates(builder);
-    var metrics = builder.build();
-
-    // checking per topic io rates
-    assertThat(metrics.getTopicBytesInPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesInFifteenMinuteRate);
-    assertThat(metrics.getTopicBytesOutPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesOutFifteenMinuteRate);
-
-    // checking per broker io rates
-    assertThat(metrics.getBrokerBytesInPerSec()).containsExactlyInAnyOrderEntriesOf(
-        Map.of(1, new BigDecimal(1), 2, new BigDecimal(10)));
-    assertThat(metrics.getBrokerBytesOutPerSec()).containsExactlyInAnyOrderEntriesOf(
-        Map.of(1, new BigDecimal(2), 2, new BigDecimal(20)));
-  }
-
   private void populateWith(Node n, String... prometheusMetric) {
-    Arrays.stream(prometheusMetric)
-        .map(PrometheusEndpointMetricsParser::parse)
-        .filter(Optional::isPresent)
-        .map(Optional::get)
-        .forEach(m -> wellKnownMetrics.populate(n, m));
+    //TODO: uncomment
+//    wellKnownMetrics = new WellKnownMetrics(
+//        Arrays.stream(prometheusMetric)
+//        .map(PrometheusEndpointMetricsParser::parse)
+//        .filter(Optional::isPresent)
+//        .map(Optional::get)
+//    );
   }
 
 }

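Note: the helper is parked behind a TODO because WellKnownMetrics apparently now receives the metric stream at construction time instead of via populate(...). Uncommented and compiled as-is, the block above would read as below — the constructor signature is inferred from the commented code, the Node parameter ends up unused, and the java.util.Arrays/Optional imports this diff removed would need to come back:

  private void populateWith(Node n, String... prometheusMetric) {
    wellKnownMetrics = new WellKnownMetrics(
        Arrays.stream(prometheusMetric)
            .map(PrometheusEndpointMetricsParser::parse)
            .filter(Optional::isPresent)
            .map(Optional::get));
  }
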
+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsFormatterTest.java

@@ -1,8 +1,8 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.scrape.jmx;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsFormatter;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParserTest.java

@@ -1,8 +1,8 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Map;
 import java.util.Optional;
 import org.junit.jupiter.api.Test;

+ 3 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetrieverTest.java

@@ -1,7 +1,7 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusMetricsRetriever;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
@@ -74,7 +74,7 @@ class PrometheusMetricsRetrieverTest {
     return MetricsScrapeProperties.builder()
         .ssl(false)
         .port(port)
-        .type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
+        //.type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
         .username(username)
         .password(password)
         .build();