
Merge branch 'master' into backend-topics-changing

Roman Nedzvetskiy 5 years ago
parent
commit
3d7ef5b1b8
32 changed files with 730 additions and 491 deletions
  1. + 10 - 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java
  2. + 13 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java
  3. + 12 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java
  4. + 19 - 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
  5. + 15 - 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java
  6. + 25 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
  7. + 16 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java
  8. + 14 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalReplica.java
  9. + 25 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
  10. + 13 - 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopicConfig.java
  11. + 15 - 41
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
  12. + 0 - 16
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaMetrics.java
  13. + 34 - 37
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
  14. + 4 - 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
  15. + 72 - 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
  16. + 193 - 216
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
  17. + 27 - 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
  18. + 1 - 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/config/CustomWebFilter.java
  19. + 0 - 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZooKeeperConstants.java
  20. + 20 - 26
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java
  21. + 0 - 4
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
  22. + 31 - 15
      kafka-ui-react-app/src/components/App.tsx
  23. + 11 - 8
      kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx
  24. + 8 - 9
      kafka-ui-react-app/src/components/Nav/Nav.tsx
  25. + 21 - 20
      kafka-ui-react-app/src/components/Topics/New/New.tsx
  26. + 29 - 0
      kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtn.tsx
  27. + 36 - 0
      kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtns.tsx
  28. + 22 - 6
      kafka-ui-react-app/src/components/Topics/Topics.tsx
  29. + 21 - 22
      kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetain.tsx
  30. + 2 - 0
      kafka-ui-react-app/src/lib/constants.ts
  31. + 1 - 1
      kafka-ui-react-app/src/lib/paths.ts
  32. + 20 - 18
      kafka-ui-react-app/src/redux/api/topics.ts

+ 10 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java

@@ -1,12 +1,15 @@
 package com.provectus.kafka.ui.cluster;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.Map;
 
 @Component
 @RequiredArgsConstructor
@@ -19,8 +22,11 @@ public class ClustersMetricsScheduler {
 
     @Scheduled(fixedRate = 30000)
     public void updateMetrics() {
-        for (KafkaCluster kafkaCluster : clustersStorage.getKafkaClusters()) {
-            metricsUpdateService.updateMetrics(kafkaCluster);
-        }
+        Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
+                .subscribeOn(Schedulers.parallel())
+                .map(Map.Entry::getValue)
+                .flatMap(metricsUpdateService::updateMetrics)
+                .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
+                .subscribe();
     }
 }
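
For reference, the scheduler now fans per-cluster refreshes out through Reactor instead of a sequential loop. Below is a minimal, self-contained sketch of that fan-out-and-write-back pattern; the class and map here are illustrative stand-ins, not code from this commit:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ParallelRefreshSketch {
        private final Map<String, String> state =
                new ConcurrentHashMap<>(Map.of("a", "stale", "b", "stale"));

        // Refresh every entry concurrently, then write each result back into the map,
        // mirroring the flatMap + doOnNext flow in updateMetrics() above.
        public void refreshAll() {
            Flux.fromIterable(state.keySet())
                    .flatMap(key -> Mono.fromCallable(() -> key + ":fresh")
                            .subscribeOn(Schedulers.parallel()))
                    .doOnNext(v -> state.put(v.split(":")[0], v))
                    .blockLast(); // the scheduler itself uses subscribe(); blocking only makes the sketch observable
        }
    }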

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.model.BrokersMetrics;
+import org.mapstruct.Mapper;
+
+@Mapper(componentModel = "spring")
+public interface BrokersMetricsMapper {
+
+    InternalClusterMetrics toBrokersMetricsDto(BrokersMetrics brokersMetrics);
+
+    BrokersMetrics toBrokersMetrics(InternalClusterMetrics brokersMetrics);
+}

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java

@@ -0,0 +1,12 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Cluster;
+import org.mapstruct.Mapper;
+
+@Mapper(componentModel = "spring")
+public interface ClusterDtoMapper {
+
+    KafkaCluster toInternalCluster(Cluster cluster);
+    Cluster toClusterDto(KafkaCluster cluster);
+}

+ 19 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -1,23 +1,28 @@
 package com.provectus.kafka.ui.cluster.mapper;
 
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
-@Mapper
-public abstract class ClusterMapper {
+@Mapper(componentModel = "spring")
+public interface ClusterMapper {
 
-    @Mapping(source = "name", target = "cluster.name")
-    @Mapping(target = "brokersMetrics", ignore = true)
-    @Mapping(target = "cluster", ignore = true)
-    @Mapping(target = "lastKafkaException", ignore = true)
-    @Mapping(target = "lastZookeeperException", ignore = true)
-    @Mapping(target = "topicConfigsMap", ignore = true)
-    @Mapping(target = "topicDetailsMap", ignore = true)
-    @Mapping(target = "topics", ignore = true)
-    @Mapping(target = "zkClient", ignore = true)
-    @Mapping(target = "zookeeperStatus", ignore = true)
-    @Mapping(target = "adminClient", ignore = true)
-    public abstract KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+    KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
+
+    @Mapping(target = "brokerCount", source = "metrics.brokerCount")
+    @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
+    @Mapping(target = "topicCount", source = "metrics.topicCount")
+    @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
+    @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
+    Cluster toCluster(KafkaCluster cluster);
+
+    BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
+    Topic toTopic(InternalTopic topic);
+    TopicDetails toTopicDetails(InternalTopic topic);
+    TopicConfig toTopicConfig(InternalTopicConfig topic);
 }

+ 15 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java

@@ -7,13 +7,16 @@ import org.mapstruct.factory.Mappers;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.*;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Component
 @RequiredArgsConstructor
 public class ClustersStorage {
 
-    private final Map<String, KafkaCluster> kafkaClusters = new HashMap<>();
+    private final Map<String, KafkaCluster> kafkaClusters = new ConcurrentHashMap<>();
 
     private final ClustersProperties clusterProperties;
 
@@ -33,7 +36,15 @@ public class ClustersStorage {
         return kafkaClusters.values();
     }
 
-    public KafkaCluster getClusterByName(String clusterName) {
-        return kafkaClusters.get(clusterName);
+    public Optional<KafkaCluster> getClusterByName(String clusterName) {
+        return Optional.ofNullable(kafkaClusters.get(clusterName));
+    }
+
+    public void setKafkaCluster(String key, KafkaCluster kafkaCluster) {
+        this.kafkaClusters.put(key, kafkaCluster);
+    }
+
+    public Map<String, KafkaCluster> getKafkaClustersMap() {
+        return kafkaClusters;
     }
 }
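
With getClusterByName now returning Optional, call sites resolve missing clusters explicitly instead of null-checking. A minimal sketch of that lookup contract, using String values as a stand-in for KafkaCluster:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    class StorageSketch {
        private final Map<String, String> clusters =
                new ConcurrentHashMap<>(Map.of("local", "ONLINE"));

        Optional<String> getClusterByName(String name) {
            return Optional.ofNullable(clusters.get(name));
        }

        String describe(String name) {
            // Presence is handled with map/orElse, the way ClusterService does below.
            return getClusterByName(name)
                    .map(status -> name + " is " + status)
                    .orElse(name + " not found");
        }
    }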

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -0,0 +1,25 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+
+@Data
+@Builder(toBuilder = true)
+public class InternalClusterMetrics {
+    private final int brokerCount;
+    private final int topicCount;
+    private final int activeControllers;
+    private final int uncleanLeaderElectionCount;
+    private final int onlinePartitionCount;
+    private final int underReplicatedPartitionCount;
+    private final int offlinePartitionCount;
+    private final int inSyncReplicasCount;
+    private final int outOfSyncReplicasCount;
+    //TODO: find way to fill
+    private final int bytesInPerSec;
+    private final int bytesOutPerSec;
+    //TODO: find way to fill
+    private final int segmentSize;
+    private final int segmentCount;
+}

+ 16 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java

@@ -0,0 +1,16 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@Builder
+public class InternalPartition {
+    private final int partition;
+    private final Integer leader;
+    private final List<InternalReplica> replicas;
+    private final int inSyncReplicasCount;
+    private final int replicasCount;
+}

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalReplica.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@Builder
+@RequiredArgsConstructor
+public class InternalReplica {
+    private final int broker;
+    private final boolean leader;
+    private final boolean inSync;
+}

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

@@ -0,0 +1,25 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalTopic {
+
+    private final String name;
+    private final boolean internal;
+    private final List<InternalPartition> partitions;
+    private final List<InternalTopicConfig> topicConfigs;
+
+    private final int replicas;
+    private final int partitionCount;
+    private final int inSyncReplicas;
+    private final int replicationFactor;
+    private final int underReplicatedPartitions;
+    //TODO: find way to fill
+    private final int segmentSize;
+    private final int segmentCount;
+}

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopicConfig.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.cluster.model;
+
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class InternalTopicConfig {
+    private final String name;
+    private final String value;
+    private final String defaultValue;
+}

+ 15 - 41
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -1,51 +1,25 @@
 package com.provectus.kafka.ui.cluster.model;
 
-import com.provectus.kafka.ui.cluster.util.SupportedCommands;
-import com.provectus.kafka.ui.model.*;
-import lombok.AccessLevel;
+import com.provectus.kafka.ui.model.ServerStatus;
+import lombok.Builder;
 import lombok.Data;
-import lombok.experimental.FieldDefaults;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.clients.admin.AdminClient;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Data
-@FieldDefaults(level = AccessLevel.PRIVATE)
+@Builder(toBuilder = true)
 public class KafkaCluster {
 
-    String id = "";
-    String name;
-    String jmxHost;
-    String jmxPort;
-    String bootstrapServers;
-    String zookeeper;
-
-    Cluster cluster = new Cluster();
-    BrokersMetrics brokersMetrics = new BrokersMetrics();
-
-    List<Topic> topics = new ArrayList<>();
-    private Map<String, TopicDetails> topicDetailsMap = new ConcurrentHashMap<>();
-    private Map<String, List<TopicConfig>> topicConfigsMap = new ConcurrentHashMap<>();
-
-
-    ZkClient zkClient;
-    AdminClient adminClient;
-    List<SupportedCommands> supportedCommands;
-    ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
-
-    Exception lastKafkaException;
-    Exception lastZookeeperException;
-
-    public TopicDetails getOrCreateTopicDetails(String key) {
-        var topicDetails = topicDetailsMap.get(key);
-        if(topicDetails == null) {
-            topicDetailsMap.putIfAbsent(key, new TopicDetails());
-            topicDetails = topicDetailsMap.get(key);
-        }
-        return topicDetails;
-    }
+    private final String id = "";
+    private final String name;
+    private final String jmxHost;
+    private final String jmxPort;
+    private final String bootstrapServers;
+    private final String zookeeper;
+    private final ServerStatus status;
+    private final ServerStatus zookeeperStatus;
+    private final InternalClusterMetrics metrics;
+    private final Map<String, InternalTopic> topics;
+    private final Throwable lastKafkaException;
+    private final Throwable lastZookeeperException;
 }
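
KafkaCluster is now an immutable value that is updated by copying through Lombok's toBuilder(). A small sketch of that copy-on-write idiom, assuming Lombok is on the classpath; the Cluster class here is illustrative:

    import lombok.Builder;
    import lombok.Data;

    public class ToBuilderSketch {

        @Data
        @Builder(toBuilder = true)
        static class Cluster {
            private final String name;
            private final String status;
        }

        public static void main(String[] args) {
            Cluster initial = Cluster.builder().name("local").status("OFFLINE").build();
            // toBuilder() copies every existing field, so only changed fields are set,
            // which is how getUpdatedCluster() in KafkaService marks a cluster offline.
            Cluster updated = initial.toBuilder().status("ONLINE").build();
            System.out.println(updated);
        }
    }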

+ 0 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaMetrics.java

@@ -1,16 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.Data;
-
-@Data
-public class KafkaMetrics {
-
-    Double bytesInPerSec;
-    Double bytesOutPerSec;
-    Integer brokersCount;
-    Integer topicCount;
-    Integer activeControllerCount;
-    Integer onlinePartitionCount;
-    Integer offlinePartitionCount;
-    Integer underReplicatedPartitions;
-}

+ 34 - 37
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -1,19 +1,18 @@
 package com.provectus.kafka.ui.cluster.service;
 
+import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.kafka.clients.admin.ConsumerGroupListing;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Service
@@ -21,45 +20,47 @@ import java.util.stream.Collectors;
 public class ClusterService {
 
     private final ClustersStorage clustersStorage;
+    private final ClusterMapper clusterMapper;
     private final KafkaService kafkaService;
 
-    public Mono<ResponseEntity<Flux<Cluster>>> getClusters() {
-        List<Cluster> clusters = clustersStorage.getKafkaClusters()
+    public List<Cluster> getClusters() {
+        return clustersStorage.getKafkaClusters()
                 .stream()
-                .map(KafkaCluster::getCluster)
+                .map(clusterMapper::toCluster)
                 .collect(Collectors.toList());
-
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusters)));
     }
 
-    public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String name) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Mono.just(ResponseEntity.ok(cluster.getBrokersMetrics()));
+    public Optional<BrokersMetrics> getBrokersMetrics(String name) {
+        return clustersStorage.getClusterByName(name)
+                .map(KafkaCluster::getMetrics)
+                .map(clusterMapper::toBrokerMetrics);
     }
 
-    public Mono<ResponseEntity<Flux<Topic>>> getTopics(String name) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopics())));
+    public List<Topic> getTopics(String name) {
+        return clustersStorage.getClusterByName(name)
+                .map( c ->
+                        c.getTopics().values().stream()
+                                .map(clusterMapper::toTopic)
+                                .collect(Collectors.toList())
+                ).orElse(Collections.emptyList());
     }
 
-    public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String name, String topicName) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Mono.just(ResponseEntity.ok(cluster.getOrCreateTopicDetails(topicName)));
+    public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
+        return clustersStorage.getClusterByName(name).flatMap(
+                c -> Optional.ofNullable(c.getTopics().get(topicName))
+        ).map(clusterMapper::toTopicDetails);
     }
 
-    public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String name, String topicName) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopicConfigsMap().get(topicName))));
+    public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
+        return clustersStorage.getClusterByName(name).flatMap(
+                c -> Optional.ofNullable(c.getTopics().get(topicName))
+        ).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
     }
 
-    public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(name);
-        if (cluster == null) return null;
-        return kafkaService.createTopic(cluster, topicFormData);
+    public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
+        return clustersStorage.getClusterByName(name).map(
+                cluster -> kafkaService.createTopic(cluster, topicFormData)
+        ).orElse(Mono.empty()).map(clusterMapper::toTopic);
     }
 
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData, Integer id) {
@@ -69,13 +70,9 @@ public class ClusterService {
     }
 
     @SneakyThrows
-    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
-            var cluster = clustersStorage.getClusterByName(clusterName);
-            return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
-                    .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
-                            .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
-                    .map(s -> s.values().stream()
-                            .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
-                    .map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
+    public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
+            return clustersStorage.getClusterByName(clusterName)
+                    .map(kafkaService::getConsumerGroups)
+                    .orElse(Mono.empty());
     }
 }

+ 4 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -5,8 +5,8 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 @Service
 @RequiredArgsConstructor
@@ -14,12 +14,9 @@ import org.springframework.stereotype.Service;
 public class MetricsUpdateService {
 
     private final KafkaService kafkaService;
-    private final ZookeeperService zookeeperService;
 
-    @Async
-    public void updateMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName());
-        kafkaService.loadClusterMetrics(kafkaCluster);
-        zookeeperService.checkZookeeperStatus(kafkaCluster);
+    public Mono<KafkaCluster> updateMetrics(KafkaCluster kafkaCluster) {
+        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster);
+        return kafkaService.getUpdatedCluster(kafkaCluster);
     }
 }

+ 72 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -1,10 +1,12 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.config.ConfigResource;
 import reactor.core.publisher.Mono;
@@ -13,6 +15,11 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
+import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
 @Log4j2
 public class ClusterUtil {
@@ -31,12 +38,11 @@ public class ClusterUtil {
 
     public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
         ConsumerGroup consumerGroup = new ConsumerGroup();
-        consumerGroup.setClusterId(cluster.getCluster().getId());
+        consumerGroup.setClusterId(cluster.getId());
         consumerGroup.setConsumerGroupId(c.groupId());
         consumerGroup.setNumConsumers(c.members().size());
-        Set<String> topics = new HashSet<>();
-        c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
-        consumerGroup.setNumTopics(topics.size());
+        long numTopics = c.members().stream()
+                .flatMap(m -> m.assignment().topicPartitions().stream().map(tp -> tp.topic()))
+                .distinct()
+                .count();
+        consumerGroup.setNumTopics((int) numTopics);
         return consumerGroup;
     }
 
@@ -57,4 +63,65 @@ public class ClusterUtil {
             throw e;
         }
     }
+
+    public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
+        InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
+                .name(configEntry.name())
+                .value(configEntry.value());
+        if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
+            builder.defaultValue(configEntry.value());
+        } else {
+            builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
+        }
+        return builder.build();
+    }
+
+    public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) {
+        var topic = InternalTopic.builder();
+        topic.internal(topicDescription.isInternal());
+        topic.name(topicDescription.name());
+
+        List<InternalPartition> partitions = topicDescription.partitions().stream().map(
+                partition -> {
+                    var partitionDto = InternalPartition.builder();
+                    partitionDto.leader(partition.leader() != null ? partition.leader().id() : null);
+                    partitionDto.partition(partition.partition());
+                    partitionDto.inSyncReplicasCount(partition.isr().size());
+                    partitionDto.replicasCount(partition.replicas().size());
+                    List<InternalReplica> replicas = partition.replicas().stream().map(
+                            r -> new InternalReplica(r.id(), partition.leader() != null && partition.leader().id() == r.id(), partition.isr().contains(r)))
+                            .collect(Collectors.toList());
+                    partitionDto.replicas(replicas);
+                    return partitionDto.build();
+                })
+                .collect(Collectors.toList());
+
+        int urpCount = (int) partitions.stream()
+                .filter(partition -> partition.getReplicas().stream().anyMatch(r -> !r.isInSync()))
+                .count();
+
+        int inSyncReplicasCount = partitions.stream()
+                .mapToInt(InternalPartition::getInSyncReplicasCount)
+                .sum();
+
+        int replicasCount = partitions.stream()
+                .mapToInt(InternalPartition::getReplicasCount)
+                .sum();
+
+        topic.partitions(partitions);
+        topic.replicas(replicasCount);
+        topic.partitionCount(topicDescription.partitions().size());
+        topic.inSyncReplicas(inSyncReplicasCount);
+
+        topic.replicationFactor(
+                topicDescription.partitions().size() > 0 ?
+                        topicDescription.partitions().get(0).replicas().size() : 0
+        );
+
+        topic.underReplicatedPartitions(urpCount);
+
+        return topic.build();
+    }
+
 }
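
ClusterUtil.toMono is referenced throughout this commit but its body is outside the hunks shown; a common way to adapt a KafkaFuture into a Mono looks roughly like the sketch below. This is an assumption about the adapter's shape, not the project's exact implementation:

    import org.apache.kafka.common.KafkaFuture;
    import reactor.core.publisher.Mono;

    public class KafkaFutureAdapter {

        // Complete a Mono sink from the future's whenComplete callback.
        public static <T> Mono<T> toMono(KafkaFuture<T> future) {
            return Mono.create(sink -> future.whenComplete((result, error) -> {
                if (error != null) {
                    sink.error(error);
                } else {
                    sink.success(result);
                }
            }));
        }
    }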

+ 193 - 216
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,79 +1,177 @@
 package com.provectus.kafka.ui.kafka;
 
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.SupportedCommands;
-import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.model.ServerStatus;
+import com.provectus.kafka.ui.model.Topic;
+import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
-import org.apache.kafka.common.*;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
-
-import java.util.*;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
-import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
-
 @Service
 @RequiredArgsConstructor
 @Log4j2
 public class KafkaService {
 
+    private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
+
+    private final ZookeeperService zookeeperService;
+    private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+
     @SneakyThrows
-    @Async
-    public void loadClusterMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting Kafka metrics for cluster: " + kafkaCluster.getName());
-        boolean isConnected = false;
-        if (kafkaCluster.getAdminClient() != null) {
-            isConnected = isAdminClientConnected(kafkaCluster);
-        }
-        if (kafkaCluster.getAdminClient() == null || !isConnected) {
-            isConnected = createAdminClient(kafkaCluster);
+    public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
+        return getOrCreateAdminClient(cluster).flatMap(
+                ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
+                            getTopicsData(ac).flatMap( topics ->
+                                loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
+                                        .map( configs -> mergeWithConfigs(topics, configs) )
+                            ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
+                        )
+        ).onErrorResume(
+                e -> Mono.just(cluster.toBuilder()
+                        .status(ServerStatus.OFFLINE)
+                        .lastKafkaException(e)
+                        .build())
+        );
+    }
+
+    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
+
+        InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
+
+        InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);
+
+        ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
+        Throwable zookeeperException = null;
+        try {
+            zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE;
+        } catch (Throwable e) {
+            zookeeperException = e;
         }
 
-        if (!isConnected) {
-            kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
+        InternalClusterMetrics clusterMetrics = metricsBuilder
+                .activeControllers(brokersMetrics.getActiveControllers())
+                .brokerCount(brokersMetrics.getBrokerCount())
+                .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
+                .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
+                .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
+                .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
+                .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount()).build();
+
+        return currentCluster.toBuilder()
+                .status(ServerStatus.ONLINE)
+                .zookeeperStatus(zookeeperStatus)
+                .lastZookeeperException(zookeeperException)
+                .lastKafkaException(null)
+                .metrics(clusterMetrics)
+                .topics(topics)
+                .build();
+    }
+
+    private InternalClusterMetrics collectTopicsMetrics(Map<String,InternalTopic> topics) {
+
+        int underReplicatedPartitions = 0;
+        int inSyncReplicasCount = 0;
+        int outOfSyncReplicasCount = 0;
+        int onlinePartitionCount = 0;
+        int offlinePartitionCount = 0;
 
-            return;
+        for (InternalTopic topic : topics.values()) {
+            underReplicatedPartitions += topic.getUnderReplicatedPartitions();
+            inSyncReplicasCount += topic.getInSyncReplicas();
+            outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
+            onlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum();
+            offlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum();
         }
 
-        kafkaCluster.getCluster().setId(kafkaCluster.getId());
-        kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
-        loadMetrics(kafkaCluster);
-        loadTopicsData(kafkaCluster);
+        return InternalClusterMetrics.builder()
+                .underReplicatedPartitionCount(underReplicatedPartitions)
+                .inSyncReplicasCount(inSyncReplicasCount)
+                .outOfSyncReplicasCount(outOfSyncReplicasCount)
+                .onlinePartitionCount(onlinePartitionCount)
+                .offlinePartitionCount(offlinePartitionCount)
+                .build();
+    }
+
+    private Map<String, InternalTopic> mergeWithConfigs(List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
+        return topics.stream().map(
+                t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
+        ).collect(Collectors.toMap(
+                InternalTopic::getName,
+                e -> e
+        ));
+    }
+
+    @SneakyThrows
+    private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
+        return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
+                    .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
+                    .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
+    }
+
+    private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
+        return ClusterUtil.toMono(client.describeCluster().nodes())
+                .flatMap(brokers ->
+                    ClusterUtil.toMono(client.describeCluster().controller()).map(
+                        c -> {
+                            InternalClusterMetrics.InternalClusterMetricsBuilder builder = InternalClusterMetrics.builder();
+                            builder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
+                            // TODO: fill bytes in/out metrics
+                            List<Integer> brokerIds = brokers.stream().map(Node::id).collect(Collectors.toList());
+
+                            return builder.build();
+                        }
+                    )
+                );
     }
 
 
+    public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+        AdminClient adminClient = this.createAdminClient(cluster);
+        return this.createTopic(adminClient, topicFormData);
+    }
+
     @SneakyThrows
-    public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+    public Mono<InternalTopic> createTopic(AdminClient adminClient, Mono<TopicFormData> topicFormData) {
         return topicFormData.flatMap(
                 topicData -> {
-                    AdminClient adminClient = cluster.getAdminClient();
                     NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
                     newTopic.configs(topicData.getConfigs());
-
-                    createTopic(adminClient, newTopic);
-
-                    DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
-                    Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
-                    var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
-                    var topicDescription = getTopicDescription(entry);
-                    if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
-
-                    Topic topic = collectTopicData(cluster, topicDescription);
-                    cluster.getTopics().add(topic);
-                    return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
-                }
-        );
+                    return createTopic(adminClient, newTopic).map( v -> topicData);
+                }).flatMap(topicData -> {
+                    var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
+                    return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName());
+                })
+                .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
+                .map(ClusterUtil::mapToInternalTopic)
+                .flatMap( t ->
+                        loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
+                                .map( c -> mergeWithConfigs(Collections.singletonList(t), c))
+                                .map( m -> m.values().iterator().next())
+                );
     }
 
     public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData, Integer id) {
@@ -101,200 +199,79 @@ public class KafkaService {
     }
 
 
-
     @SneakyThrows
-    private void loadTopicsData(KafkaCluster kafkaCluster) {
-        AdminClient adminClient = kafkaCluster.getAdminClient();
-        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
-        listTopicsOptions.listInternal(true);
-        var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
-        kafkaCluster.getCluster().setTopicCount(topicListings.size());
-
-        DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
-        Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
-        List<Topic> foundTopics = new ArrayList<>();
-        resetMetrics(kafkaCluster);
-
-        for (var entry : topicDescriptionFuturesMap.entrySet()) {
-            var topicDescription = getTopicDescription(entry);
-            if (topicDescription == null) continue;
-            Topic topic = collectTopicData(kafkaCluster, topicDescription);
-            foundTopics.add(topic);
-        }
-        kafkaCluster.setTopics(foundTopics);
+    private Mono<String> getClusterId(AdminClient adminClient) {
+        return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
     }
 
-    private void resetMetrics(KafkaCluster kafkaCluster) {
-        kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(0);
-        kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(0);
-        kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(0);
-        kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(0);
-        kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
-    }
 
-    private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
-        var topic = new Topic();
-        topic.setInternal(topicDescription.isInternal());
-        topic.setName(topicDescription.name());
-
-        int inSyncReplicasCount = 0, replicasCount = 0;
-        List<Partition> partitions = new ArrayList<>();
-
-        int urpCount = 0;
-        for (TopicPartitionInfo partition : topicDescription.partitions()) {
-            var partitionDto = new Partition();
-            partitionDto.setLeader(partition.leader().id());
-            partitionDto.setPartition(partition.partition());
-            List<Replica> replicas = new ArrayList<>();
-
-            boolean isUrp = false;
-            for (Node replicaNode : partition.replicas()) {
-                var replica = new Replica();
-                replica.setBroker(replicaNode.id());
-                replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
-                replica.setInSync(partition.isr().contains(replicaNode));
-                if (!replica.getInSync()) {
-                    isUrp = true;
-                }
-                replicas.add(replica);
-
-                inSyncReplicasCount += partition.isr().size();
-                replicasCount += partition.replicas().size();
-            }
-            if (isUrp) {
-                urpCount++;
-            }
-            partitionDto.setReplicas(replicas);
-            partitions.add(partitionDto);
-
-            if (partition.leader() != null) {
-                kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
-            } else {
-                kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
-            }
-        }
+    public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
+        AdminClient adminClient = adminClientCache.computeIfAbsent(
+                cluster.getId(),
+                (id) -> createAdminClient(cluster)
+        );
 
-        kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
-        kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
-                kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
-        kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
-                kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
-        kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
-                kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
-
-        topic.setPartitions(partitions);
-        TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(topicDescription.name());
-        topicDetails.setReplicas(replicasCount);
-        topicDetails.setPartitionCount(topicDescription.partitions().size());
-        topicDetails.setInSyncReplicas(inSyncReplicasCount);
-        topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
-                ? topicDescription.partitions().get(0).replicas().size()
-                : null);
-        topicDetails.setUnderReplicatedPartitions(urpCount);
-
-        loadTopicConfig(kafkaCluster, topicDescription.name());
-
-        return topic;
+        return isAdminClientConnected(adminClient);
     }
 
-    private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
-        try {
-            return entry.getValue().get();
-        } catch (Exception e) {
-            log.error("Can't get topic with name: " + entry.getKey(), e);
-
-            return null;
-        }
+    public AdminClient createAdminClient(KafkaCluster kafkaCluster) {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
+        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+        return AdminClient.create(properties);
     }
 
-    private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
-        AdminClient adminClient = kafkaCluster.getAdminClient();
-        int brokerCount = adminClient.describeCluster().nodes().get().size();
-        kafkaCluster.getCluster().setBrokerCount(brokerCount);
-        kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
-        kafkaCluster.getBrokersMetrics().setActiveControllers(adminClient.describeCluster().controller().get() != null ? 1 : 0);
-
-        for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
-            if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
-                    && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-            }
-            if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
-                    && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-            }
-        }
+    private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
+        return getClusterId(adminClient).map( r -> adminClient);
     }
 
-    @SneakyThrows
-    private void loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
-        AdminClient adminClient = kafkaCluster.getAdminClient();
-
-        Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
-        final Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
-
-        if (configs.isEmpty()) return;
-
-        Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
-        List<TopicConfig> topicConfigs = new ArrayList<>();
-        for (ConfigEntry entry : entries) {
-            TopicConfig topicConfig = new TopicConfig();
-            topicConfig.setName(entry.name());
-            topicConfig.setValue(entry.value());
-            if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
-                topicConfig.setDefaultValue(topicConfig.getValue());
-            } else {
-                topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
-            }
-            topicConfigs.add(topicConfig);
-        }
 
-        kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
+
+    private Mono<TopicDescription> getTopicDescription(KafkaFuture<TopicDescription> entry, String topicName) {
+        return ClusterUtil.toMono(entry)
+                    .onErrorResume(e -> {
+                        log.error("Can't get topic with name: " + topicName);
+                        return Mono.empty();
+                    });
     }
 
     @SneakyThrows
-    private void createTopic(AdminClient adminClient, NewTopic newTopic) {
-        adminClient.createTopics(Collections.singletonList(newTopic))
-                .values()
-                .values()
-                .iterator()
-                .next()
-                .get();
+    private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(AdminClient adminClient, List<String> topicNames) {
+        List<ConfigResource> resources = topicNames.stream()
+                .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+                .collect(Collectors.toList());
+
+        return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
+                .map(configs ->
+                        configs.entrySet().stream().map(
+                                c -> Tuples.of(
+                                        c.getKey().name(),
+                                        c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig).collect(Collectors.toList())
+                                )
+                        ).collect(Collectors.toMap(
+                                Tuple2::getT1,
+                                Tuple2::getT2
+                        ))
+                );
     }
 
-    private boolean createAdminClient(KafkaCluster kafkaCluster) {
-        try {
-            Properties properties = new Properties();
-            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
-            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-            kafkaCluster.setAdminClient(AdminClient.create(properties));
-            kafkaCluster.setId(getClusterId(kafkaCluster));
-            kafkaCluster.getCluster().setId(kafkaCluster.getId());
-
-            return true;
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            kafkaCluster.setLastKafkaException(e);
-
-            return false;
-        }
-    }
+    public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
+        var adminClient =  this.createAdminClient(cluster);
 
-    @SneakyThrows
-    private String getClusterId(KafkaCluster kafkaCluster) {
-        return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
+        return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
+                .flatMap(s -> ClusterUtil.toMono(adminClient
+                        .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                .map(s -> s.values().stream()
+                        .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
     }
 
-    private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
-        try {
-            getClusterId(kafkaCluster);
-
-            return true;
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            kafkaCluster.setLastKafkaException(e);
 
-            return false;
-        }
+    @SneakyThrows
+    private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
+        return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
+                    .values()
+                    .values()
+                    .iterator()
+                    .next());
     }
 }
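
getUpdatedCluster above folds any failure into an OFFLINE copy of the cluster rather than letting the error escape the pipeline. A minimal sketch of that onErrorResume fallback idiom, with illustrative names:

    import reactor.core.publisher.Mono;

    public class FallbackSketch {
        public static void main(String[] args) {
            Mono<String> refresh = Mono.<String>error(new IllegalStateException("broker unreachable"))
                    // Swap in a fallback value on any error, the way getUpdatedCluster
                    // returns cluster.toBuilder().status(OFFLINE) instead of failing.
                    .onErrorResume(e -> Mono.just("OFFLINE: " + e.getMessage()));

            System.out.println(refresh.block()); // OFFLINE: broker unreachable
        }
    }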

+ 27 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -7,6 +7,7 @@ import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.Node;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -26,22 +27,30 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
-        return clusterService.getClusters();
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
     }
 
     @Override
     public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterId, ServerWebExchange exchange) {
-        return clusterService.getBrokersMetrics(clusterId);
+        return Mono.just(
+                clusterService.getBrokersMetrics(clusterId)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId, ServerWebExchange exchange) {
-        return clusterService.getTopics(clusterId);
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId))));
     }
 
     @Override
     public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) {
-        return clusterService.getTopicDetails(clusterId, topicName);
+        return Mono.just(
+                clusterService.getTopicDetails(clusterId, topicName)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
@@ -54,21 +63,32 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
-        return clusterService.getTopicConfigs(clusterId, topicName);
+        return Mono.just(
+                clusterService.getTopicConfigs(clusterId, topicName)
+                        .map(Flux::fromIterable)
+                        .map(ResponseEntity::ok)
+                        .orElse(ResponseEntity.notFound().build())
+        );
     }
 
     @Override
     public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        return clusterService.createTopic(clusterId, topicFormData);
+        return clusterService.createTopic(clusterId, topicFormData)
+                .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
+                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
     }
 
     @Override
     public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
+        //TODO: ????
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
     }
 
     @Override
     public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
-        return clusterService.getConsumerGroup(clusterName);
+        return clusterService.getConsumerGroups(clusterName)
+                .map(Flux::fromIterable)
+                .map(ResponseEntity::ok)
+                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
     }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/config/CustomWebFilter.java

@@ -10,7 +10,7 @@ import reactor.core.publisher.Mono;
 public class CustomWebFilter implements WebFilter {
     @Override
     public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
-        if (exchange.getRequest().getURI().getPath().equals("/")) {
+        if (exchange.getRequest().getURI().getPath().equals("/") || exchange.getRequest().getURI().getPath().startsWith("/ui")) {
             return chain.filter(exchange.mutate().request(exchange.getRequest().mutate().path("/index.html").build()).build());
         }
 

+ 0 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZooKeeperConstants.java

@@ -1,10 +0,0 @@
-package com.provectus.kafka.ui.zookeeper;
-
-public final class ZooKeeperConstants {
-
-    private ZooKeeperConstants() {}
-
-    public static int ONLINE = 1;
-    public static int OFFLINE = 0;
-
-}

+ 20 - 26
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.zookeeper;
 
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -8,6 +9,9 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @Service
 @RequiredArgsConstructor
 @Log4j2
@@ -26,39 +30,29 @@ public class ZookeeperService {
-        if (kafkaCluster.getZkClient() == null || !isConnected) {
-            isConnected = createZookeeperConnection(kafkaCluster);
-        }
-
-        if (!isConnected) {
-            kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
-
-            return;
+    private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
+
+    public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
+        var isConnected = false;
+        var zkClient = getOrCreateZkClient(kafkaCluster);
+        log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
+        if (zkClient != null) {
+            isConnected = isZkClientConnected(zkClient);
         }
-
-        kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
+        return isConnected;
     }
 
-    private boolean createZookeeperConnection(KafkaCluster kafkaCluster) {
-        try {
-            kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), sessionTimeout));
-
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastZookeeperException(e);
-
-            return false;
-        }
+    private boolean isZkClientConnected(ZkClient zkClient) {
+        zkClient.getChildren("/brokers/ids");
+        return true;
     }
 
-    private boolean isZkClientConnected(KafkaCluster kafkaCluster) {
+    private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
         try {
-            kafkaCluster.getZkClient().getChildren("/brokers/ids");
-
-            return true;
+            return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
         } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastZookeeperException(e);
-
-            return false;
+            log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
+            return null;
         }
     }
 }
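
One caveat in the new code: `getOrCreateZkClient` never writes to `cachedZkClient`. `Map.getOrDefault` evaluates its second argument eagerly and does not store it, so a fresh `ZkClient` is constructed on every call (even on a cache hit, where it is immediately discarded), and the hardcoded `1000` bypasses the configured `sessionTimeout`. A get-or-create lookup should also populate the cache on a miss; a generic TypeScript sketch of the pattern (not the project's API):

```typescript
// Memoizing lookup: build the value at most once per key and remember it.
function getOrCreate<K, V>(cache: Map<K, V>, key: K, factory: (key: K) => V): V {
  let value = cache.get(key);
  if (value === undefined) {
    value = factory(key); // only runs on a cache miss
    cache.set(key, value); // write back so later calls reuse it
  }
  return value;
}

// Usage sketch with a fake client type, one instance per cluster name.
interface FakeZkClient { endpoint: string }
const clients = new Map<string, FakeZkClient>();
const client = getOrCreate(clients, 'local', () => ({ endpoint: 'zk:2181' }));
```

In Java the same shape is `cachedZkClient.computeIfAbsent(cluster.getName(), name -> new ZkClient(cluster.getZookeeper(), sessionTimeout))`.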

+ 0 - 4
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -256,16 +256,12 @@ components:
     BrokersMetrics:
       type: object
       properties:
-        brokerCount:
-          type: integer
         zooKeeperStatus:
           type: integer
         activeControllers:
           type: integer
         uncleanLeaderElectionCount:
           type: integer
-        onlinePartitionCount:
-          type: integer
         underReplicatedPartitionCount:
           type: integer
         offlinePartitionCount:

+ 31 - 15
kafka-ui-react-app/src/components/App.tsx

@@ -1,9 +1,5 @@
 import React from 'react';
-import {
-  Switch,
-  Route,
-  Redirect,
-} from 'react-router-dom';
+import { Switch, Route, Redirect } from 'react-router-dom';
 import './App.scss';
 import BrokersContainer from './Brokers/BrokersContainer';
 import TopicsContainer from './Topics/TopicsContainer';
@@ -21,13 +17,19 @@ const App: React.FC<AppProps> = ({
   isClusterListFetched,
   fetchClustersList,
 }) => {
-  React.useEffect(() => { fetchClustersList() }, [fetchClustersList]);
+  React.useEffect(() => {
+    fetchClustersList();
+  }, [fetchClustersList]);
 
   return (
     <div className="Layout">
-      <nav className="navbar is-fixed-top is-white Layout__header" role="navigation" aria-label="main navigation">
+      <nav
+        className="navbar is-fixed-top is-white Layout__header"
+        role="navigation"
+        aria-label="main navigation"
+      >
         <div className="navbar-brand">
-          <a className="navbar-item title is-5 is-marginless" href="/">
+          <a className="navbar-item title is-5 is-marginless" href="/ui">
             Kafka UI
           </a>
         </div>
@@ -36,17 +38,31 @@ const App: React.FC<AppProps> = ({
         <NavConatiner className="Layout__navbar" />
         {isClusterListFetched ? (
           <Switch>
-            <Route exact path="/" component={Dashboard} />
-            <Route exact path="/clusters" component={Dashboard} />
-            <Route path="/clusters/:clusterName/topics" component={TopicsContainer} />
-            <Route path="/clusters/:clusterName/brokers" component={BrokersContainer} />
-            <Route path="/clusters/:clusterName/consumer-groups" component={ConsumersGroupsContainer} />
-            <Redirect from="/clusters/:clusterName" to="/clusters/:clusterName/brokers" />
+            <Route
+              exact
+              path={['/', '/ui', '/ui/clusters']}
+              component={Dashboard}
+            />
+            <Route
+              path="/ui/clusters/:clusterName/brokers"
+              component={BrokersContainer}
+            />
+            <Route
+              path="/ui/clusters/:clusterName/topics"
+              component={TopicsContainer}
+            />
+            <Route
+              path="/ui/clusters/:clusterName/consumer-groups"
+              component={ConsumersGroupsContainer}
+            />
+            <Redirect
+              from="/ui/clusters/:clusterName"
+              to="/ui/clusters/:clusterName/brokers"
+            />
           </Switch>
         ) : (
           <PageLoader />
         )}
-
       </main>
     </div>
   );
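
With the array `path` prop, a single exact `<Route>` now serves all three dashboard entry points, and everything else lives under the `/ui` prefix. A quick way to sanity-check the new routes is react-router v5's `matchPath`:

```typescript
import { matchPath } from 'react-router-dom';

// The exact dashboard route matches each of its three entry points.
['/', '/ui', '/ui/clusters'].forEach((pathname) => {
  const m = matchPath(pathname, {
    path: ['/', '/ui', '/ui/clusters'],
    exact: true,
  });
  console.log(pathname, m !== null); // true for all three
});

// Cluster routes expose :clusterName as a param.
const match = matchPath<{ clusterName: string }>(
  '/ui/clusters/local/brokers',
  { path: '/ui/clusters/:clusterName/brokers' }
);
console.log(match?.params.clusterName); // 'local'
```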

+ 11 - 8
kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx

@@ -1,11 +1,8 @@
 import React from 'react';
 import { ClusterName } from 'redux/interfaces';
-import {
-  Switch,
-  Route,
-} from 'react-router-dom';
-import ListContainer from './List/ListContainer';
+import { Switch, Route } from 'react-router-dom';
 import PageLoader from 'components/common/PageLoader/PageLoader';
+import ListContainer from './List/ListContainer';
 
 interface Props {
   clusterName: ClusterName;
@@ -18,17 +15,23 @@ const ConsumerGroups: React.FC<Props> = ({
   isFetched,
   fetchConsumerGroupsList,
 }) => {
-  React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]);
+  React.useEffect(() => {
+    fetchConsumerGroupsList(clusterName);
+  }, [fetchConsumerGroupsList, clusterName]);
 
   if (isFetched) {
     return (
       <Switch>
-        <Route exact path="/clusters/:clusterName/consumer-groups" component={ListContainer} />
+        <Route
+          exact
+          path="/ui/clusters/:clusterName/consumer-groups"
+          component={ListContainer}
+        />
       </Switch>
     );
   }
 
-  return (<PageLoader />);
+  return <PageLoader />;
 };
 
 export default ConsumerGroups;

+ 8 - 9
kafka-ui-react-app/src/components/Nav/Nav.tsx

@@ -5,7 +5,7 @@ import cx from 'classnames';
 import ClusterMenu from './ClusterMenu';
 
 interface Props {
-  isClusterListFetched: boolean,
+  isClusterListFetched: boolean;
   clusters: Cluster[];
   className?: string;
 }
@@ -16,22 +16,21 @@ const Nav: React.FC<Props> = ({
   className,
 }) => (
   <aside className={cx('menu has-shadow has-background-white', className)}>
-    <p className="menu-label">
-      General
-    </p>
+    <p className="menu-label">General</p>
     <ul className="menu-list">
       <li>
-        <NavLink exact to="/" activeClassName="is-active" title="Dashboard">
+        <NavLink exact to="/ui" activeClassName="is-active" title="Dashboard">
           Dashboard
         </NavLink>
       </li>
     </ul>
-    <p className="menu-label">
-      Clusters
-    </p>
+    <p className="menu-label">Clusters</p>
     {!isClusterListFetched && <div className="loader" />}
 
-    {isClusterListFetched && clusters.map((cluster, index) => <ClusterMenu {...cluster} key={`cluster-list-item-key-${index}`}/>)}
+    {isClusterListFetched &&
+      clusters.map((cluster, index) => (
+        <ClusterMenu {...cluster} key={`cluster-list-item-key-${index}`} />
+      ))}
   </aside>
 );
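
One nit the reformatting keeps: the array index as the React key. Cluster names are unique (they identify clusters throughout the app), so they make a more stable key than an index, which shifts whenever the list changes. A sketch, assuming `Cluster` exposes `name`:

```typescript
import React from 'react';

interface Cluster { name: string }

// Keying by the stable cluster name instead of the list index lets
// React match items correctly if clusters are added or reordered.
const ClusterList: React.FC<{ clusters: Cluster[] }> = ({ clusters }) => (
  <ul>
    {clusters.map((cluster) => (
      <li key={`cluster-list-item-key-${cluster.name}`}>{cluster.name}</li>
    ))}
  </ul>
);

export default ClusterList;
```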
 

+ 21 - 20
kafka-ui-react-app/src/components/Topics/New/New.tsx

@@ -15,33 +15,34 @@ interface Props {
 }
 
 const New: React.FC<Props> = ({
-                                clusterName,
-                                isTopicCreated,
-                                createTopic,
-                                redirectToTopicPath,
-                                resetUploadedState
-                              }) => {
+  clusterName,
+  isTopicCreated,
+  createTopic,
+  redirectToTopicPath,
+  resetUploadedState,
+}) => {
   const methods = useForm<TopicFormData>();
   const [isSubmitting, setIsSubmitting] = React.useState<boolean>(false);
 
-  React.useEffect(
-    () => {
-      if (isSubmitting && isTopicCreated) {
-        const {name} = methods.getValues();
-        redirectToTopicPath(clusterName, name);
-      }
-    },
-    [isSubmitting, isTopicCreated, redirectToTopicPath, clusterName, methods.getValues],
-  );
+  React.useEffect(() => {
+    if (isSubmitting && isTopicCreated) {
+      const { name } = methods.getValues();
+      redirectToTopicPath(clusterName, name);
+    }
+  }, [
+    isSubmitting,
+    isTopicCreated,
+    redirectToTopicPath,
+    clusterName,
+    methods.getValues,
+  ]);
 
   const onSubmit = async (data: TopicFormData) => {
-    //TODO: need to fix loader. After success loading the first time, we won't wait for creation any more, because state is
-    //loaded, and we will try to get entity immediately after pressing the button, and we will receive null
-    //going to object page on the second creation. Resetting loaded state is workaround, need to tweak loader logic
+    // TODO: fix the loader. After the first successful creation the "loaded" flag stays set,
+    // so we no longer wait for creation: pressing the button fetches the entity immediately,
+    // receives null, and the second creation never reaches the object page. Resetting the
+    // loaded state is a workaround; the loader logic needs tweaking.
     resetUploadedState();
     setIsSubmitting(true);
-    console.log(data);
-
     createTopic(clusterName, data);
   };
 

+ 29 - 0
kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtn.tsx

@@ -0,0 +1,29 @@
+import React from 'react';
+import { useFormContext } from 'react-hook-form';
+import cx from 'classnames';
+import { MILLISECONDS_IN_WEEK } from 'lib/constants';
+
+interface Props {
+  inputName: string;
+  text: string;
+  value: number;
+}
+
+const TimeToRetainBtn: React.FC<Props> = ({ inputName, text, value }) => {
+  const { setValue, watch } = useFormContext();
+  const watchedValue = watch(inputName, MILLISECONDS_IN_WEEK.toString());
+
+  return (
+    <button
+      type="button"
+      className={cx('button', {
+        'is-info': watchedValue === value.toString(),
+      })}
+      onClick={() => setValue(inputName, value)}
+    >
+      {text}
+    </button>
+  );
+};
+
+export default TimeToRetainBtn;

+ 36 - 0
kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtns.tsx

@@ -0,0 +1,36 @@
+import React from 'react';
+import { MILLISECONDS_IN_DAY } from 'lib/constants';
+import TimeToRetainBtn from './TimeToRetainBtn';
+
+interface Props {
+  name: string;
+  value: string;
+}
+
+const TimeToRetainBtns: React.FC<Props> = ({ name }) => (
+  <div className="buttons are-small">
+    <TimeToRetainBtn
+      text="12h"
+      inputName={name}
+      value={MILLISECONDS_IN_DAY / 2}
+    />
+    <TimeToRetainBtn text="1d" inputName={name} value={MILLISECONDS_IN_DAY} />
+    <TimeToRetainBtn
+      text="2d"
+      inputName={name}
+      value={MILLISECONDS_IN_DAY * 2}
+    />
+    <TimeToRetainBtn
+      text="7d"
+      inputName={name}
+      value={MILLISECONDS_IN_DAY * 7}
+    />
+    <TimeToRetainBtn
+      text="4w"
+      inputName={name}
+      value={MILLISECONDS_IN_DAY * 7 * 4}
+    />
+  </div>
+);
+
+export default TimeToRetainBtns;
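
The presets are plain multiples of `MILLISECONDS_IN_DAY` (added to `lib/constants.ts` below), so each button's value is easy to verify:

```typescript
const MILLISECONDS_IN_DAY = 86_400_000;

console.log(MILLISECONDS_IN_DAY / 2);     //    43_200_000 -> 12h
console.log(MILLISECONDS_IN_DAY);         //    86_400_000 -> 1d
console.log(MILLISECONDS_IN_DAY * 2);     //   172_800_000 -> 2d
console.log(MILLISECONDS_IN_DAY * 7);     //   604_800_000 -> 7d
console.log(MILLISECONDS_IN_DAY * 7 * 4); // 2_419_200_000 -> 4w
```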

+ 22 - 6
kafka-ui-react-app/src/components/Topics/Topics.tsx

@@ -22,20 +22,36 @@ const Topics: React.FC<Props> = ({
   isFetched,
   fetchTopicList,
 }) => {
-  React.useEffect(() => { fetchTopicList(clusterName); }, [fetchTopicList, clusterName]);
+  React.useEffect(() => {
+    fetchTopicList(clusterName);
+  }, [fetchTopicList, clusterName]);
 
   if (isFetched) {
     return (
       <Switch>
-        <Route exact path="/clusters/:clusterName/topics" component={ListContainer} />
-        <Route exact path="/clusters/:clusterName/topics/new" component={NewContainer} />
-        <Route path="/clusters/:clusterName/topics/:topicName/edit" component={EditContainer} />
-        <Route path="/clusters/:clusterName/topics/:topicName" component={DetailsContainer} />
+        <Route
+          exact
+          path="/ui/clusters/:clusterName/topics"
+          component={ListContainer}
+        />
+        <Route
+          exact
+          path="/ui/clusters/:clusterName/topics/new"
+          component={NewContainer}
+        />
+        <Route
+          exact
+          path="/ui/clusters/:clusterName/topics/:topicName/edit"
+          component={EditContainer}
+        />
+        <Route
+          path="/ui/clusters/:clusterName/topics/:topicName"
+          component={DetailsContainer}
+        />
       </Switch>
     );
   }
 
-  return (<PageLoader />);
+  return <PageLoader />;
 };
 
 export default Topics;

+ 21 - 22
kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetain.tsx

@@ -1,53 +1,52 @@
 import React from 'react';
 import prettyMilliseconds from 'pretty-ms';
 import { useFormContext, ErrorMessage } from 'react-hook-form';
-import { MILLISECONDS_IN_WEEK } from 'lib/constants';
-
-const MILLISECONDS_IN_SECOND = 1000;
+import { MILLISECONDS_IN_WEEK, MILLISECONDS_IN_SECOND } from 'lib/constants';
+import TimeToRetainBtns from 'components/Topics/New/TimeToRetainBtns';
 
 interface Props {
   isSubmitting: boolean;
 }
 
-const TimeToRetain: React.FC<Props> = ({
-  isSubmitting,
-}) => {
+const TimeToRetain: React.FC<Props> = ({ isSubmitting }) => {
   const { register, errors, watch } = useFormContext();
   const defaultValue = MILLISECONDS_IN_WEEK;
-  const name: string = 'retentionMs';
-  const watchedValue: any = watch(name, defaultValue.toString());
+  const name = 'retentionMs';
+  const watchedValue = watch(name, defaultValue.toString());
 
   const valueHint = React.useMemo(() => {
     const value = parseInt(watchedValue, 10);
     return value >= MILLISECONDS_IN_SECOND ? prettyMilliseconds(value) : false;
-  }, [watchedValue])
+  }, [watchedValue]);
 
   return (
     <>
-      <label className="label">
-        Time to retain data (in ms)
+      <label
+        className="label is-flex"
+        style={{ justifyContent: 'space-between' }}
+      >
+        <div>Time to retain data (in ms)</div>
+        {valueHint && <span className="has-text-info">{valueHint}</span>}
       </label>
       <input
         className="input"
+        id="timeToRetain"
         type="number"
         defaultValue={defaultValue}
         name={name}
-        ref={register(
-          { min: { value: -1, message: 'must be greater than or equal to -1' }}
-        )}
+        ref={register({
+          min: { value: -1, message: 'must be greater than or equal to -1' },
+        })}
         disabled={isSubmitting}
       />
+
       <p className="help is-danger">
-        <ErrorMessage errors={errors} name={name}/>
+        <ErrorMessage errors={errors} name={name} />
       </p>
-      {
-        valueHint &&
-        <p className="help is-info">
-          {valueHint}
-        </p>
-      }
+
+      <TimeToRetainBtns name={name} value={watchedValue} />
     </>
   );
-}
+};
 
 export default TimeToRetain;
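
The header hint works because `pretty-ms` renders a millisecond count as a compact duration, and the `MILLISECONDS_IN_SECOND` guard suppresses it for sub-second values, where it would add nothing. For example:

```typescript
import prettyMilliseconds from 'pretty-ms';

console.log(prettyMilliseconds(604_800_000)); // '7d' (the default retention)
console.log(prettyMilliseconds(172_800_000)); // '2d'
console.log(prettyMilliseconds(43_200_000));  // '12h'
console.log(prettyMilliseconds(1_500));       // '1.5s'
// Anything below 1_000 short-circuits to `false` and no hint is rendered.
```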

+ 2 - 0
kafka-ui-react-app/src/lib/constants.ts

@@ -11,5 +11,7 @@ export const BASE_URL = process.env.REACT_APP_API_URL;
 export const TOPIC_NAME_VALIDATION_PATTERN = RegExp(/^[.,A-Za-z0-9_-]+$/);
 
 export const MILLISECONDS_IN_WEEK = 604_800_000;
+export const MILLISECONDS_IN_DAY = 86_400_000;
+export const MILLISECONDS_IN_SECOND = 1_000;
 
 export const BYTES_IN_GB = 1_073_741_824;
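
The three time constants are mutually consistent, which the retention presets rely on:

```typescript
const MILLISECONDS_IN_SECOND = 1_000;
const MILLISECONDS_IN_DAY = 86_400_000;
const MILLISECONDS_IN_WEEK = 604_800_000;

// 1000 ms * 60 s * 60 min * 24 h = one day; seven days = one week.
console.assert(MILLISECONDS_IN_DAY === MILLISECONDS_IN_SECOND * 60 * 60 * 24);
console.assert(MILLISECONDS_IN_WEEK === MILLISECONDS_IN_DAY * 7);
```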

+ 1 - 1
kafka-ui-react-app/src/lib/paths.ts

@@ -1,6 +1,6 @@
 import { ClusterName, TopicName } from 'redux/interfaces';
 
-const clusterPath = (clusterName: ClusterName) => `/clusters/${clusterName}`;
+const clusterPath = (clusterName: ClusterName) => `/ui/clusters/${clusterName}`;
 
 export const clusterBrokersPath = (clusterName: ClusterName) =>
   `${clusterPath(clusterName)}/brokers`;
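
Because every other helper builds on `clusterPath`, this one-line change moves the whole app under the `/ui` prefix in step with the new routes and the backend's `CustomWebFilter`:

```typescript
type ClusterName = string;

const clusterPath = (clusterName: ClusterName) => `/ui/clusters/${clusterName}`;
const clusterBrokersPath = (clusterName: ClusterName) =>
  `${clusterPath(clusterName)}/brokers`;

console.log(clusterPath('local'));        // '/ui/clusters/local'
console.log(clusterBrokersPath('local')); // '/ui/clusters/local/brokers'
```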

+ 20 - 18
kafka-ui-react-app/src/redux/api/topics.ts

@@ -8,7 +8,10 @@ import {
   TopicFormData,
   TopicFormCustomParam,
 } from 'redux/interfaces';
-import { BASE_URL, BASE_PARAMS } from 'lib/constants';
+import {
+  BASE_URL,
+  BASE_PARAMS,
+} from 'lib/constants';
 
 export const getTopicConfig = (
   clusterName: ClusterName,
@@ -35,18 +38,6 @@ interface Result {
   [index: string]: string;
 }
 
-const parsedCustomParams = (params: {
-  [paramIndex: string]: TopicFormCustomParam;
-}) =>
-  reduce(
-    Object.values(params),
-    (result: Result, customParam: TopicFormCustomParam) => {
-      result[customParam.name] = customParam.value;
-      return result;
-    },
-    {}
-  );
-
 export const postTopic = (
   clusterName: ClusterName,
   form: TopicFormData
@@ -60,9 +51,20 @@ export const postTopic = (
     retentionMs,
     maxMessageBytes,
     minInSyncReplicas,
-    customParams,
   } = form;
 
+  const customParams =
+    (form.customParams &&
+      reduce(
+        Object.values(form.customParams),
+        (result: Result, customParam: TopicFormCustomParam) => {
+          result[customParam.name] = customParam.value;
+          return result;
+        },
+        {}
+      )) ||
+    {};
+
   const body = JSON.stringify({
     name,
     partitions,
@@ -73,15 +75,15 @@ export const postTopic = (
       'retention.bytes': retentionBytes,
       'max.message.bytes': maxMessageBytes,
       'min.insync.replicas': minInSyncReplicas,
-      ...parsedCustomParams(customParams),
-    },
+      ...customParams,
+    }
   });
 
   return fetch(`${BASE_URL}/clusters/${clusterName}/topics`, {
     ...BASE_PARAMS,
     method: 'POST',
     body,
-  }).then((res) => res.json());
+  }).then(res => res.json());
 };
 
 export const patchTopic = (
@@ -107,7 +109,7 @@ export const patchTopic = (
       'retention.bytes': retentionBytes,
       'max.message.bytes': maxMessageBytes,
       'min.insync.replicas': minInSyncReplicas,
-      ...parsedCustomParams(customParams),
+      ...customParams,
     },
   });
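
One thing to watch in this last hunk: `postTopic` now flattens `form.customParams` inline, but `patchTopic` spreads `customParams` straight into `configs`. If the form still delivers the `{ [index]: { name, value } }` shape, the flattening step is needed in both places; a shared helper would keep them in sync — a sketch, assuming the `TopicFormCustomParam` shape used above:

```typescript
interface TopicFormCustomParam {
  name: string;
  value: string;
}

// Flatten { [index]: { name, value } } into { [name]: value } for configs.
const flattenCustomParams = (
  params: Record<string, TopicFormCustomParam> | undefined
): Record<string, string> =>
  Object.values(params ?? {}).reduce<Record<string, string>>(
    (result, { name, value }) => {
      result[name] = value;
      return result;
    },
    {}
  );

console.log(flattenCustomParams({ 0: { name: 'cleanup.policy', value: 'compact' } }));
// -> { 'cleanup.policy': 'compact' }
```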