German Osin, 5 years ago
Parent
Commit
c1ee1cf013
21 changed files with 367 additions and 417 deletions
  1. 4 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java
  2. 3 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java
  3. 3 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java
  4. 19 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
  5. 0 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClusterWithId.java
  6. 3 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java
  7. 0 39
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokersMetrics.java
  8. 0 18
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCluster.java
  9. 26 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
  10. 0 15
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalMetrics.java
  11. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java
  12. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalReplica.java
  13. 10 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
  14. 13 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopicConfig.java
  15. 4 17
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
  16. 33 34
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
  17. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
  18. 66 31
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
  19. 146 197
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
  20. 26 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
  21. 6 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

+ 4 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.cluster;
 
-import com.provectus.kafka.ui.cluster.model.ClusterWithId;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import lombok.RequiredArgsConstructor;
@@ -10,6 +9,8 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
 
+import java.util.Map;
+
 @Component
 @RequiredArgsConstructor
 @Log4j2
@@ -23,9 +24,9 @@ public class ClustersMetricsScheduler {
     public void updateMetrics() {
         Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
                 .subscribeOn(Schedulers.parallel())
-                .map(s -> new ClusterWithId(s.getKey(), s.getValue()))
+                .map(Map.Entry::getValue)
                 .flatMap(metricsUpdateService::updateMetrics)
-                .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s.getKafkaCluster()))
+                .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
                 .subscribe();
     }
 }

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java

@@ -1,13 +1,13 @@
 package com.provectus.kafka.ui.cluster.mapper;
 
-import com.provectus.kafka.ui.cluster.model.InternalBrokersMetrics;
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.BrokersMetrics;
 import org.mapstruct.Mapper;
 
 @Mapper(componentModel = "spring")
 public interface BrokersMetricsMapper {
 
-    InternalBrokersMetrics toBrokersMetricsDto (BrokersMetrics brokersMetrics);
+    InternalClusterMetrics toBrokersMetricsDto (BrokersMetrics brokersMetrics);
 
-    BrokersMetrics toBrokersMetrics (InternalBrokersMetrics brokersMetrics);
+    BrokersMetrics toBrokersMetrics (InternalClusterMetrics brokersMetrics);
 }

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java

@@ -1,13 +1,12 @@
 package com.provectus.kafka.ui.cluster.mapper;
 
-import com.provectus.kafka.ui.cluster.model.InternalCluster;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Cluster;
 import org.mapstruct.Mapper;
 
 @Mapper(componentModel = "spring")
 public interface ClusterDtoMapper {
 
-    InternalCluster toClusterDto(Cluster cluster);
-
-    Cluster toCluster(InternalCluster cluster);
+    KafkaCluster toInternalCluster(Cluster cluster);
+    Cluster toClusterDto(KafkaCluster cluster);
 }

+ 19 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -1,19 +1,28 @@
 package com.provectus.kafka.ui.cluster.mapper;
 
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
-@Mapper
-public abstract class ClusterMapper {
+@Mapper(componentModel = "spring")
+public interface ClusterMapper {
 
-    @Mapping(source = "name", target = "cluster.name")
-    @Mapping(target = "brokersMetrics", ignore = true)
-    @Mapping(target = "cluster", ignore = true)
-    @Mapping(target = "lastKafkaException", ignore = true)
-    @Mapping(target = "lastZookeeperException", ignore = true)
-    @Mapping(target = "topicConfigsMap", ignore = true)
-//    @Mapping(target = "topics", ignore = true)
-    public abstract KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+    KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+
+    @Mapping(target = "brokerCount", source = "metrics.brokerCount")
+    @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
+    @Mapping(target = "topicCount", source = "metrics.topicCount")
+    @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
+    @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
+    Cluster toCluster(KafkaCluster cluster);
+
+    BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
+    Topic toTopic(InternalTopic topic);
+    TopicDetails toTopicDetails(InternalTopic topic);
+    TopicConfig toTopicConfig(InternalTopicConfig topic);
 }

+ 0 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClusterWithId.java

@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@AllArgsConstructor
-@Builder(toBuilder = true)
-public class ClusterWithId {
-    private final String id;
-    private final KafkaCluster kafkaCluster;
-}

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java

@@ -9,6 +9,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Component
@@ -35,8 +36,8 @@ public class ClustersStorage {
         return kafkaClusters.values();
     }
 
-    public KafkaCluster getClusterByName(String clusterName) {
-        return kafkaClusters.get(clusterName);
+    public Optional<KafkaCluster> getClusterByName(String clusterName) {
+        return Optional.ofNullable(kafkaClusters.get(clusterName));
     }
 
     public void setKafkaCluster(String key, KafkaCluster kafkaCluster) {

+ 0 - 39
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokersMetrics.java

@@ -1,39 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Builder(toBuilder = true)
-@Data
-public class InternalBrokersMetrics {
-
-    private Integer brokerCount;
-    private Integer zooKeeperStatus;
-    private Integer activeControllers;
-    private Integer uncleanLeaderElectionCount;
-    private Integer onlinePartitionCount;
-    private Integer underReplicatedPartitionCount;
-    private Integer offlinePartitionCount;
-    private Integer inSyncReplicasCount;
-    private Integer outOfSyncReplicasCount;
-
-    public void increaseOnlinePartitionCount(Integer value) {
-        this.onlinePartitionCount = this.onlinePartitionCount + value;
-    }
-
-    public void increaseOfflinePartitionCount(Integer value) {
-        this.offlinePartitionCount = this.offlinePartitionCount + value;
-    }
-
-    public void increaseUnderReplicatedPartitionCount(Integer value) {
-        this.underReplicatedPartitionCount = this.underReplicatedPartitionCount + value;
-    }
-
-    public void increaseInSyncReplicasCount(Integer value) {
-        this.inSyncReplicasCount = this.inSyncReplicasCount + value;
-    }
-
-    public void increaseOutOfSyncReplicasCount(Integer value) {
-        this.outOfSyncReplicasCount = this.outOfSyncReplicasCount + value;
-    }
-}

+ 0 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCluster.java

@@ -1,18 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import com.provectus.kafka.ui.model.ServerStatus;
-import lombok.Data;
-
-@Data
-public class InternalCluster {
-
-    private String name;
-    private String id;
-    private boolean defaultCluster;
-    private ServerStatus status;
-    private Integer brokerCount;
-    private Integer onlinePartitionCount;
-    private Integer topicCount;
-    private Integer bytesInPerSec;
-    private Integer bytesOutPerSec;
-}

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -0,0 +1,26 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import com.provectus.kafka.ui.model.ServerStatus;
+import lombok.Builder;
+import lombok.Data;
+
+
+@Data
+@Builder(toBuilder = true)
+public class InternalClusterMetrics {
+    private final int brokerCount;
+    private final int topicCount;
+    private final int activeControllers;
+    private final int uncleanLeaderElectionCount;
+    private final int onlinePartitionCount;
+    private final int underReplicatedPartitionCount;
+    private final int offlinePartitionCount;
+    private final int inSyncReplicasCount;
+    private final int outOfSyncReplicasCount;
+    //TODO: find way to fill
+    private final int bytesInPerSec;
+    private final int bytesOutPerSec;
+    //TODO: find way to fill
+    private final int segmentSize;
+    private final int segmentCount;
+}

+ 0 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalMetrics.java

@@ -1,15 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.Data;
-
-@Data
-public class InternalMetrics {
-
-    private Integer bytesInPerSec;
-
-    private Integer bytesOutPerSec;
-
-    private Integer brokerCount;
-
-    private Integer activeControllers;
-}

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java

@@ -13,5 +13,4 @@ public class InternalPartition {
     private final List<InternalReplica> replicas;
     private final int inSyncReplicasCount;
     private final int replicasCount;
-
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalReplica.java

@@ -8,7 +8,7 @@ import lombok.RequiredArgsConstructor;
 @Builder
 @RequiredArgsConstructor
 public class InternalReplica {
-    private final Integer broker;
+    private final int broker;
     private final boolean leader;
     private final boolean inSync;
 }

+ 10 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

@@ -13,13 +13,16 @@ import java.util.Map;
 public class InternalTopic {
 
     private final String name;
-
     private final boolean internal;
-
     private final List<InternalPartition> partitions;
-
-    private final TopicDetails topicDetails;
-
-    private final List<TopicConfig> topicConfigs;
-
+    private final List<InternalTopicConfig> topicConfigs;
+
+    private final int replicas;
+    private final int partitionCount;
+    private final int inSyncReplicas;
+    private final int replicationFactor;
+    private final int underReplicatedPartitions;
+    //TODO: find way to fill
+    private final int segmentSize;
+    private final int segmentCount;
 }

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopicConfig.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.cluster.model;
+
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class InternalTopicConfig {
+    private final String name;
+    private final String value;
+    private final String defaultValue;
+}

+ 4 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -8,7 +8,7 @@ import java.util.List;
 import java.util.Map;
 
 @Data
-@Builder(toBuilder = true, builderClassName = "KafkaClusterBuilder")
+@Builder(toBuilder = true)
 public class KafkaCluster {
 
     private final String id = "";
@@ -17,24 +17,11 @@ public class KafkaCluster {
     private final String jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;
-
-    private final Cluster cluster;
-    private final BrokersMetrics brokersMetrics;
-
-    private final List<Topic> topics;
-    private final Map<String, TopicDetails> topicDetailsMap;
-    private final Map<String, List<TopicConfig>> topicConfigsMap;
+    private final ServerStatus status;
     private final ServerStatus zookeeperStatus;
-
+    private final InternalClusterMetrics metrics;
+    private final Map<String, InternalTopic> topics;
     private final Throwable lastKafkaException;
     private final Throwable lastZookeeperException;
 
-    public TopicDetails getOrCreateTopicDetails(String key) {
-        var topicDetails = topicDetailsMap.get(key);
-        if(topicDetails == null) {
-            topicDetailsMap.putIfAbsent(key, new TopicDetails());
-            topicDetails = topicDetailsMap.get(key);
-        }
-        return topicDetails;
-    }
 }

+ 33 - 34
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.service;
 
+import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
@@ -12,7 +13,9 @@ import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Service
@@ -20,57 +23,53 @@ import java.util.stream.Collectors;
 public class ClusterService {
 
     private final ClustersStorage clustersStorage;
+    private final ClusterMapper clusterMapper;
     private final KafkaService kafkaService;
 
-    public Flux<Cluster> getClusters() {
-        List<Cluster> clusters = clustersStorage.getKafkaClusters()
+    public List<Cluster> getClusters() {
+        return clustersStorage.getKafkaClusters()
                 .stream()
-                .map(KafkaCluster::getCluster)
+                .map(clusterMapper::toCluster)
                 .collect(Collectors.toList());
-
-        return Flux.fromIterable(clusters);
     }
 
-    public BrokersMetrics getBrokersMetrics(String name) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return cluster.getBrokersMetrics();
+    public Optional<BrokersMetrics> getBrokersMetrics(String name) {
+        return clustersStorage.getClusterByName(name)
+                .map(KafkaCluster::getMetrics)
+                .map(clusterMapper::toBrokerMetrics);
     }
 
-    public Flux<Topic> getTopics(String name) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Flux.fromIterable(cluster.getTopics());
+    public List<Topic> getTopics(String name) {
+        return clustersStorage.getClusterByName(name)
+                .map( c ->
+                        c.getTopics().values().stream()
+                                .map(clusterMapper::toTopic)
+                                .collect(Collectors.toList())
+                ).orElse(Collections.emptyList());
     }
 
-    public TopicDetails getTopicDetails(String name, String topicName) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return cluster.getOrCreateTopicDetails(topicName);
+    public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
+        return clustersStorage.getClusterByName(name).flatMap(
+                c -> Optional.ofNullable(c.getTopics().get(topicName))
+        ).map(clusterMapper::toTopicDetails);
     }
 
-    public Flux<TopicConfig> getTopicConfigs(String name, String topicName) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Flux.fromIterable(cluster.getTopicConfigsMap().get(topicName));
+    public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
+        return clustersStorage.getClusterByName(name).flatMap(
+                c -> Optional.ofNullable(c.getTopics().get(topicName))
+        ).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
     }
 
     public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        var adminClient = kafkaService.createAdminClient(cluster);
-        return kafkaService.createTopic(adminClient, cluster, topicFormData);
+        return clustersStorage.getClusterByName(name).map(
+                cluster -> kafkaService.createTopic(cluster, topicFormData)
+        ).orElse(Mono.empty()).map(clusterMapper::toTopic);
     }
 
     @SneakyThrows
-    public Flux<ConsumerGroup> getConsumerGroup (String clusterName) {
-            var cluster = clustersStorage.getClusterByName(clusterName);
-            var adminClient =  kafkaService.createAdminClient(cluster);
-            return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
-                    .flatMap(s -> ClusterUtil.toMono(adminClient
-                            .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
-                    .map(s -> s.values().stream()
-                            .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
-                    .flatMapIterable(s -> s);
+    public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
+            return clustersStorage.getClusterByName(clusterName)
+                    .map(kafkaService::getConsumerGroups)
+                    .orElse(Mono.empty());
     }
 }

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.cluster.service;
 
-import com.provectus.kafka.ui.cluster.model.ClusterWithId;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
@@ -16,8 +16,8 @@ public class MetricsUpdateService {
     private final KafkaService kafkaService;
     private final ZookeeperService zookeeperService;
 
-    public Mono<ClusterWithId> updateMetrics(ClusterWithId clusterWithId) {
-        log.debug("Start getting metrics for kafkaCluster: {}", clusterWithId.getKafkaCluster());
-        return kafkaService.getUpdatedCluster(clusterWithId);
+    public Mono<KafkaCluster> updateMetrics(KafkaCluster kafkaCluster) {
+        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster);
+        return kafkaService.getUpdatedCluster(kafkaCluster);
     }
 }

+ 66 - 31
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -1,20 +1,18 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.ConsumerGroup;
-import com.provectus.kafka.ui.model.Partition;
-import com.provectus.kafka.ui.model.Replica;
-import com.provectus.kafka.ui.model.Topic;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
 import reactor.core.publisher.Mono;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
+import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
+import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
 public class ClusterUtil {
 
@@ -30,35 +28,72 @@ public class ClusterUtil {
 
     public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
         ConsumerGroup consumerGroup = new ConsumerGroup();
-        consumerGroup.setClusterId(cluster.getCluster().getId());
+        consumerGroup.setClusterId(cluster.getId());
         consumerGroup.setConsumerGroupId(c.groupId());
         consumerGroup.setNumConsumers(c.members().size());
-        Set<String> topics = new HashSet<>();
-        c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
-        consumerGroup.setNumTopics(topics.size());
+        int numTopics = c.members().stream().mapToInt( m -> m.assignment().topicPartitions().size()).sum();
+        consumerGroup.setNumTopics(numTopics);
         return consumerGroup;
     }
 
-    public static List<Topic> convertToExternalTopicList(List<InternalTopic> internalTopics) {
-        return internalTopics.stream().flatMap(s -> Stream.of(convertToExternalTopic(s))).collect(Collectors.toList());
+    public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
+        InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
+                .name(configEntry.name())
+                .value(configEntry.value());
+        if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
+            builder.defaultValue(configEntry.value());
+        } else {
+            builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
+        }
+        return builder.build();
     }
 
-    public static Topic convertToExternalTopic(InternalTopic internalTopic) {
-        Topic topic = new Topic();
-        topic.setName(internalTopic.getName());
-        topic.setPartitions(internalTopic.getPartitions().stream().flatMap(s -> {
-            Partition partition = new Partition();
-            partition.setLeader(s.getLeader());
-            partition.setPartition(s.getPartition());
-            partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
-                Replica replica = new Replica();
-                replica.setBroker(r.getBroker());
-                replica.setInSync(r.isInSync());
-                replica.setLeader(r.isLeader());
-                return Stream.of(replica);
-            }).collect(Collectors.toList()));
-            return Stream.of(partition);
-        }).collect(Collectors.toList()));
-        return topic;
+    public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) {
+        var topic = InternalTopic.builder();
+        topic.internal(topicDescription.isInternal());
+        topic.name(topicDescription.name());
+
+        List<InternalPartition> partitions = topicDescription.partitions().stream().map(
+                partition -> {
+                    var partitionDto = InternalPartition.builder();
+                    partitionDto.leader(partition.leader().id());
+                    partitionDto.partition(partition.partition());
+                    partitionDto.inSyncReplicasCount(partition.isr().size());
+                    partitionDto.replicasCount(partition.replicas().size());
+                    List<InternalReplica> replicas = partition.replicas().stream().map(
+                            r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
+                            .collect(Collectors.toList());
+                    partitionDto.replicas(replicas);
+                    return partitionDto.build();
+                })
+                .collect(Collectors.toList());
+
+        int urpCount = partitions.stream()
+                .flatMap(partition -> partition.getReplicas().stream())
+                .filter(InternalReplica::isInSync).mapToInt(e -> 1)
+                .sum();
+
+        int inSyncReplicasCount = partitions.stream()
+                .mapToInt(InternalPartition::getInSyncReplicasCount)
+                .sum();
+
+        int replicasCount = partitions.stream()
+                .mapToInt(InternalPartition::getReplicasCount)
+                .sum();
+
+        topic.partitions(partitions);
+        topic.replicas(replicasCount);
+        topic.partitionCount(topicDescription.partitions().size());
+        topic.inSyncReplicas(inSyncReplicasCount);
+
+        topic.replicationFactor(
+                topicDescription.partitions().size() > 0 ?
+                        topicDescription.partitions().get(0).replicas().size() : 0
+        );
+
+        topic.underReplicatedPartitions(urpCount);
+
+        return topic.build();
     }
+
 }

+ 146 - 197
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,10 +1,11 @@
 package com.provectus.kafka.ui.kafka;
 
-import com.provectus.kafka.ui.cluster.mapper.BrokersMetricsMapper;
-import com.provectus.kafka.ui.cluster.mapper.ClusterDtoMapper;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.model.Cluster;
+import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.model.ServerStatus;
+import com.provectus.kafka.ui.model.TopicFormData;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -13,15 +14,17 @@ import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
 import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
@@ -31,135 +34,142 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Log4j2
 public class KafkaService {
 
+    private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
+
     private final ZookeeperService zookeeperService;
+    private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
 
-    private Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+    @SneakyThrows
+    public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
+        return getOrCreateAdminClient(cluster).flatMap(
+                ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
+                            getTopicsData(ac).flatMap( topics ->
+                                loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
+                                        .map( configs -> mergeWithConfigs(topics, configs) )
+                            ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
+                        )
+        ).onErrorResume(
+                e -> Mono.just(cluster.toBuilder()
+                        .status(ServerStatus.OFFLINE)
+                        .lastKafkaException(e)
+                        .build())
+        );
+    }
 
-    private final ClusterDtoMapper clusterDtoMapper;
+    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
 
-    private final BrokersMetricsMapper brokersMetricsMapper;
+        InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
 
-    @SneakyThrows
-    public Mono<ClusterWithId> getUpdatedCluster(ClusterWithId clusterWithId) {
-        var internalCluster = clusterWithId.getKafkaCluster();
-        return getOrCreateAdminClient(clusterWithId).flatMap(
-                    ac ->
-                        getClusterMetrics(ac).flatMap(
-                                internalMetrics ->
-                                    getTopicsData(ac)
-                                        .flatMap(topics ->
-                                            loadTopicConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList())).collectList()
-                                                .map(s -> s.stream().collect(HashMap<String, List<TopicConfig>>::new, HashMap::putAll, HashMap::putAll))
-                                                .map(s -> s.entrySet().stream().map(t -> InternalTopic.builder()
-                                                            .name(t.getKey())
-                                                            .topicConfigs(t.getValue())
-                                                            .topicDetails(topics.stream().filter(to -> to.getName().equals(t.getKey())).findFirst().orElseThrow().getTopicDetails())
-                                                            .partitions(topics.stream().filter(to -> to.getName().equals(t.getKey())).findFirst().orElseThrow().getPartitions())
-                                                            .build()).collect(Collectors.toList()))
-                                        ).map(topics -> {
-
-                                            InternalBrokersMetrics brokersMetrics = internalCluster.getBrokersMetrics() != null
-                                                    ? brokersMetricsMapper.toBrokersMetricsDto(internalCluster.getBrokersMetrics()) : InternalBrokersMetrics.builder().build();
-                                            resetPartitionMetrics(brokersMetrics);
-                                            brokersMetrics.setActiveControllers(internalMetrics.getActiveControllers());
-                                            brokersMetrics.setZooKeeperStatus(zookeeperService.isZookeeperOnline(internalCluster) ? 1 : 0);
-                                            brokersMetrics.setBrokerCount(internalMetrics.getBrokerCount());
-                                            var internalBrokersMetrics = updateBrokersMetrics(brokersMetrics, topics);
-
-                                            InternalCluster cluster = clusterDtoMapper.toClusterDto(internalCluster.getCluster());
-                                            cluster.setStatus(ServerStatus.ONLINE);
-                                            cluster.setBytesInPerSec(internalMetrics.getBytesInPerSec());
-                                            cluster.setBytesOutPerSec(internalMetrics.getBytesOutPerSec());
-                                            cluster.setBrokerCount(internalMetrics.getBrokerCount());
-                                            cluster.setTopicCount(topics.size());
-                                            cluster.setOnlinePartitionCount(internalBrokersMetrics.getOnlinePartitionCount());
-
-                                            return ClusterWithId.builder()
-                                                    .id(internalCluster.getName())
-                                                    .kafkaCluster(
-                                                        KafkaCluster.builder().topics(ClusterUtil.convertToExternalTopicList(topics))
-                                                        .name(cluster.getName())
-                                                        .zookeeperStatus(zookeeperService.isZookeeperOnline(internalCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE)
-                                                        .cluster(clusterDtoMapper.toCluster(cluster))
-                                                        .brokersMetrics(brokersMetricsMapper.toBrokersMetrics(internalBrokersMetrics))
-                                                        .build()
-                                                    ).build();
-                                            })
-                                )
-            ).onErrorResume(
-                    e -> {
-                        InternalCluster cluster = clusterDtoMapper.toClusterDto(internalCluster.getCluster());
-                        cluster.setStatus(ServerStatus.OFFLINE);
-                        return Mono.just(clusterWithId.toBuilder().kafkaCluster(
-                                internalCluster.toBuilder()
-                                .lastKafkaException(e)
-                                .cluster(clusterDtoMapper.toCluster(cluster))
-                                .build()
-                        ).build());
-                    }
-            );
+        InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);
+
+        ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
+        Throwable zookeeperException = null;
+        try {
+            zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE;
+        } catch (Throwable e) {
+            zookeeperException = e;
+        }
+
+        InternalClusterMetrics clusterMetrics = metricsBuilder
+                .activeControllers(brokersMetrics.getActiveControllers())
+                .brokerCount(brokersMetrics.getBrokerCount())
+                .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
+                .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
+                .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
+                .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
+                .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount()).build();
+
+        return currentCluster.toBuilder()
+                .status(ServerStatus.ONLINE)
+                .zookeeperStatus(zookeeperStatus)
+                .lastZookeeperException(zookeeperException)
+                .lastKafkaException(null)
+                .metrics(clusterMetrics)
+                .topics(topics)
+                .build();
+    }
+
+    private InternalClusterMetrics collectTopicsMetrics(Map<String,InternalTopic> topics) {
+
+        int underReplicatedPartitions = 0;
+        int inSyncReplicasCount = 0;
+        int outOfSyncReplicasCount = 0;
+        int onlinePartitionCount = 0;
+        int offlinePartitionCount = 0;
+
+        for (InternalTopic topic : topics.values()) {
+            underReplicatedPartitions += topic.getUnderReplicatedPartitions();
+            inSyncReplicasCount += topic.getInSyncReplicas();
+            outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
+            onlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum();
+            offlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum();
+        }
+
+        return InternalClusterMetrics.builder()
+                .underReplicatedPartitionCount(underReplicatedPartitions)
+                .inSyncReplicasCount(inSyncReplicasCount)
+                .outOfSyncReplicasCount(outOfSyncReplicasCount)
+                .onlinePartitionCount(onlinePartitionCount)
+                .offlinePartitionCount(offlinePartitionCount)
+                .build();
+    }
+
+    private Map<String, InternalTopic> mergeWithConfigs(List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
+        return topics.stream().map(
+                t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
+        ).collect(Collectors.toMap(
+                InternalTopic::getName,
+                e -> e
+        ));
     }
 
     @SneakyThrows
     private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
-        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
-        listTopicsOptions.listInternal(true);
-        return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
-                    .map(tl -> {
-                        DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
-                        Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
-                        return topicDescriptionFuturesMap.entrySet();
-                    })
-                    .flatMapMany(Flux::fromIterable)
-                    .flatMap(s -> ClusterUtil.toMono(s.getValue()))
-                    .map(this::collectTopicData)
-                    .collectList();
+        return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
+                    .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
+                    .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
     }
 
-    private Mono<InternalMetrics> getClusterMetrics(AdminClient client) {
+    private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
         return ClusterUtil.toMono(client.describeCluster().nodes())
-                .map(Collection::size)
                 .flatMap(brokers ->
                     ClusterUtil.toMono(client.describeCluster().controller()).map(
                         c -> {
-                            InternalMetrics internalMetrics = new InternalMetrics();
-                            internalMetrics.setBrokerCount(brokers);
-                            internalMetrics.setActiveControllers(c != null ? 1 : 0);
-                            for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : client.metrics().entrySet()) {
-                                if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
-                                        && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                                    internalMetrics.setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-                                }
-                                if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
-                                        && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                                    internalMetrics.setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-                                }
-                            }
-                            return internalMetrics;
+                            InternalClusterMetrics.InternalClusterMetricsBuilder builder = InternalClusterMetrics.builder();
+                            builder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
+                            // TODO: fill bytesInPerSec/bytesOutPerSec metrics (brokerIds below is presumably collected for this — confirm)
+                            List<Integer> brokerIds = brokers.stream().map(Node::id).collect(Collectors.toList());
+
+                            return builder.build();
                         }
                     )
                 );
     }
 
 
+    public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+        AdminClient adminClient = this.createAdminClient(cluster);
+        return this.createTopic(adminClient, topicFormData);
+    }
+
     @SneakyThrows
-    public Mono<Topic> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+    public Mono<InternalTopic> createTopic(AdminClient adminClient, Mono<TopicFormData> topicFormData) {
         return topicFormData.flatMap(
                 topicData -> {
                     NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
                     newTopic.configs(topicData.getConfigs());
-                    createTopic(adminClient, newTopic);
-                    return topicFormData;
+                    return createTopic(adminClient, newTopic).map( v -> topicData);
                 }).flatMap(topicData -> {
                     var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
                     return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName());
-                }).map(s -> {
-                    if (s == null) {
-                        throw new RuntimeException("Can't find created topic");
-                    }
-                return s;
-                }).map(s -> getUpdatedCluster(new ClusterWithId(cluster.getName(), cluster)))
-                .map(s -> new Topic());
+                })
+                .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
+                .map(ClusterUtil::mapToInternalTopic)
+                .flatMap( t ->
+                        loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
+                                .map( c -> mergeWithConfigs(Collections.singletonList(t), c))
+                                .map( m -> m.values().iterator().next())
+                );
     }
 
     @SneakyThrows
@@ -168,10 +178,10 @@ public class KafkaService {
     }
 
 
-    public Mono<AdminClient> getOrCreateAdminClient(ClusterWithId clusterWithId) {
+    public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
         AdminClient adminClient = adminClientCache.computeIfAbsent(
-                clusterWithId.getId(),
-                (id) -> createAdminClient(clusterWithId.getKafkaCluster())
+                cluster.getId(),
+                (id) -> createAdminClient(cluster)
         );
 
         return isAdminClientConnected(adminClient);
@@ -188,57 +198,7 @@ public class KafkaService {
         return getClusterId(adminClient).map( r -> adminClient);
     }
 
-    private void resetPartitionMetrics(InternalBrokersMetrics brokersMetrics) {
-        brokersMetrics.setOnlinePartitionCount(0);
-        brokersMetrics.setOfflinePartitionCount(0);
-        brokersMetrics.setUnderReplicatedPartitionCount(0);
-        brokersMetrics.setInSyncReplicasCount(0);
-        brokersMetrics.setOutOfSyncReplicasCount(0);
-    }
-
-    private InternalTopic collectTopicData(TopicDescription topicDescription) {
-        TopicDetails topicDetails = new TopicDetails();
-        var topic = InternalTopic.builder();
-        topic.internal(topicDescription.isInternal());
-        topic.name(topicDescription.name());
-        List<InternalPartition> partitions = new ArrayList<>();
-
-        int inSyncReplicasCount;
-        int replicasCount;
-
-        partitions.addAll(topicDescription.partitions().stream().map(
-                partition -> {
-                    var partitionDto = InternalPartition.builder();
-                    partitionDto.leader(partition.leader().id());
-                    partitionDto.partition(partition.partition());
-                    partitionDto.inSyncReplicasCount(partition.isr().size());
-                    partitionDto.replicasCount(partition.replicas().size());
-                    List<InternalReplica> replicas = partition.replicas().stream().map(
-                            r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
-                            .collect(Collectors.toList());
-                    partitionDto.replicas(replicas);
-                    return partitionDto.build();
-                })
-                .collect(Collectors.toList()));
-
-        Integer urpCount = partitions.stream().flatMap(partition -> partition.getReplicas().stream()).filter(InternalReplica::isInSync).map(e -> 1).reduce(0, Integer::sum);
-        inSyncReplicasCount = partitions.stream().flatMap(s -> Stream.of(s.getInSyncReplicasCount())).reduce(Integer::sum).orElseGet(() -> 0);
-        replicasCount = partitions.stream().flatMap(s -> Stream.of(s.getReplicasCount())).reduce(Integer::sum).orElseGet(() -> 0);
 
-        topic.partitions(partitions);
-
-        topicDetails.setReplicas(replicasCount);
-        topicDetails.setPartitionCount(topicDescription.partitions().size());
-        topicDetails.setInSyncReplicas(inSyncReplicasCount);
-        topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
-                ? topicDescription.partitions().get(0).replicas().size()
-                : null);
-        topicDetails.setUnderReplicatedPartitions(urpCount);
-
-        topic.topicDetails(topicDetails);
-
-        return topic.build();
-    }
 
     private Mono<TopicDescription> getTopicDescription(KafkaFuture<TopicDescription> entry, String topicName) {
         return ClusterUtil.toMono(entry)
@@ -249,30 +209,36 @@ public class KafkaService {
     }
 
     @SneakyThrows
-    private Flux<Map<String, List<TopicConfig>>> loadTopicConfig(AdminClient adminClient, List<String> topicNames) {
-        return Flux.fromIterable(topicNames).flatMap(topicName -> {
-            Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
-            return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
-                    .map(configs -> {
-                        if (configs.isEmpty()) return Collections.emptyMap();
-                        Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
-                        List<TopicConfig> topicConfigs = new ArrayList<>();
-                        for (ConfigEntry entry : entries) {
-                            TopicConfig topicConfig = new TopicConfig();
-                            topicConfig.setName(entry.name());
-                            topicConfig.setValue(entry.value());
-                            if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
-                                topicConfig.setDefaultValue(topicConfig.getValue());
-                            } else {
-                                topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
-                            }
-                            topicConfigs.add(topicConfig);
-                        }
-                        return Collections.singletonMap(topicName, topicConfigs);
-                    });
-         });
+    private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(AdminClient adminClient, List<String> topicNames) {
+        List<ConfigResource> resources = topicNames.stream()
+                .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+                .collect(Collectors.toList());
+
+        return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
+                .map(configs ->
+                        configs.entrySet().stream().map(
+                                c -> Tuples.of(
+                                        c.getKey().name(),
+                                        c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig).collect(Collectors.toList())
+                                )
+                        ).collect(Collectors.toMap(
+                                Tuple2::getT1,
+                                Tuple2::getT2
+                        ))
+                );
+    }
+
+    public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
+        var adminClient =  this.createAdminClient(cluster);
+
+        return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
+                .flatMap(s -> ClusterUtil.toMono(adminClient
+                        .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                .map(s -> s.values().stream()
+                        .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
     }
 
+
     @SneakyThrows
     private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
         return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
@@ -281,21 +247,4 @@ public class KafkaService {
                     .iterator()
                     .next());
     }
-
-    private InternalBrokersMetrics updateBrokersMetrics(InternalBrokersMetrics brokersMetricsInput, List<InternalTopic> topics) {
-        var tempBrokersMetrics = InternalBrokersMetrics.builder().build();
-        var brokersMetrics = brokersMetricsInput.toBuilder();
-        for (InternalTopic topic : topics) {
-            tempBrokersMetrics.increaseUnderReplicatedPartitionCount(topic.getTopicDetails().getUnderReplicatedPartitions());
-            tempBrokersMetrics.increaseInSyncReplicasCount(topic.getTopicDetails().getInSyncReplicas());
-            tempBrokersMetrics.increaseOutOfSyncReplicasCount(topic.getTopicDetails().getReplicas() - topic.getTopicDetails().getInSyncReplicas());
-            tempBrokersMetrics.increaseOnlinePartitionCount(topic.getPartitions().stream().filter(s -> s.getLeader() != null).map(e -> 1).reduce(0, Integer::sum));
-            tempBrokersMetrics.increaseOfflinePartitionCount(topic.getPartitions().stream().filter(s -> s.getLeader() == null).map(e -> 1).reduce(0, Integer::sum));
-        }
-        return brokersMetrics.underReplicatedPartitionCount(tempBrokersMetrics.getUnderReplicatedPartitionCount())
-                .inSyncReplicasCount(tempBrokersMetrics.getInSyncReplicasCount())
-                .outOfSyncReplicasCount(tempBrokersMetrics.getOutOfSyncReplicasCount())
-                .onlinePartitionCount(tempBrokersMetrics.getOnlinePartitionCount())
-                .offlinePartitionCount(tempBrokersMetrics.getOfflinePartitionCount()).build();
-    }
 }

+ 26 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -22,41 +22,60 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getClusters(), HttpStatus.OK));
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
     }
 
     @Override
     public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterId, ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getBrokersMetrics(clusterId), HttpStatus.OK));
+        return Mono.just(
+                clusterService.getBrokersMetrics(clusterId)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId, ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getTopics(clusterId), HttpStatus.OK));
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId))));
     }
 
     @Override
     public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getTopicDetails(clusterId, topicName), HttpStatus.OK));
+        return Mono.just(
+                clusterService.getTopicDetails(clusterId, topicName)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
     public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getTopicConfigs(clusterId, topicName), HttpStatus.OK));
+        return Mono.just(
+                clusterService.getTopicConfigs(clusterId, topicName)
+                        .map(Flux::fromIterable)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
     public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        return clusterService.createTopic(clusterId, topicFormData).map(s -> new ResponseEntity<>(s, HttpStatus.OK));
+        return clusterService.createTopic(clusterId, topicFormData)
+                .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
+                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
     }
 
     @Override
     public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
+        // TODO: implement broker listing for the cluster — currently always returns an empty list
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
     }
 
     @Override
     public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
-        return Mono.just(new ResponseEntity<>(clusterService.getConsumerGroup(clusterName), HttpStatus.OK));
+        return clusterService.getConsumerGroups(clusterName)
+                .map(Flux::fromIterable)
+                .map(ResponseEntity::ok)
+                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
     }
 }

+ 6 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -21,7 +21,7 @@ public class ZookeeperService {
 
     public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
         var isConnected = false;
-        var zkClient = getOrCreateZkClient(kafkaCluster.getName());
+        var zkClient = getOrCreateZkClient(kafkaCluster);
         log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
         if (zkClient != null) {
             isConnected = isZkClientConnected(zkClient);
@@ -30,20 +30,15 @@ public class ZookeeperService {
     }
 
     private boolean isZkClientConnected(ZkClient zkClient) {
-        try {
-            zkClient.getChildren("/brokers/ids");
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            return false;
-        }
+        zkClient.getChildren("/brokers/ids");
+        return true;
     }
 
-    private ZkClient getOrCreateZkClient (String clusterName) {
+    private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
         try {
-            return cachedZkClient.getOrDefault(clusterName, new ZkClient(clustersStorage.getClusterByName(clusterName).getZookeeper(), 1000));
+            return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
         } catch (Exception e) {
-            log.error("Error while creating zookeeper client for cluster {}", clusterName);
+            log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
             return null;
         }
     }