Sfoglia il codice sorgente

Metrics & cache refactoring (#1036)

Metrics retrieval refactoring:
* metrics data moved to separate class with its own storage
* topics list pagination logic changed for better actuality
* some mappings moved to InternalXXX classed
Ilya Kuramshin 3 anni fa
parent
commit
43a0e383cf
29 ha cambiato i file con 994 aggiunte e 921 eliminazioni
  1. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
  2. 7 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
  3. 7 23
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  4. 1 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java
  5. 11 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java
  6. 63 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
  7. 68 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
  8. 0 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java
  9. 33 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
  10. 85 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
  11. 19 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java
  12. 0 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  13. 38 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java
  14. 14 23
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
  15. 13 19
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
  16. 8 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java
  17. 1 42
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java
  18. 22 18
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
  19. 4 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  20. 94 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java
  21. 39 222
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java
  22. 66 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
  23. 222 193
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
  24. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java
  25. 1 101
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
  26. 0 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java
  27. 9 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java
  28. 164 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
  29. 0 225
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java

@@ -16,13 +16,13 @@ import reactor.core.publisher.Mono;
 @RestController
 @RequiredArgsConstructor
 @Log4j2
-public class ClustersController implements ClustersApi {
+public class ClustersController extends AbstractController implements ClustersApi {
   private final ClusterService clusterService;
 
   @Override
   public Mono<ResponseEntity<ClusterMetricsDTO>> getClusterMetrics(String clusterName,
                                                                 ServerWebExchange exchange) {
-    return clusterService.getClusterMetrics(clusterName)
+    return clusterService.getClusterMetrics(getCluster(clusterName))
         .map(ResponseEntity::ok)
         .onErrorReturn(ResponseEntity.notFound().build());
   }
@@ -30,7 +30,7 @@ public class ClustersController implements ClustersApi {
   @Override
   public Mono<ResponseEntity<ClusterStatsDTO>> getClusterStats(String clusterName,
                                                             ServerWebExchange exchange) {
-    return clusterService.getClusterStats(clusterName)
+    return clusterService.getClusterStats(getCluster(clusterName))
         .map(ResponseEntity::ok)
         .onErrorReturn(ResponseEntity.notFound().build());
   }
@@ -43,6 +43,6 @@ public class ClustersController implements ClustersApi {
   @Override
   public Mono<ResponseEntity<ClusterDTO>> updateClusterInfo(String clusterName,
                                                          ServerWebExchange exchange) {
-    return clusterService.updateCluster(clusterName).map(ResponseEntity::ok);
+    return clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok);
   }
 }

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -48,18 +48,16 @@ public class TopicsController extends AbstractController implements TopicsApi {
   @Override
   public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
       String clusterName, String topicName, ServerWebExchange exchange) {
-    return Mono.just(
-        ResponseEntity.ok(
-            Flux.fromIterable(topicsService.getTopicConfigs(getCluster(clusterName), topicName))));
+    return topicsService.getTopicConfigs(getCluster(clusterName), topicName)
+        .map(Flux::fromIterable)
+        .map(ResponseEntity::ok);
   }
 
   @Override
   public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
       String clusterName, String topicName, ServerWebExchange exchange) {
-    return Mono.just(
-        ResponseEntity.ok(
-            topicsService.getTopicDetails(getCluster(clusterName), topicName))
-    );
+    return topicsService.getTopicDetails(getCluster(clusterName), topicName)
+        .map(ResponseEntity::ok);
   }
 
   @Override
@@ -69,7 +67,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
                                                         @Valid String search,
                                                         @Valid TopicColumnsToSortDTO orderBy,
                                                         ServerWebExchange exchange) {
-    return Mono.just(ResponseEntity.ok(topicsService
+    return topicsService
         .getTopics(
             getCluster(clusterName),
             Optional.ofNullable(page),
@@ -77,7 +75,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
             Optional.ofNullable(showInternal),
             Optional.ofNullable(search),
             Optional.ofNullable(orderBy)
-        )));
+        ).map(ResponseEntity::ok);
   }
 
   @Override

+ 7 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -15,7 +15,7 @@ import com.provectus.kafka.ui.model.ConnectDTO;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
-import com.provectus.kafka.ui.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.model.InternalClusterState;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
@@ -31,6 +31,7 @@ import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,26 +47,18 @@ import org.mapstruct.Named;
 @Mapper(componentModel = "spring")
 public interface ClusterMapper {
 
-  @Mapping(target = "brokerCount", source = "metrics.brokerCount")
-  @Mapping(target = "status", source = "metrics.status")
-  @Mapping(target = "version", source = "metrics.version")
-  @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
-  @Mapping(target = "topicCount", source = "metrics.topicCount")
-  @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
-  @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
-  ClusterDTO toCluster(KafkaCluster cluster);
+  ClusterDTO toCluster(InternalClusterState clusterState);
 
   @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath")
   @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
   @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
-  @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage",
-      qualifiedByName = "mapDiskUsage")
-  ClusterStatsDTO toClusterStats(InternalClusterMetrics metrics);
+  ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
 
-  @Mapping(target = "items", source = "metrics")
-  ClusterMetricsDTO toClusterMetrics(InternalClusterMetrics metrics);
+  default ClusterMetricsDTO toClusterMetrics(JmxClusterUtil.JmxMetrics jmxMetrics) {
+    return new ClusterMetricsDTO().items(jmxMetrics.getMetrics());
+  }
 
   BrokerMetricsDTO toBrokerMetrics(JmxBrokerMetrics metrics);
 
@@ -146,15 +139,6 @@ public interface ClusterMapper {
     return brokerDiskUsage;
   }
 
-  @Named("mapDiskUsage")
-  default List<BrokerDiskUsageDTO> mapDiskUsage(Map<Integer, InternalBrokerDiskUsage> brokers) {
-    if (brokers == null) {
-      return null;
-    }
-    return brokers.entrySet().stream().map(e -> this.map(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-  }
-
   @Named("resolvePath")
   default Path resolvePath(String path) {
     if (path != null) {

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java

@@ -33,7 +33,6 @@ public enum CleanupPolicy {
                 )
             )
         ).findFirst()
-        .orElseThrow(() ->
-            new IllegalEntityStateException("Unknown cleanup policy value: " + string));
+        .orElse(UNKNOWN);
   }
 }

+ 11 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java

@@ -15,4 +15,15 @@ public class InternalBrokerConfig {
   private final boolean isSensitive;
   private final boolean isReadOnly;
   private final List<ConfigEntry.ConfigSynonym> synonyms;
+
+  public static InternalBrokerConfig from(ConfigEntry configEntry) {
+    InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder()
+        .name(configEntry.name())
+        .value(configEntry.value())
+        .source(configEntry.source())
+        .isReadOnly(configEntry.isReadOnly())
+        .isSensitive(configEntry.isSensitive())
+        .synonyms(configEntry.synonyms());
+    return builder.build();
+  }
 }

+ 63 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java

@@ -0,0 +1,63 @@
+package com.provectus.kafka.ui.model;
+
+import com.provectus.kafka.ui.service.MetricsCache;
+import com.provectus.kafka.ui.util.ClusterUtil;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Data;
+
+@Data
+public class InternalClusterState {
+  private String name;
+  private ServerStatusDTO status;
+  private Integer topicCount;
+  private Integer brokerCount;
+  private Integer zooKeeperStatus;
+  private Integer activeControllers;
+  private Integer onlinePartitionCount;
+  private Integer offlinePartitionCount;
+  private Integer inSyncReplicasCount;
+  private Integer outOfSyncReplicasCount;
+  private Integer underReplicatedPartitionCount;
+  private List<BrokerDiskUsageDTO> diskUsage;
+  private String version;
+  private List<Feature> features;
+  private BigDecimal bytesInPerSec;
+  private BigDecimal bytesOutPerSec;
+
+  public InternalClusterState(KafkaCluster cluster, MetricsCache.Metrics metrics) {
+    name = cluster.getName();
+    status = metrics.getStatus();
+    topicCount = metrics.getTopicDescriptions().size();
+    brokerCount = metrics.getClusterDescription().getNodes().size();
+    zooKeeperStatus = ClusterUtil.convertToIntServerStatus(metrics.getZkStatus().getStatus());
+    activeControllers = metrics.getClusterDescription().getController() != null ? 1 : 0;
+    version = metrics.getVersion();
+
+    if (metrics.getLogDirInfo() != null) {
+      diskUsage = metrics.getLogDirInfo().getBrokerStats().entrySet().stream()
+          .map(e -> new BrokerDiskUsageDTO()
+              .brokerId(e.getKey())
+              .segmentSize(e.getValue().getSegmentSize())
+              .segmentCount(e.getValue().getSegmentsCount()))
+          .collect(Collectors.toList());
+    }
+
+    features = metrics.getFeatures();
+
+    bytesInPerSec = metrics.getJmxMetrics().getBytesInPerSec().values().stream()
+        .reduce(BigDecimal.ZERO, BigDecimal::add);
+
+    bytesOutPerSec = metrics.getJmxMetrics().getBytesOutPerSec().values().stream()
+        .reduce(BigDecimal.ZERO, BigDecimal::add);
+
+    var partitionsStats = new PartitionsStats(metrics.getTopicDescriptions().values());
+    onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
+    offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
+    inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();
+    outOfSyncReplicasCount = partitionsStats.getOutOfSyncReplicasCount();
+    underReplicatedPartitionCount = partitionsStats.getUnderReplicatedPartitionCount();
+  }
+
+}

+ 68 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java

@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui.model;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.summarizingLong;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import lombok.Value;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
+import reactor.util.function.Tuples;
+
+@Value
+public class InternalLogDirStats {
+
+  @Value
+  public static class SegmentStats {
+    long segmentSize;
+    int segmentsCount;
+
+    public SegmentStats(LongSummaryStatistics s) {
+      segmentSize = s.getSum();
+      segmentsCount = (int) s.getCount();
+    }
+  }
+
+  Map<TopicPartition, SegmentStats> partitionsStats;
+  Map<String, SegmentStats> topicStats;
+  Map<Integer, SegmentStats> brokerStats;
+
+  public static InternalLogDirStats empty() {
+    return new InternalLogDirStats(Map.of());
+  }
+
+  public InternalLogDirStats(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
+    final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
+        log.entrySet().stream().flatMap(b ->
+            b.getValue().entrySet().stream().flatMap(topicMap ->
+                topicMap.getValue().replicaInfos.entrySet().stream()
+                    .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+            )
+        ).collect(toList());
+
+    partitionsStats = topicPartitions.stream().collect(
+        groupingBy(
+            Tuple2::getT2,
+            collectingAndThen(
+                summarizingLong(Tuple3::getT3), SegmentStats::new)));
+
+    topicStats =
+        topicPartitions.stream().collect(
+            groupingBy(
+                t -> t.getT2().topic(),
+                collectingAndThen(
+                    summarizingLong(Tuple3::getT3), SegmentStats::new)));
+
+    brokerStats = topicPartitions.stream().collect(
+        groupingBy(
+            Tuple2::getT1,
+            collectingAndThen(
+                summarizingLong(Tuple3::getT3), SegmentStats::new)));
+  }
+}

+ 0 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java

@@ -13,7 +13,6 @@ public class InternalPartition {
   private final int inSyncReplicasCount;
   private final int replicasCount;
 
-  // should be updated manually on partitions return
   private final long offsetMin;
   private final long offsetMax;
 
@@ -21,12 +20,5 @@ public class InternalPartition {
   private final long segmentSize;
   private final long segmentCount;
 
-  public InternalPartition withOffsets(long min, long max) {
-    return toBuilder().offsetMin(min).offsetMax(max).build();
-  }
-
-  public InternalPartition withSegmentStats(long segmentSize, long segmentCount) {
-    return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build();
-  }
 
 }

+ 33 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java

@@ -0,0 +1,33 @@
+package com.provectus.kafka.ui.model;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Value;
+import org.apache.kafka.common.TopicPartition;
+
+
+public class InternalPartitionsOffsets {
+
+  @Value
+  public static class Offsets {
+    Long earliest;
+    Long latest;
+  }
+
+  private final Table<String, Integer, Offsets> offsets = HashBasedTable.create();
+
+  public InternalPartitionsOffsets(Map<TopicPartition, Offsets> offsetsMap) {
+    offsetsMap.forEach((tp, o) -> this.offsets.put(tp.topic(), tp.partition(), o));
+  }
+
+  public static InternalPartitionsOffsets empty() {
+    return new InternalPartitionsOffsets(Map.of());
+  }
+
+  public Optional<Offsets> get(String topic, int partition) {
+    return Optional.ofNullable(offsets.get(topic, partition));
+  }
+
+}

+ 85 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,10 +1,15 @@
 package com.provectus.kafka.ui.model;
 
+import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Data;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
 
 @Data
 @Builder(toBuilder = true)
@@ -32,12 +37,87 @@ public class InternalTopic {
   private final long segmentSize;
   private final long segmentCount;
 
-  public InternalTopic withSegmentStats(long segmentSize, long segmentCount) {
-    return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build();
-  }
+  public static InternalTopic from(TopicDescription topicDescription,
+                                   List<ConfigEntry> configs,
+                                   InternalPartitionsOffsets partitionsOffsets,
+                                   JmxClusterUtil.JmxMetrics jmxMetrics,
+                                   InternalLogDirStats logDirInfo) {
+    var topic = InternalTopic.builder();
+    topic.internal(
+        topicDescription.isInternal() || topicDescription.name().startsWith("_")
+    );
+    topic.name(topicDescription.name());
+
+    List<InternalPartition> partitions = topicDescription.partitions().stream()
+        .map(partition -> {
+          var partitionDto = InternalPartition.builder();
+
+          partitionDto.leader(partition.leader() != null ? partition.leader().id() : null);
+          partitionDto.partition(partition.partition());
+          partitionDto.inSyncReplicasCount(partition.isr().size());
+          partitionDto.replicasCount(partition.replicas().size());
+          List<InternalReplica> replicas = partition.replicas().stream()
+              .map(r -> new InternalReplica(r.id(),
+                  partition.leader() != null && partition.leader().id() != r.id(),
+                  partition.isr().contains(r)))
+              .collect(Collectors.toList());
+          partitionDto.replicas(replicas);
+
+          partitionsOffsets.get(topicDescription.name(), partition.partition())
+              .ifPresent(offsets -> {
+                partitionDto.offsetMin(offsets.getEarliest());
+                partitionDto.offsetMax(offsets.getLatest());
+              });
+
+          var segmentStats =
+              logDirInfo.getPartitionsStats().get(
+                  new TopicPartition(topicDescription.name(), partition.partition()));
+          if (segmentStats != null) {
+            partitionDto.segmentCount(segmentStats.getSegmentsCount());
+            partitionDto.segmentSize(segmentStats.getSegmentSize());
+          }
+
+          return partitionDto.build();
+        })
+        .collect(Collectors.toList());
+
+    topic.partitions(partitions.stream().collect(
+        Collectors.toMap(InternalPartition::getPartition, t -> t)));
+
+    var partitionsStats = new PartitionsStats(topicDescription);
+    topic.replicas(partitionsStats.getReplicasCount());
+    topic.partitionCount(partitionsStats.getPartitionsCount());
+    topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount());
+    topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount());
+
+    topic.replicationFactor(
+        topicDescription.partitions().isEmpty()
+            ? 0
+            : topicDescription.partitions().get(0).replicas().size()
+    );
+
+    var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
+    if (segmentStats != null) {
+      topic.segmentCount(segmentStats.getSegmentsCount());
+      topic.segmentSize(segmentStats.getSegmentSize());
+    }
+
+    topic.bytesOutPerSec(jmxMetrics.getBytesOutPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(jmxMetrics.getBytesOutPerSec().get(topicDescription.name()));
+
+    topic.topicConfigs(
+        configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
+
+    topic.cleanUpPolicy(
+        configs.stream()
+            .filter(config -> config.name().equals("cleanup.policy"))
+            .findFirst()
+            .map(ConfigEntry::value)
+            .map(CleanupPolicy::fromString)
+            .orElse(CleanupPolicy.UNKNOWN)
+    );
 
-  public InternalTopic withIoRates(BigDecimal bytesInPerSec, BigDecimal bytesOutPerSec) {
-    return toBuilder().bytesInPerSec(bytesInPerSec).bytesOutPerSec(bytesOutPerSec).build();
+    return topic.build();
   }
 
 }

+ 19 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java

@@ -1,11 +1,14 @@
 package com.provectus.kafka.ui.model;
 
+import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
+import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
 import java.util.List;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.clients.admin.ConfigEntry;
 
+
 @Data
 @Builder
 public class InternalTopicConfig {
@@ -16,4 +19,20 @@ public class InternalTopicConfig {
   private final boolean isSensitive;
   private final boolean isReadOnly;
   private final List<ConfigEntry.ConfigSynonym> synonyms;
+
+  public static InternalTopicConfig from(ConfigEntry configEntry) {
+    InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
+        .name(configEntry.name())
+        .value(configEntry.value())
+        .source(configEntry.source())
+        .isReadOnly(configEntry.isReadOnly())
+        .isSensitive(configEntry.isSensitive())
+        .synonyms(configEntry.synonyms());
+    if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
+      builder.defaultValue(configEntry.value());
+    } else {
+      builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
+    }
+    return builder.build();
+  }
 }

+ 0 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -26,14 +26,10 @@ public class KafkaCluster {
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;
-  private final List<Feature> features;
   private final Path protobufFile;
   private final String protobufMessageName;
   private final Map<String, String> protobufMessageNameByTopic;
   private final Properties properties;
   private final Boolean readOnly;
   private final Boolean disableLogDirsCollection;
-
-  // state & metrics
-  private final InternalClusterMetrics metrics;
 }

+ 38 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java

@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.model;
+
+import java.util.Collection;
+import java.util.List;
+import lombok.Data;
+import org.apache.kafka.clients.admin.TopicDescription;
+
+@Data
+public class PartitionsStats {
+
+  private int partitionsCount;
+  private int replicasCount;
+  private int onlinePartitionCount;
+  private int offlinePartitionCount;
+  private int inSyncReplicasCount;
+  private int outOfSyncReplicasCount;
+  private int underReplicatedPartitionCount;
+
+  public PartitionsStats(TopicDescription description) {
+    this(List.of(description));
+  }
+
+  public PartitionsStats(Collection<TopicDescription> topicDescriptions) {
+    topicDescriptions.stream()
+        .flatMap(t -> t.partitions().stream())
+        .forEach(p -> {
+          partitionsCount++;
+          replicasCount += p.replicas().size();
+          onlinePartitionCount += p.leader() != null ? 1 : 0;
+          offlinePartitionCount += p.leader() == null ? 1 : 0;
+          inSyncReplicasCount += p.isr().size();
+          outOfSyncReplicasCount += (p.replicas().size() - p.isr().size());
+          if (p.replicas().size() > p.isr().size()) {
+            underReplicatedPartitionCount++;
+          }
+        });
+  }
+}

+ 14 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.InvalidRequestApiException;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.NotFoundException;
@@ -14,8 +13,6 @@ import com.provectus.kafka.ui.model.BrokerMetricsDTO;
 import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.util.ClusterUtil;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -40,14 +37,14 @@ import reactor.core.publisher.Mono;
 @Log4j2
 public class BrokerService {
 
+  private final MetricsCache metricsCache;
   private final AdminClientService adminClientService;
   private final DescribeLogDirsMapper describeLogDirsMapper;
   private final ClusterMapper clusterMapper;
 
   private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
       KafkaCluster cluster, List<Integer> brokersIds) {
-    return adminClientService.get(cluster)
-        .flatMap(ac -> ac.loadBrokersConfig(brokersIds));
+    return adminClientService.get(cluster).flatMap(ac -> ac.loadBrokersConfig(brokersIds));
   }
 
   private Mono<List<ConfigEntry>> loadBrokersConfig(
@@ -55,28 +52,19 @@ public class BrokerService {
     return loadBrokersConfig(cluster, Collections.singletonList(brokerId))
         .map(map -> map.values().stream()
             .findFirst()
-            .orElseThrow(() -> new IllegalEntityStateException(
-                String.format("Config for broker %s not found", brokerId)))
-        );
-  }
-
-  public Mono<Map<String, InternalBrokerConfig>> getBrokerConfigMap(KafkaCluster cluster,
-                                                                    Integer brokerId) {
-    return loadBrokersConfig(cluster, brokerId)
-        .map(list -> list.stream()
-            .collect(Collectors.toMap(
-                ConfigEntry::name,
-                ClusterUtil::mapToInternalBrokerConfig)));
+            .orElseThrow(() -> new NotFoundException(
+                String.format("Config for broker %s not found", brokerId))));
   }
 
   private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
-    if (!cluster.getMetrics().getBrokers().contains(brokerId)) {
+    if (metricsCache.get(cluster).getClusterDescription().getNodes()
+        .stream().noneMatch(node -> node.id() == brokerId)) {
       return Flux.error(
           new NotFoundException(String.format("Broker with id %s not found", brokerId)));
     }
     return loadBrokersConfig(cluster, brokerId)
         .map(list -> list.stream()
-            .map(ClusterUtil::mapToInternalBrokerConfig)
+            .map(InternalBrokerConfig::from)
             .collect(Collectors.toList()))
         .flatMapMany(Flux::fromIterable);
   }
@@ -139,7 +127,10 @@ public class BrokerService {
       KafkaCluster cluster, List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
-          List<Integer> brokers = new ArrayList<>(cluster.getMetrics().getBrokers());
+          List<Integer> brokers = metricsCache.get(cluster).getClusterDescription().getNodes()
+              .stream()
+              .map(Node::id)
+              .collect(Collectors.toList());
           if (reqBrokers != null && !reqBrokers.isEmpty()) {
             brokers.retainAll(reqBrokers);
           }
@@ -162,9 +153,9 @@ public class BrokerService {
         .map(clusterMapper::toBrokerConfig);
   }
 
-  public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster cluster, Integer id) {
-    return Mono.just(cluster.getMetrics().getInternalBrokerMetrics())
-        .map(m -> m.get(id))
+  public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
+    return Mono.justOrEmpty(
+            metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId))
         .map(clusterMapper::toBrokerMetrics);
   }
 

+ 13 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -1,10 +1,10 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.ClusterDTO;
 import com.provectus.kafka.ui.model.ClusterMetricsDTO;
 import com.provectus.kafka.ui.model.ClusterStatsDTO;
+import com.provectus.kafka.ui.model.InternalClusterState;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -18,6 +18,7 @@ import reactor.core.publisher.Mono;
 @Log4j2
 public class ClusterService {
 
+  private final MetricsCache metricsCache;
   private final ClustersStorage clustersStorage;
   private final ClusterMapper clusterMapper;
   private final MetricsService metricsService;
@@ -25,32 +26,25 @@ public class ClusterService {
   public List<ClusterDTO> getClusters() {
     return clustersStorage.getKafkaClusters()
         .stream()
-        .map(clusterMapper::toCluster)
+        .map(c -> clusterMapper.toCluster(new InternalClusterState(c, metricsCache.get(c))))
         .collect(Collectors.toList());
   }
 
-  public Mono<ClusterStatsDTO> getClusterStats(String name) {
+  public Mono<ClusterStatsDTO> getClusterStats(KafkaCluster cluster) {
     return Mono.justOrEmpty(
-        clustersStorage.getClusterByName(name)
-            .map(KafkaCluster::getMetrics)
-            .map(clusterMapper::toClusterStats)
+        clusterMapper.toClusterStats(
+            new InternalClusterState(cluster, metricsCache.get(cluster)))
     );
   }
 
-  public Mono<ClusterMetricsDTO> getClusterMetrics(String name) {
-    return Mono.justOrEmpty(
-        clustersStorage.getClusterByName(name)
-            .map(KafkaCluster::getMetrics)
-            .map(clusterMapper::toClusterMetrics)
-    );
+  public Mono<ClusterMetricsDTO> getClusterMetrics(KafkaCluster cluster) {
+    return Mono.just(
+        clusterMapper.toClusterMetrics(
+            metricsCache.get(cluster).getJmxMetrics()));
   }
 
-  public Mono<ClusterDTO> updateCluster(String clusterName) {
-    return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> metricsService.updateClusterMetrics(cluster)
-            .doOnNext(updatedCluster -> clustersStorage
-                .setKafkaCluster(updatedCluster.getName(), updatedCluster))
-            .map(clusterMapper::toCluster))
-        .orElse(Mono.error(new ClusterNotFoundException()));
+  public Mono<ClusterDTO> updateCluster(KafkaCluster cluster) {
+    return metricsService.updateCache(cluster)
+        .map(metrics -> clusterMapper.toCluster(new InternalClusterState(cluster, metrics)));
   }
 }

+ 8 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service;
 
 import java.util.Map;
+import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -17,7 +18,11 @@ public class ClustersMetricsScheduler {
 
   private final MetricsService metricsService;
 
-  @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
+  @PostConstruct //need to fill metrics before application startup to prevent invalid state render
+  @Scheduled(
+      fixedRateString = "${kafka.update-metrics-rate-millis:30000}",
+      initialDelayString = "${kafka.update-metrics-rate-millis:30000}"
+  )
   public void updateMetrics() {
     Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
         .parallel()
@@ -25,9 +30,9 @@ public class ClustersMetricsScheduler {
         .map(Map.Entry::getValue)
         .flatMap(cluster -> {
           log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
-          return metricsService.updateClusterMetrics(cluster);
+          return metricsService.updateCache(cluster)
+              .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
         })
-        .doOnNext(s -> clustersStorage.setKafkaCluster(s.getName(), s))
         .then()
         .block();
   }

+ 1 - 42
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -2,11 +2,8 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
-import com.provectus.kafka.ui.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,10 +26,7 @@ public class ClustersStorage {
   public void init() {
     for (ClustersProperties.Cluster clusterProperties : clusterProperties.getClusters()) {
       KafkaCluster cluster = clusterMapper.toKafkaCluster(clusterProperties);
-      kafkaClusters.put(
-          clusterProperties.getName(),
-          cluster.toBuilder().metrics(InternalClusterMetrics.empty()).build()
-      );
+      kafkaClusters.put(clusterProperties.getName(), cluster);
     }
   }
 
@@ -44,41 +38,6 @@ public class ClustersStorage {
     return Optional.ofNullable(kafkaClusters.get(clusterName));
   }
 
-  public KafkaCluster setKafkaCluster(String key, KafkaCluster kafkaCluster) {
-    this.kafkaClusters.put(key, kafkaCluster);
-    return kafkaCluster;
-  }
-
-  public void onTopicDeleted(String clusterName, String topicToDelete) {
-    var cluster = kafkaClusters.get(clusterName);
-    var topics = Optional.ofNullable(cluster.getMetrics().getTopics())
-        .map(HashMap::new)
-        .orElseGet(HashMap::new);
-    topics.remove(topicToDelete);
-    setUpdatedTopics(cluster, topics);
-  }
-
-  public void onTopicUpdated(String clusterName, InternalTopic updatedTopic) {
-    var cluster = kafkaClusters.get(clusterName);
-    var topics = Optional.ofNullable(cluster.getMetrics().getTopics())
-        .map(HashMap::new)
-        .orElseGet(HashMap::new);
-    topics.put(updatedTopic.getName(), updatedTopic);
-    setUpdatedTopics(cluster, topics);
-  }
-
-  private void setUpdatedTopics(KafkaCluster cluster, Map<String, InternalTopic> topics) {
-    setKafkaCluster(
-        cluster.getName(),
-        cluster.toBuilder()
-            .metrics(
-                cluster.getMetrics().toBuilder()
-                    .topics(topics)
-                    .build())
-            .build()
-    );
-  }
-
   public Map<String, KafkaCluster> getKafkaClustersMap() {
     return kafkaClusters;
   }

+ 22 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java

@@ -1,13 +1,13 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.util.Constants.DELETE_TOPIC_ENABLE;
-
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.common.Node;
@@ -20,9 +20,11 @@ import reactor.core.publisher.Mono;
 @Log4j2
 public class FeatureService {
 
-  private final BrokerService brokerService;
+  private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable";
+
+  private final AdminClientService adminClientService;
 
-  public Mono<List<Feature>> getAvailableFeatures(KafkaCluster cluster) {
+  public Mono<List<Feature>> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) {
     List<Mono<Feature>> features = new ArrayList<>();
 
     if (Optional.ofNullable(cluster.getKafkaConnect())
@@ -39,23 +41,25 @@ public class FeatureService {
       features.add(Mono.just(Feature.SCHEMA_REGISTRY));
     }
 
-    features.add(
-        isTopicDeletionEnabled(cluster)
-            .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
-    );
+    if (controller != null) {
+      features.add(
+          isTopicDeletionEnabled(cluster, controller)
+              .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
+      );
+    }
 
     return Flux.fromIterable(features).flatMap(m -> m).collectList();
   }
 
-  private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster) {
-    return brokerService.getController(cluster)
-        .map(Node::id)
-        .flatMap(broker -> brokerService.getBrokerConfigMap(cluster, broker))
-        .map(config -> {
-          if (config != null && config.get(DELETE_TOPIC_ENABLE) != null) {
-            return Boolean.parseBoolean(config.get(DELETE_TOPIC_ENABLE).getValue());
-          }
-          return false;
-        });
+  private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controller) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
+        .map(config ->
+            config.values().stream()
+                .flatMap(Collection::stream)
+                .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
+                .map(e -> Boolean.parseBoolean(e.value()))
+                .findFirst()
+                .orElse(false));
   }
 }

+ 4 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -55,10 +55,11 @@ public class MessagesService {
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
+  private final MetricsCache metricsCache;
 
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
-    if (!cluster.getMetrics().getTopics().containsKey(topicName)) {
+    if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
       throw new TopicNotFoundException();
     }
     return offsetsForDeletion(cluster, topicName, partitionsToInclude)
@@ -84,8 +85,8 @@ public class MessagesService {
       throw new ValidationException("Invalid message: both key and value can't be null");
     }
     if (msg.getPartition() != null
-        && msg.getPartition() > cluster.getMetrics().getTopics()
-          .get(topic).getPartitionCount() - 1) {
+        && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions()
+          .get(topic).partitions().size() - 1) {
       throw new ValidationException("Invalid partition");
     }
     RecordSerDe serde =

+ 94 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java

@@ -0,0 +1,94 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.model.Feature;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ServerStatusDTO;
+import com.provectus.kafka.ui.util.JmxClusterUtil;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MetricsCache {
+
+  @Value
+  @Builder(toBuilder = true)
+  public static class Metrics {
+    ServerStatusDTO status;
+    Throwable lastKafkaException;
+    String version;
+    List<Feature> features;
+    ZookeeperService.ZkStatus zkStatus;
+    ReactiveAdminClient.ClusterDescription clusterDescription;
+    JmxClusterUtil.JmxMetrics jmxMetrics;
+    InternalLogDirStats logDirInfo;
+    Map<String, TopicDescription> topicDescriptions;
+    Map<String, List<ConfigEntry>> topicConfigs;
+  }
+
+  public static Metrics empty() {
+    return Metrics.builder()
+        .status(ServerStatusDTO.OFFLINE)
+        .version("Unknown")
+        .features(List.of())
+        .zkStatus(new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null))
+        .clusterDescription(
+            new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
+        .jmxMetrics(JmxClusterUtil.JmxMetrics.empty())
+        .logDirInfo(InternalLogDirStats.empty())
+        .topicDescriptions(Map.of())
+        .topicConfigs(Map.of())
+        .build();
+  }
+
+  private final Map<String, Metrics> cache = new ConcurrentHashMap<>();
+
+  public synchronized void replace(KafkaCluster c, Metrics stats) {
+    cache.put(c.getName(), stats);
+  }
+
+  public synchronized void update(KafkaCluster c,
+                                  Map<String, TopicDescription> descriptions,
+                                  Map<String, List<ConfigEntry>> configs) {
+    var metrics = get(c);
+    var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
+    updatedDescriptions.putAll(descriptions);
+    var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
+    updatedConfigs.putAll(configs);
+    replace(
+        c,
+        metrics.toBuilder()
+            .topicDescriptions(updatedDescriptions)
+            .topicConfigs(updatedConfigs)
+            .build()
+    );
+  }
+
+  public synchronized void onTopicDelete(KafkaCluster c, String topic) {
+    var metrics = get(c);
+    var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
+    updatedDescriptions.remove(topic);
+    var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
+    updatedConfigs.remove(topic);
+    replace(
+        c,
+        metrics.toBuilder()
+            .topicDescriptions(updatedDescriptions)
+            .topicConfigs(updatedConfigs)
+            .build()
+    );
+  }
+
+  public Metrics get(KafkaCluster c) {
+    return cache.get(c.getName());
+  }
+
+}

+ 39 - 222
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java

@@ -1,38 +1,18 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
-import static com.provectus.kafka.ui.service.ZookeeperService.ZkStatus;
-import static com.provectus.kafka.ui.util.JmxClusterUtil.JmxMetrics;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.summarizingLong;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
-
-import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
-import com.provectus.kafka.ui.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.model.InternalPartition;
-import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.Feature;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
-import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
-import java.math.BigDecimal;
 import java.util.List;
-import java.util.LongSummaryStatistics;
 import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import lombok.Builder;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuple3;
-import reactor.util.function.Tuples;
 
 @Service
 @RequiredArgsConstructor
@@ -43,218 +23,55 @@ public class MetricsService {
   private final JmxClusterUtil jmxClusterUtil;
   private final AdminClientService adminClientService;
   private final FeatureService featureService;
-  private final TopicsService topicsService;
+  private final MetricsCache cache;
 
-  /**
-   * Updates cluster's metrics and topics structure.
-   *
-   * @param cluster to be updated
-   * @return cluster with up-to-date metrics and topics structure
-   */
-  public Mono<KafkaCluster> updateClusterMetrics(KafkaCluster cluster) {
-    return getMetrics(cluster)
-        .map(m -> cluster.toBuilder().metrics(m).build())
-        .zipWith(featureService.getAvailableFeatures(cluster),
-            (c, features) -> c.toBuilder().features(features).build());
+  public Mono<MetricsCache.Metrics> updateCache(KafkaCluster c) {
+    return getMetrics(c).doOnSuccess(m -> cache.replace(c, m));
   }
 
-  private Mono<InternalClusterMetrics> getMetrics(KafkaCluster cluster) {
+  private Mono<MetricsCache.Metrics> getMetrics(KafkaCluster cluster) {
     return adminClientService.get(cluster).flatMap(ac ->
-            ac.describeCluster().flatMap(
-                description -> Mono.just(
-                        MetricsCollector.builder()
+        ac.describeCluster().flatMap(description ->
+                Mono.zip(
+                    List.of(
+                        jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
+                        zookeeperService.getZkStatus(cluster),
+                        getLogDirInfo(cluster, ac),
+                        featureService.getAvailableFeatures(cluster, description.getController()),
+                        loadTopicConfigs(cluster),
+                        describeTopics(cluster)),
+                    results ->
+                        MetricsCache.Metrics.builder()
+                            .status(ServerStatusDTO.ONLINE)
                             .clusterDescription(description)
                             .version(ac.getVersion())
+                            .jmxMetrics((JmxClusterUtil.JmxMetrics) results[0])
+                            .zkStatus((ZookeeperService.ZkStatus) results[1])
+                            .logDirInfo((InternalLogDirStats) results[2])
+                            .features((List<Feature>) results[3])
+                            .topicConfigs((Map<String, List<ConfigEntry>>) results[4])
+                            .topicDescriptions((Map<String, TopicDescription>) results[5])
                             .build()
-                    )
-                    .zipWith(jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
-                        (b, jmx) -> b.toBuilder().jmxMetrics(jmx).build())
-                    .zipWith(zookeeperService.getZkStatus(cluster),
-                        (b, status) -> b.toBuilder().zkStatus(status).build()))
-                    .zipWith(topicsService.getTopicsData(ac),
-                        (b, td) -> b.toBuilder().topicsData(td).build())
-                    .zipWith(getLogDirInfo(cluster, ac),
-                        (b, ld) -> b.toBuilder().logDirResult(ld).build())
-                .map(MetricsCollector::build)
-        )
-        .doOnError(e ->
-            log.error("Failed to collect cluster {} info", cluster.getName(), e)
-        ).onErrorResume(
-            e -> Mono.just(cluster.getMetrics().toBuilder()
-                .status(ServerStatusDTO.OFFLINE)
-                .lastKafkaException(e)
-                .build())
-        );
-  }
-
-  @Builder(toBuilder = true)
-  private static class MetricsCollector {
-    String version;
-    ClusterDescription clusterDescription;
-    JmxMetrics jmxMetrics;
-    List<InternalTopic> topicsData;
-    ZkStatus zkStatus;
-    Optional<LogDirInfo> logDirResult; // empty if log dir collection disabled
-
-    InternalClusterMetrics build() {
-      var metricsBuilder = InternalClusterMetrics.builder();
-      metricsBuilder.version(version);
-      metricsBuilder.status(ServerStatusDTO.ONLINE);
-      metricsBuilder.lastKafkaException(null);
-
-      metricsBuilder.zookeeperStatus(zkStatus.getStatus());
-      metricsBuilder.zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zkStatus.getStatus()));
-      metricsBuilder.lastZookeeperException(zkStatus.getError());
-
-      metricsBuilder.brokers(
-          clusterDescription.getNodes().stream().map(Node::id).collect(toList()));
-      metricsBuilder.brokerCount(clusterDescription.getNodes().size());
-      metricsBuilder.activeControllers(clusterDescription.getController() != null ? 1 : 0);
-
-      fillTopicsMetrics(metricsBuilder, topicsData);
-      fillJmxMetrics(metricsBuilder, jmxMetrics);
-
-      logDirResult.ifPresent(r -> r.enrichWithLogDirInfo(metricsBuilder));
-
-      return metricsBuilder.build();
-    }
+                ))
+            .doOnError(e ->
+                log.error("Failed to collect cluster {} info", cluster.getName(), e))
+            .onErrorResume(
+                e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build())));
   }
 
-  private static void fillJmxMetrics(
-      InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder,
-      JmxMetrics jmxMetrics) {
-    metricsBuilder.metrics(jmxMetrics.getMetrics());
-    metricsBuilder.internalBrokerMetrics(jmxMetrics.getInternalBrokerMetrics());
-
-    metricsBuilder.bytesInPerSec(
-        jmxMetrics.getBytesInPerSec().values().stream()
-            .reduce(BigDecimal.ZERO, BigDecimal::add));
-
-    metricsBuilder.bytesOutPerSec(
-        jmxMetrics.getBytesOutPerSec().values().stream()
-            .reduce(BigDecimal.ZERO, BigDecimal::add));
-
-    metricsBuilder.topics(
-        metricsBuilder.build().getTopics().values().stream()
-            .map(t ->
-                t.withIoRates(
-                    jmxMetrics.getBytesInPerSec().get(t.getName()),
-                    jmxMetrics.getBytesOutPerSec().get(t.getName()))
-            ).collect(Collectors.toMap(InternalTopic::getName, t -> t))
-    );
-  }
-
-  private Mono<Optional<LogDirInfo>> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
+  private Mono<InternalLogDirStats> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
     if (cluster.getDisableLogDirsCollection() == null || !cluster.getDisableLogDirsCollection()) {
-      return c.describeLogDirs().map(LogDirInfo::new).map(Optional::of);
+      return c.describeLogDirs().map(InternalLogDirStats::new);
     }
-    return Mono.just(Optional.empty());
+    return Mono.just(InternalLogDirStats.empty());
   }
 
-  private static void fillTopicsMetrics(
-      InternalClusterMetrics.InternalClusterMetricsBuilder builder,
-      List<InternalTopic> topics) {
-
-    int underReplicatedPartitions = 0;
-    int inSyncReplicasCount = 0;
-    int outOfSyncReplicasCount = 0;
-    int onlinePartitionCount = 0;
-    int offlinePartitionCount = 0;
-
-    for (InternalTopic topic : topics) {
-      underReplicatedPartitions += topic.getUnderReplicatedPartitions();
-      inSyncReplicasCount += topic.getInSyncReplicas();
-      outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
-      onlinePartitionCount +=
-          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1)
-              .sum();
-      offlinePartitionCount +=
-          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1)
-              .sum();
-    }
-
-    builder
-        .underReplicatedPartitionCount(underReplicatedPartitions)
-        .inSyncReplicasCount(inSyncReplicasCount)
-        .outOfSyncReplicasCount(outOfSyncReplicasCount)
-        .onlinePartitionCount(onlinePartitionCount)
-        .offlinePartitionCount(offlinePartitionCount)
-        .topicCount(topics.size())
-        .topics(topics.stream().collect(Collectors.toMap(InternalTopic::getName, t -> t)));
+  private Mono<Map<String, TopicDescription>> describeTopics(KafkaCluster c) {
+    return adminClientService.get(c).flatMap(ReactiveAdminClient::describeTopics);
   }
 
-  private static class LogDirInfo {
-
-    private final Map<TopicPartition, LongSummaryStatistics> partitionsStats;
-    private final Map<String, LongSummaryStatistics> topicStats;
-    private final Map<Integer, LongSummaryStatistics> brokerStats;
-
-    LogDirInfo(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
-      final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
-          log.entrySet().stream().flatMap(b ->
-              b.getValue().entrySet().stream().flatMap(topicMap ->
-                  topicMap.getValue().replicaInfos.entrySet().stream()
-                      .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
-              )
-          ).collect(toList());
-
-      partitionsStats = topicPartitions.stream().collect(
-          groupingBy(
-              Tuple2::getT2,
-              summarizingLong(Tuple3::getT3)));
-
-      topicStats =
-          topicPartitions.stream().collect(
-              groupingBy(
-                  t -> t.getT2().topic(),
-                  summarizingLong(Tuple3::getT3)));
-
-      brokerStats = topicPartitions.stream().collect(
-          groupingBy(
-              Tuple2::getT1,
-              summarizingLong(Tuple3::getT3)));
-    }
-
-    private InternalTopic enrichTopicWithSegmentStats(InternalTopic topic) {
-      LongSummaryStatistics stats = topicStats.get(topic.getName());
-      return topic.withSegmentStats(stats.getSum(), stats.getCount())
-          .toBuilder()
-          .partitions(
-              topic.getPartitions().entrySet().stream().map(e ->
-                  Tuples.of(e.getKey(),
-                      enrichPartitionWithSegmentsData(topic.getName(), e.getValue()))
-              ).collect(toMap(Tuple2::getT1, Tuple2::getT2))
-          ).build();
-    }
-
-    private InternalPartition enrichPartitionWithSegmentsData(String topic,
-                                                              InternalPartition partition) {
-      final LongSummaryStatistics stats =
-          partitionsStats.get(new TopicPartition(topic, partition.getPartition()));
-      return partition.withSegmentStats(stats.getSum(), stats.getCount());
-    }
-
-    private Map<Integer, InternalBrokerDiskUsage> getBrokersDiskUsage() {
-      return brokerStats.entrySet().stream().map(e ->
-          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
-              .segmentSize(e.getValue().getSum())
-              .segmentCount(e.getValue().getCount())
-              .build()
-          )
-      ).collect(toMap(Tuple2::getT1, Tuple2::getT2));
-    }
-
-    private Map<String, InternalTopic> enrichTopics(Map<String, InternalTopic> topics) {
-      return topics.values().stream()
-          .map(this::enrichTopicWithSegmentStats)
-          .collect(Collectors.toMap(InternalTopic::getName, t -> t));
-    }
-
-    public void enrichWithLogDirInfo(
-        InternalClusterMetrics.InternalClusterMetricsBuilder builder) {
-      builder
-          .topics(enrichTopics(builder.build().getTopics()))
-          .internalBrokerDiskUsage(getBrokersDiskUsage());
-    }
+  private Mono<Map<String, List<ConfigEntry>>> loadTopicConfigs(KafkaCluster c) {
+    return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig);
   }
+
 }

+ 66 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -15,7 +15,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -37,6 +39,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -45,8 +48,11 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 
 @Log4j2
@@ -112,20 +118,24 @@ public class ReactiveAdminClient implements Closeable {
     return version;
   }
 
+  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
+    return listTopics(true).flatMap(this::getTopicsConfig);
+  }
+
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
     List<ConfigResource> resources = topicNames.stream()
         .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
         .collect(toList());
 
-    return toMono(
+    return toMonoWithExceptionFilter(
         client.describeConfigs(
             resources,
-            new DescribeConfigsOptions().includeSynonyms(true)
-        ).all())
-        .map(config -> config.entrySet().stream()
-            .collect(toMap(
-                c -> c.getKey().name(),
-                c -> new ArrayList<>(c.getValue().entries()))));
+            new DescribeConfigsOptions().includeSynonyms(true)).values(),
+        UnknownTopicOrPartitionException.class
+    ).map(config -> config.entrySet().stream()
+        .collect(toMap(
+            c -> c.getKey().name(),
+            c -> List.copyOf(c.getValue().entries()))));
   }
 
   public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
@@ -139,8 +149,56 @@ public class ReactiveAdminClient implements Closeable {
                 c -> new ArrayList<>(c.getValue().entries()))));
   }
 
+  public Mono<Map<String, TopicDescription>> describeTopics() {
+    return listTopics(true).flatMap(this::describeTopics);
+  }
+
   public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
-    return toMono(client.describeTopics(topics).all());
+    return toMonoWithExceptionFilter(
+        client.describeTopics(topics).values(),
+        UnknownTopicOrPartitionException.class
+    );
+  }
+
+  /**
+   * Kafka API often returns Map responses with KafkaFuture values. If we do allOf()
+   * logic resulting Mono will be failing if any of Futures finished with error.
+   * In some situations it is not what we what, ex. we call describeTopics(List names) method and
+   * we getting UnknownTopicOrPartitionException for unknown topics and we what to just not put
+   * such topics in resulting map.
+   * <p/>
+   * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
+   * finished with <code>clazz</code> exception.
+   */
+  private <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
+                                                           Class<? extends KafkaException> clazz) {
+    if (values.isEmpty()) {
+      return Mono.just(Map.of());
+    }
+
+    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
+        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
+        .collect(toList());
+
+    return Mono.create(sink -> {
+      var finishedCnt = new AtomicInteger();
+      var results = new ConcurrentHashMap<K, V>();
+      monos.forEach(mono -> mono.subscribe(
+          r -> {
+            results.put(r.getT1(), r.getT2());
+            if (finishedCnt.incrementAndGet() == monos.size()) {
+              sink.success(results);
+            }
+          },
+          th -> {
+            if (!th.getClass().isAssignableFrom(clazz)) {
+              sink.error(th);
+            } else if (finishedCnt.incrementAndGet() == monos.size()) {
+              sink.success(results);
+            }
+          }
+      ));
+    });
   }
 
   public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
@@ -193,10 +251,6 @@ public class ReactiveAdminClient implements Closeable {
             )));
   }
 
-  public Mono<String> getClusterVersion() {
-    return getClusterVersionImpl(client);
-  }
-
   public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
     return toMono(client.deleteConsumerGroups(groupIds).all())
         .onErrorResume(GroupIdNotFoundException.class,

+ 222 - 193
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -1,12 +1,17 @@
 package com.provectus.kafka.ui.service;
 
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
-import com.provectus.kafka.ui.model.CleanupPolicy;
 import com.provectus.kafka.ui.model.Feature;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.InternalPartition;
+import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
 import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
@@ -24,23 +29,26 @@ import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.serde.DeserializationService;
-import com.provectus.kafka.ui.util.ClusterUtil;
+import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
+import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @Service
@@ -50,102 +58,110 @@ public class TopicsService {
   private static final Integer DEFAULT_PAGE_SIZE = 25;
 
   private final AdminClientService adminClientService;
-  private final ConsumerGroupService consumerGroupService;
-  private final ClustersStorage clustersStorage;
   private final ClusterMapper clusterMapper;
   private final DeserializationService deserializationService;
-
-  public TopicsResponseDTO getTopics(KafkaCluster cluster,
-                                     Optional<Integer> page,
-                                     Optional<Integer> nullablePerPage,
-                                     Optional<Boolean> showInternal,
-                                     Optional<String> search,
-                                     Optional<TopicColumnsToSortDTO> sortBy) {
-    Predicate<Integer> positiveInt = i -> i > 0;
-    int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
-    var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
-    List<InternalTopic> topics = cluster.getMetrics().getTopics().values().stream()
-        .filter(topic -> !topic.isInternal()
-            || showInternal
-            .map(i -> topic.isInternal() == i)
-            .orElse(true))
-        .filter(topic ->
-            search
-                .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
-                .orElse(true))
-        .sorted(getComparatorForTopic(sortBy))
-        .collect(Collectors.toList());
-    var totalPages = (topics.size() / perPage)
-        + (topics.size() % perPage == 0 ? 0 : 1);
-    return new TopicsResponseDTO()
-        .pageCount(totalPages)
-        .topics(
-            topics.stream()
-                .skip(topicsToSkip)
-                .limit(perPage)
-                .map(t ->
-                    clusterMapper.toTopic(
-                        t.toBuilder().partitions(getTopicPartitions(cluster, t)).build()
-                    )
-                )
-                .collect(Collectors.toList())
-        );
+  private final MetricsCache metricsCache;
+
+  public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
+                                           Optional<Integer> pageNum,
+                                           Optional<Integer> nullablePerPage,
+                                           Optional<Boolean> showInternal,
+                                           Optional<String> search,
+                                           Optional<TopicColumnsToSortDTO> sortBy) {
+    return adminClientService.get(cluster).flatMap(ac ->
+      new Pagination(ac, metricsCache.get(cluster))
+          .getPage(pageNum, nullablePerPage, showInternal, search, sortBy)
+          .flatMap(page ->
+              loadTopics(cluster, page.getTopics())
+                  .map(topics ->
+                      new TopicsResponseDTO()
+                          .topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
+                          .pageCount(page.getTotalPages()))));
   }
 
-  private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSortDTO> sortBy) {
-    var defaultComparator = Comparator.comparing(InternalTopic::getName);
-    if (sortBy.isEmpty()) {
-      return defaultComparator;
-    }
-    switch (sortBy.get()) {
-      case TOTAL_PARTITIONS:
-        return Comparator.comparing(InternalTopic::getPartitionCount);
-      case OUT_OF_SYNC_REPLICAS:
-        return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
-      case REPLICATION_FACTOR:
-        return Comparator.comparing(InternalTopic::getReplicationFactor);
-      case NAME:
-      default:
-        return defaultComparator;
+  private Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
+    if (topics.isEmpty()) {
+      return Mono.just(List.of());
     }
+    return adminClientService.get(c)
+        .flatMap(ac ->
+            ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
+                (descriptions, configs) -> {
+                  metricsCache.update(c, descriptions, configs);
+                  return getPartitionOffsets(descriptions, ac).map(offsets -> {
+                    var metrics = metricsCache.get(c);
+                    return createList(
+                        topics,
+                        descriptions,
+                        configs,
+                        offsets,
+                        metrics.getJmxMetrics(),
+                        metrics.getLogDirInfo()
+                    );
+                  });
+                })).flatMap(Function.identity());
   }
 
-  public TopicDetailsDTO getTopicDetails(KafkaCluster cluster, String topicName) {
-    var topic = getTopic(cluster, topicName);
-    var upToDatePartitions = getTopicPartitions(cluster, topic);
-    topic = topic.toBuilder().partitions(upToDatePartitions).build();
-    return clusterMapper.toTopicDetails(topic);
+  private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
+    return loadTopics(c, List.of(topicName))
+        .map(lst -> lst.stream().findFirst().orElseThrow(TopicNotFoundException::new));
   }
 
-  @SneakyThrows
-  public Mono<List<InternalTopic>> getTopicsData(ReactiveAdminClient client) {
-    return client.listTopics(true)
-        .flatMap(topics -> getTopicsData(client, topics).collectList());
+  private List<InternalTopic> createList(List<String> orderedNames,
+                                         Map<String, TopicDescription> descriptions,
+                                         Map<String, List<ConfigEntry>> configs,
+                                         InternalPartitionsOffsets partitionsOffsets,
+                                         JmxClusterUtil.JmxMetrics jmxMetrics,
+                                         InternalLogDirStats logDirInfo) {
+    return orderedNames.stream()
+        .filter(descriptions::containsKey)
+        .map(t -> InternalTopic.from(
+            descriptions.get(t),
+            configs.getOrDefault(t, List.of()),
+            partitionsOffsets,
+            jmxMetrics,
+            logDirInfo
+        ))
+        .collect(toList());
   }
 
-  private Flux<InternalTopic> getTopicsData(ReactiveAdminClient client, Collection<String> topics) {
-    final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
-        loadTopicsConfig(client, topics);
-
-    return client.describeTopics(topics)
-        .map(m -> m.values().stream()
-            .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
-        .flatMap(internalTopics -> configsMono
-            .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
-        .flatMapMany(Flux::fromIterable);
+  private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
+                                                                  descriptions,
+                                                              ReactiveAdminClient ac) {
+    var topicPartitions = descriptions.values().stream()
+        .flatMap(desc ->
+            desc.partitions().stream().map(p -> new TopicPartition(desc.name(), p.partition())))
+        .collect(toList());
+
+    return ac.listOffsets(topicPartitions, OffsetSpec.earliest())
+        .zipWith(ac.listOffsets(topicPartitions, OffsetSpec.latest()),
+            (earliest, latest) ->
+                topicPartitions.stream()
+                    .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
+                    .map(tp ->
+                        Map.entry(tp,
+                            new InternalPartitionsOffsets.Offsets(
+                                earliest.get(tp), latest.get(tp))))
+                    .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)))
+        .map(InternalPartitionsOffsets::new);
   }
 
-  public List<TopicConfigDTO> getTopicConfigs(KafkaCluster cluster, String topicName) {
-    var configs =  getTopic(cluster, topicName).getTopicConfigs();
-    return configs.stream()
-        .map(clusterMapper::toTopicConfig)
-        .collect(Collectors.toList());
+  public Mono<TopicDetailsDTO> getTopicDetails(KafkaCluster cluster, String topicName) {
+    return loadTopic(cluster, topicName).map(clusterMapper::toTopicDetails);
   }
 
+  public Mono<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster cluster, String topicName) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
+        .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new))
+        .map(lst -> lst.stream()
+            .map(InternalTopicConfig::from)
+            .map(clusterMapper::toTopicConfig)
+            .collect(toList()));
+  }
 
-  @SneakyThrows
-  private Mono<InternalTopic> createTopic(ReactiveAdminClient adminClient,
-                                         Mono<TopicCreationDTO> topicCreation) {
+  private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
+                                          Mono<TopicCreationDTO> topicCreation) {
     return topicCreation.flatMap(topicData ->
             adminClient.createTopic(
                 topicData.getName(),
@@ -155,73 +171,39 @@ public class TopicsService {
             ).thenReturn(topicData)
         )
         .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
-        .flatMap(topicData -> getUpdatedTopic(adminClient, topicData.getName()))
-        .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")));
+        .flatMap(topicData -> loadTopic(c, topicData.getName()));
   }
 
-  public Mono<TopicDTO> createTopic(
-      KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
-    return adminClientService.get(cluster).flatMap(ac -> createTopic(ac, topicCreation))
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t))
+  public Mono<TopicDTO> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createTopic(cluster, ac, topicCreation))
         .map(clusterMapper::toTopic);
   }
 
-  private Map<String, InternalTopic> mergeWithConfigs(
-      List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
-    return topics.stream()
-        .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
-        .map(t -> t.toBuilder().cleanUpPolicy(
-            CleanupPolicy.fromString(t.getTopicConfigs().stream()
-                .filter(config -> config.getName().equals("cleanup.policy"))
-                .findFirst()
-                .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
-                .getValue())).build())
-        .collect(Collectors.toMap(
-            InternalTopic::getName,
-            e -> e
-        ));
-  }
-
-  public Mono<InternalTopic> getUpdatedTopic(ReactiveAdminClient ac, String topicName) {
-    return getTopicsData(ac, List.of(topicName)).next();
-  }
-
-  public Mono<InternalTopic> updateTopic(KafkaCluster cluster,
+  private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
                                          String topicName,
                                          TopicUpdateDTO topicUpdate) {
     return adminClientService.get(cluster)
         .flatMap(ac ->
-            ac.updateTopicConfig(topicName,
-                topicUpdate.getConfigs()).then(getUpdatedTopic(ac, topicName)));
+            ac.updateTopicConfig(topicName, topicUpdate.getConfigs())
+                .then(loadTopic(cluster, topicName)));
   }
 
   public Mono<TopicDTO> updateTopic(KafkaCluster cl, String topicName,
                                     Mono<TopicUpdateDTO> topicUpdate) {
     return topicUpdate
         .flatMap(t -> updateTopic(cl, topicName, t))
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cl.getName(), t))
         .map(clusterMapper::toTopic);
   }
 
-  @SneakyThrows
-  private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
-      ReactiveAdminClient client, Collection<String> topicNames) {
-    return client.getTopicsConfig(topicNames)
-        .map(configs ->
-            configs.entrySet().stream().collect(Collectors.toMap(
-                Map.Entry::getKey,
-                c -> c.getValue().stream()
-                    .map(ClusterUtil::mapToInternalTopicConfig)
-                    .collect(Collectors.toList()))));
-  }
-
   private Mono<InternalTopic> changeReplicationFactor(
+      KafkaCluster cluster,
       ReactiveAdminClient adminClient,
       String topicName,
       Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
   ) {
     return adminClient.alterPartitionReassignments(reassignments)
-        .then(getUpdatedTopic(adminClient, topicName));
+        .then(loadTopic(cluster, topicName));
   }
 
   /**
@@ -231,11 +213,12 @@ public class TopicsService {
       KafkaCluster cluster,
       String topicName,
       ReplicationFactorChangeDTO replicationFactorChange) {
-    return adminClientService.get(cluster)
+    return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster)
         .flatMap(ac -> {
-          Integer actual = getTopic(cluster, topicName).getReplicationFactor();
+          Integer actual = topic.getReplicationFactor();
           Integer requested = replicationFactorChange.getTotalReplicationFactor();
-          Integer brokersCount = cluster.getMetrics().getBrokerCount();
+          Integer brokersCount = metricsCache.get(cluster).getClusterDescription()
+              .getNodes().size();
 
           if (requested.equals(actual)) {
             return Mono.error(
@@ -248,25 +231,24 @@ public class TopicsService {
                     String.format("Requested replication factor %s more than brokers count %s.",
                         requested, brokersCount)));
           }
-          return changeReplicationFactor(ac, topicName,
-              getPartitionsReassignments(cluster, topicName,
+          return changeReplicationFactor(cluster, ac, topicName,
+              getPartitionsReassignments(cluster, topic,
                   replicationFactorChange));
         })
-        .doOnNext(topic -> clustersStorage.onTopicUpdated(cluster.getName(), topic))
         .map(t -> new ReplicationFactorChangeResponseDTO()
             .topicName(t.getName())
-            .totalReplicationFactor(t.getReplicationFactor()));
+            .totalReplicationFactor(t.getReplicationFactor())));
   }
 
   private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
       KafkaCluster cluster,
-      String topicName,
+      InternalTopic topic,
       ReplicationFactorChangeDTO replicationFactorChange) {
     // Current assignment map (Partition number -> List of brokers)
-    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
+    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(topic);
     // Brokers map (Broker id -> count)
     Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
-    int currentReplicationFactor = getTopic(cluster, topicName).getReplicationFactor();
+    int currentReplicationFactor = topic.getReplicationFactor();
 
     // If we should to increase Replication factor
     if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
@@ -276,7 +258,7 @@ public class TopicsService {
         var brokers = brokersUsage.entrySet().stream()
             .sorted(Map.Entry.comparingByValue())
             .map(Map.Entry::getKey)
-            .collect(Collectors.toList());
+            .collect(toList());
 
         // Iterate brokers and try to add them in assignment
         // while (partition replicas count != requested replication factor)
@@ -304,13 +286,13 @@ public class TopicsService {
         var brokersUsageList = brokersUsage.entrySet().stream()
             .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
             .map(Map.Entry::getKey)
-            .collect(Collectors.toList());
+            .collect(toList());
 
         // Iterate brokers and try to remove them from assignment
         // while (partition replicas count != requested replication factor)
         for (Integer broker : brokersUsageList) {
           // Check is the broker the leader of partition
-          if (!getTopic(cluster, topicName).getPartitions().get(partition).getLeader()
+          if (!topic.getPartitions().get(partition).getLeader()
               .equals(broker)) {
             brokers.remove(broker);
             brokersUsage.merge(broker, -1, Integer::sum);
@@ -328,26 +310,28 @@ public class TopicsService {
     }
 
     // Return result map
-    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
-        e -> new TopicPartition(topicName, e.getKey()),
+    return currentAssignment.entrySet().stream().collect(toMap(
+        e -> new TopicPartition(topic.getName(), e.getKey()),
         e -> Optional.of(new NewPartitionReassignment(e.getValue()))
     ));
   }
 
-  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
-    return getTopic(cluster, topicName).getPartitions().values().stream()
-        .collect(Collectors.toMap(
+  private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
+    return topic.getPartitions().values().stream()
+        .collect(toMap(
             InternalPartition::getPartition,
             p -> p.getReplicas().stream()
                 .map(InternalReplica::getBroker)
-                .collect(Collectors.toList())
+                .collect(toList())
         ));
   }
 
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                               Map<Integer, List<Integer>> currentAssignment) {
-    Map<Integer, Integer> result = cluster.getMetrics().getBrokers().stream()
-        .collect(Collectors.toMap(
+    Map<Integer, Integer> result = metricsCache.get(cluster).getClusterDescription().getNodes()
+        .stream()
+        .map(Node::id)
+        .collect(toMap(
             c -> c,
             c -> 0
         ));
@@ -361,9 +345,9 @@ public class TopicsService {
       KafkaCluster cluster,
       String topicName,
       PartitionsIncreaseDTO partitionsIncrease) {
-    return adminClientService.get(cluster)
-        .flatMap(ac -> {
-          Integer actualCount = getTopic(cluster, topicName).getPartitionCount();
+    return loadTopic(cluster, topicName).flatMap(topic ->
+        adminClientService.get(cluster).flatMap(ac -> {
+          Integer actualCount = topic.getPartitionCount();
           Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
 
           if (requestedCount < actualCount) {
@@ -383,55 +367,24 @@ public class TopicsService {
               NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
           );
           return ac.createPartitions(newPartitionsMap)
-              .then(getUpdatedTopic(ac, topicName));
+              .then(loadTopic(cluster, topicName));
         })
-        .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t))
         .map(t -> new PartitionsIncreaseResponseDTO()
             .topicName(t.getName())
-            .totalPartitionsCount(t.getPartitionCount()));
-  }
-
-  private Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic) {
-    var tps = topic.getPartitions().values().stream()
-        .map(t -> new TopicPartition(topic.getName(), t.getPartition()))
-        .collect(Collectors.toList());
-    Map<Integer, InternalPartition> partitions =
-        topic.getPartitions().values().stream().collect(Collectors.toMap(
-            InternalPartition::getPartition,
-            tp -> tp
-        ));
-
-    try (var consumer = consumerGroupService.createConsumer(c)) {
-      final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
-      final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);
-
-      return tps.stream()
-          .map(tp -> partitions.get(tp.partition())
-              .withOffsets(
-                  earliest.getOrDefault(tp, -1L),
-                  latest.getOrDefault(tp, -1L)
-              )
-          ).collect(Collectors.toMap(
-              InternalPartition::getPartition,
-              tp -> tp
-          ));
-    } catch (Exception e) {
-      return Collections.emptyMap();
-    }
+            .totalPartitionsCount(t.getPartitionCount())));
   }
 
   public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
-    var topicDetails = getTopicDetails(cluster, topicName);
-    if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) {
+    if (metricsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
       return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
-          .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster.getName(), topicName));
+          .doOnSuccess(t -> metricsCache.onTopicDelete(cluster, topicName));
     } else {
       return Mono.error(new ValidationException("Topic deletion restricted"));
     }
   }
 
   public TopicMessageSchemaDTO getTopicSchema(KafkaCluster cluster, String topicName) {
-    if (!cluster.getMetrics().getTopics().containsKey(topicName)) {
+    if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
       throw new TopicNotFoundException();
     }
     return deserializationService
@@ -439,12 +392,88 @@ public class TopicsService {
         .getTopicSchema(topicName);
   }
 
-  private InternalTopic getTopic(KafkaCluster c, String topicName) {
-    var topic = c.getMetrics().getTopics().get(topicName);
-    if (topic == null) {
-      throw new TopicNotFoundException();
+  @VisibleForTesting
+  @Value
+  static class Pagination {
+    ReactiveAdminClient adminClient;
+    MetricsCache.Metrics metrics;
+
+    @Value
+    static class Page {
+      List<String> topics;
+      int totalPages;
+    }
+
+    Mono<Page> getPage(
+        Optional<Integer> pageNum,
+        Optional<Integer> nullablePerPage,
+        Optional<Boolean> showInternal,
+        Optional<String> search,
+        Optional<TopicColumnsToSortDTO> sortBy) {
+      return geTopicsForPagination()
+          .map(paginatingTopics -> {
+            Predicate<Integer> positiveInt = i -> i > 0;
+            int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
+            var topicsToSkip = (pageNum.filter(positiveInt).orElse(1) - 1) * perPage;
+            List<InternalTopic> topics = paginatingTopics.stream()
+                .filter(topic -> !topic.isInternal()
+                    || showInternal.map(i -> topic.isInternal() == i).orElse(true))
+                .filter(topic ->
+                    search
+                        .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
+                        .orElse(true))
+                .sorted(getComparatorForTopic(sortBy))
+                .collect(toList());
+            var totalPages = (topics.size() / perPage)
+                + (topics.size() % perPage == 0 ? 0 : 1);
+
+            List<String> topicsToRender = topics.stream()
+                .skip(topicsToSkip)
+                .limit(perPage)
+                .map(InternalTopic::getName)
+                .collect(toList());
+
+            return new Page(topicsToRender, totalPages);
+          });
+    }
+
+    private Comparator<InternalTopic> getComparatorForTopic(
+        Optional<TopicColumnsToSortDTO> sortBy) {
+      var defaultComparator = Comparator.comparing(InternalTopic::getName);
+      if (sortBy.isEmpty()) {
+        return defaultComparator;
+      }
+      switch (sortBy.get()) {
+        case TOTAL_PARTITIONS:
+          return Comparator.comparing(InternalTopic::getPartitionCount);
+        case OUT_OF_SYNC_REPLICAS:
+          return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
+        case REPLICATION_FACTOR:
+          return Comparator.comparing(InternalTopic::getReplicationFactor);
+        case NAME:
+        default:
+          return defaultComparator;
+      }
+    }
+
+    private Mono<List<String>> filterExisting(Collection<String> topics) {
+      return adminClient.listTopics(true)
+          .map(existing -> existing.stream().filter(topics::contains).collect(toList()));
+    }
+
+    private Mono<List<InternalTopic>> geTopicsForPagination() {
+      return filterExisting(metrics.getTopicDescriptions().keySet())
+          .map(lst -> lst.stream()
+              .map(topicName ->
+                  InternalTopic.from(
+                      metrics.getTopicDescriptions().get(topicName),
+                      metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
+                      InternalPartitionsOffsets.empty(),
+                      metrics.getJmxMetrics(),
+                      metrics.getLogDirInfo()))
+              .collect(toList())
+          );
     }
-    return topic;
   }
 
 }

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java

@@ -15,6 +15,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 @Service
 @RequiredArgsConstructor

+ 1 - 101
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java

@@ -1,19 +1,11 @@
 package com.provectus.kafka.ui.util;
 
-import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
-import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
-
 import com.provectus.kafka.ui.model.BrokerDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
-import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
-import com.provectus.kafka.ui.model.InternalPartition;
-import com.provectus.kafka.ui.model.InternalReplica;
-import com.provectus.kafka.ui.model.InternalTopic;
-import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.MessageFormatDTO;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
@@ -24,16 +16,13 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
@@ -41,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;
 
+
 @Log4j2
 public class ClusterUtil {
 
@@ -163,89 +153,6 @@ public class ClusterUtil {
     }
   }
 
-
-  public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
-    InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
-        .name(configEntry.name())
-        .value(configEntry.value())
-        .source(configEntry.source())
-        .isReadOnly(configEntry.isReadOnly())
-        .isSensitive(configEntry.isSensitive())
-        .synonyms(configEntry.synonyms());
-    if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
-      builder.defaultValue(configEntry.value());
-    } else {
-      builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
-    }
-    return builder.build();
-  }
-
-  public static InternalBrokerConfig mapToInternalBrokerConfig(ConfigEntry configEntry) {
-    InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder()
-        .name(configEntry.name())
-        .value(configEntry.value())
-        .source(configEntry.source())
-        .isReadOnly(configEntry.isReadOnly())
-        .isSensitive(configEntry.isSensitive())
-        .synonyms(configEntry.synonyms());
-    return builder.build();
-  }
-
-  public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) {
-    var topic = InternalTopic.builder();
-    topic.internal(
-        topicDescription.isInternal() || topicDescription.name().startsWith("_")
-    );
-    topic.name(topicDescription.name());
-
-    List<InternalPartition> partitions = topicDescription.partitions().stream().map(
-        partition -> {
-          var partitionDto = InternalPartition.builder();
-          partitionDto.leader(partition.leader().id());
-          partitionDto.partition(partition.partition());
-          partitionDto.inSyncReplicasCount(partition.isr().size());
-          partitionDto.replicasCount(partition.replicas().size());
-          List<InternalReplica> replicas = partition.replicas().stream().map(
-              r -> new InternalReplica(r.id(), partition.leader().id() != r.id(),
-                  partition.isr().contains(r)))
-              .collect(Collectors.toList());
-          partitionDto.replicas(replicas);
-          return partitionDto.build();
-        })
-        .collect(Collectors.toList());
-
-    int urpCount = partitions.stream()
-        .flatMap(partition -> partition.getReplicas().stream())
-        .filter(p -> !p.isInSync()).mapToInt(e -> 1)
-        .sum();
-
-    int inSyncReplicasCount = partitions.stream()
-        .mapToInt(InternalPartition::getInSyncReplicasCount)
-        .sum();
-
-    int replicasCount = partitions.stream()
-        .mapToInt(InternalPartition::getReplicasCount)
-        .sum();
-
-    topic.partitions(partitions.stream().collect(Collectors.toMap(
-        InternalPartition::getPartition,
-        t -> t
-    )));
-    topic.replicas(replicasCount);
-    topic.partitionCount(topicDescription.partitions().size());
-    topic.inSyncReplicas(inSyncReplicasCount);
-
-    topic.replicationFactor(
-        topicDescription.partitions().isEmpty()
-            ? 0
-            : topicDescription.partitions().get(0).replicas().size()
-    );
-
-    topic.underReplicatedPartitions(urpCount);
-
-    return topic.build();
-  }
-
   public static int convertToIntServerStatus(ServerStatusDTO serverStatus) {
     return serverStatus.equals(ServerStatusDTO.ONLINE) ? 1 : 0;
   }
@@ -305,13 +212,6 @@ public class ClusterUtil {
     }
   }
 
-
-  public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
-    return streamOfMaps
-        .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
-  }
-
   public static Optional<InternalConsumerGroup> filterConsumerGroupTopic(
       InternalConsumerGroup consumerGroup, Optional<String> topic) {
 

+ 0 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java

@@ -1,5 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-public class Constants {
-  public static final String DELETE_TOPIC_ENABLE = "delete.topic.enable";
-}

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java

@@ -56,6 +56,15 @@ public class JmxClusterUtil {
     Map<String, BigDecimal> bytesOutPerSec;
     Map<Integer, JmxBrokerMetrics> internalBrokerMetrics;
     List<MetricDTO> metrics;
+
+    public static JmxMetrics empty() {
+      return JmxClusterUtil.JmxMetrics.builder()
+          .bytesInPerSec(Map.of())
+          .bytesOutPerSec(Map.of())
+          .internalBrokerMetrics(Map.of())
+          .metrics(List.of())
+          .build();
+    }
   }
 
   public Mono<JmxMetrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {

+ 164 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -0,0 +1,164 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+class TopicsServicePaginationTest {
+
+  private TopicsService.Pagination pagination;
+
+  private void init(Collection<TopicDescription> topicsInCache) {
+    ReactiveAdminClient adminClient = when(mock(ReactiveAdminClient.class).listTopics(true))
+        .thenReturn(Mono.just(topicsInCache.stream().map(TopicDescription::name)
+            .collect(Collectors.toSet())))
+        .getMock();
+
+    MetricsCache.Metrics metricsCache = MetricsCache.empty().toBuilder()
+        .topicDescriptions(
+            topicsInCache.stream().collect(Collectors.toMap(TopicDescription::name, d -> d)))
+        .build();
+
+    pagination = new TopicsService.Pagination(adminClient, metricsCache);
+  }
+
+  @Test
+  public void shouldListFirst25Topics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.empty(),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(Optional.of(4), Optional.of(33),
+        Optional.empty(), Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(1)
+        .first().isEqualTo("99");
+  }
+
+  @Test
+  public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(Optional.of(0), Optional.of(-1),
+        Optional.empty(), Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldListBotInternalAndNonInternalTopics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.of(true),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+
+  @Test
+  public void shouldListOnlyNonInternalTopics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.of(true),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+
+  @Test
+  public void shouldListOnlyTopicsContainingOne() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.empty(),
+        Optional.of("1"), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(1);
+    assertThat(topics.getTopics()).hasSize(20);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldListTopicsOrderedByPartitionsCount() {
+    List<TopicDescription> topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
+        .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
+            IntStream.range(0, i)
+                .mapToObj(p ->
+                    new TopicPartitionInfo(p, null, List.of(), List.of()))
+                .collect(Collectors.toList())))
+        .collect(Collectors.toList());
+
+    init(topicDescriptions);
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.empty(),
+        Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS)).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).containsExactlyElementsOf(
+        topicDescriptions.stream()
+            .map(TopicDescription::name)
+            .limit(25)
+            .collect(Collectors.toList()));
+  }
+
+}

+ 0 - 225
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java

@@ -1,225 +0,0 @@
-package com.provectus.kafka.ui.service;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.provectus.kafka.ui.mapper.ClusterMapper;
-import com.provectus.kafka.ui.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.model.InternalTopic;
-import com.provectus.kafka.ui.model.InternalTopicConfig;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
-import com.provectus.kafka.ui.model.TopicDTO;
-import com.provectus.kafka.ui.serde.DeserializationService;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mapstruct.factory.Mappers;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Spy;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class TopicsServiceTest {
-  @Spy
-  private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
-  @InjectMocks
-  private TopicsService topicsService;
-  @Mock
-  private AdminClientService adminClientService;
-  @Mock
-  private ConsumerGroupService consumerGroupService;
-  @Mock
-  private ClustersStorage clustersStorage;
-
-  @Mock
-  private DeserializationService deserializationService;
-
-  @Test
-  public void shouldListFirst25Topics() {
-    final KafkaCluster cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster,
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
-  }
-
-  @Test
-  public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster, Optional.of(4), Optional.of(33),
-        Optional.empty(), Optional.empty(), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(1)
-        .first().extracting(TopicDTO::getName).isEqualTo("99");
-  }
-
-  @Test
-  public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster, Optional.of(0), Optional.of(-1),
-        Optional.empty(), Optional.empty(), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
-  }
-
-  @Test
-  public void shouldListBotInternalAndNonInternalTopics() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .internal(Integer.parseInt(e) % 10 == 0)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster,
-        Optional.empty(), Optional.empty(), Optional.of(true),
-        Optional.empty(), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
-  }
-
-
-  @Test
-  public void shouldListOnlyNonInternalTopics() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .internal(Integer.parseInt(e) % 10 == 0)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster,
-        Optional.empty(), Optional.empty(), Optional.of(true),
-        Optional.empty(), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
-  }
-
-
-  @Test
-  public void shouldListOnlyTopicsContainingOne() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster,
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.of("1"), Optional.empty());
-    assertThat(topics.getPageCount()).isEqualTo(1);
-    assertThat(topics.getTopics()).hasSize(20);
-    assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
-  }
-
-  @Test
-  public void shouldListTopicsOrderedByPartitionsCount() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .partitions(Map.of())
-                    .name(e)
-                    .partitionCount(100 - Integer.parseInt(e))
-                    .build()))
-        );
-
-    var topics = topicsService.getTopics(cluster,
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS));
-    assertThat(topics.getPageCount()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).map(TopicDTO::getPartitionCount).isSorted();
-  }
-
-  @Test
-  public void shouldRetrieveTopicConfigs() {
-    var cluster = clusterWithTopics(
-            IntStream.rangeClosed(1, 100).boxed()
-                .map(Objects::toString)
-                .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
-                    .name(e)
-                    .topicConfigs(
-                        List.of(InternalTopicConfig.builder()
-                            .name("testName")
-                            .value("testValue")
-                            .defaultValue("testDefaultValue")
-                            .source(ConfigEntry.ConfigSource.DEFAULT_CONFIG)
-                            .isReadOnly(true)
-                            .isSensitive(true)
-                            .synonyms(List.of())
-                            .build()
-                        )
-                    )
-                    .build()))
-        );
-
-    var topicConfigs = topicsService.getTopicConfigs(cluster, "1");
-    assertThat(topicConfigs).hasSize(1);
-
-    var topicConfig = topicConfigs.get(0);
-    assertThat(topicConfig.getName()).isEqualTo("testName");
-    assertThat(topicConfig.getValue()).isEqualTo("testValue");
-    assertThat(topicConfig.getDefaultValue()).isEqualTo("testDefaultValue");
-    assertThat(topicConfig.getSource().getValue())
-            .isEqualTo(ConfigEntry.ConfigSource.DEFAULT_CONFIG.name());
-    assertThat(topicConfig.getSynonyms()).isNotNull();
-    assertThat(topicConfig.getIsReadOnly()).isTrue();
-    assertThat(topicConfig.getIsSensitive()).isTrue();
-  }
-
-  private KafkaCluster clusterWithTopics(Map<String, InternalTopic> topics) {
-    return KafkaCluster.builder()
-        .metrics(InternalClusterMetrics.builder()
-            .topics(topics)
-            .build())
-        .build();
-  }
-
-}