Browse Source

[BE Refactoring] Reactive admin client (#882)

* All AdminClient interactions moved to ReactiveAdminClient
Ilya Kuramshin 3 years ago
parent
commit
b05da3373e

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -108,7 +108,7 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
               new ValidationException("Unknown resetType " + reset.getResetType())
           );
       }
-    }).map(o -> ResponseEntity.ok().build());
+    }).thenReturn(ResponseEntity.ok().build());
   }
 
 }

+ 0 - 27
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java

@@ -1,27 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import com.provectus.kafka.ui.util.ClusterUtil;
-import java.util.Set;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.admin.AdminClient;
-import reactor.core.publisher.Mono;
-
-@Data
-@RequiredArgsConstructor
-public class ExtendedAdminClient {
-
-  private final AdminClient adminClient;
-  private final Set<SupportedFeature> supportedFeatures;
-
-  public static Mono<ExtendedAdminClient> extendedAdminClient(AdminClient adminClient) {
-
-    return ClusterUtil.getSupportedFeatures(adminClient)
-        .map(s -> new ExtendedAdminClient(adminClient, s));
-  }
-
-  public enum SupportedFeature {
-    INCREMENTAL_ALTER_CONFIGS,
-    ALTER_CONFIGS
-  }
-}

+ 2 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java

@@ -1,23 +1,10 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import reactor.core.publisher.Mono;
 
 public interface AdminClientService {
-  /**
-   * Get ExtendedAdminClient from cache if exists or create new if not.
-   *
-   * @param cluster - cluster
-   * @return The Mono of ExtendedAdminClient
-   */
-  Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster);
 
-  /**
-   * Create new ExtendedAdminClient.
-   *
-   * @param cluster - cluster
-   * @return The Mono of ExtendedAdminClient
-   */
-  Mono<ExtendedAdminClient> createAdminClient(KafkaCluster cluster);
+  Mono<ReactiveAdminClient> get(KafkaCluster cluster);
+
 }

+ 11 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import java.io.Closeable;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -17,21 +17,20 @@ import reactor.core.publisher.Mono;
 @Service
 @RequiredArgsConstructor
 @Log4j2
-public class AdminClientServiceImpl implements AdminClientService {
-  private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
+public class AdminClientServiceImpl implements AdminClientService, Closeable {
+  private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
   @Setter // used in tests
   @Value("${kafka.admin-client-timeout}")
   private int clientTimeout;
 
   @Override
-  public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
+  public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
     return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
         .switchIfEmpty(createAdminClient(cluster))
         .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
   }
 
-  @Override
-  public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster cluster) {
+  private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
     return Mono.fromSupplier(() -> {
       Properties properties = new Properties();
       properties.putAll(cluster.getProperties());
@@ -39,6 +38,11 @@ public class AdminClientServiceImpl implements AdminClientService {
           .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
       properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
       return AdminClient.create(properties);
-    }).flatMap(ExtendedAdminClient::extendedAdminClient);
+    }).flatMap(ReactiveAdminClient::create);
+  }
+
+  @Override
+  public void close() {
+    adminClientCache.values().forEach(ReactiveAdminClient::close);
   }
 }

+ 10 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java

@@ -3,22 +3,17 @@ package com.provectus.kafka.ui.service;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.model.BrokerDTO;
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.util.ClusterUtil;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -32,23 +27,8 @@ public class BrokerServiceImpl implements BrokerService {
 
   private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
       KafkaCluster cluster, List<Integer> brokersIds) {
-    List<ConfigResource> resources = brokersIds.stream()
-        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
-        .collect(Collectors.toList());
-
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .map(ExtendedAdminClient::getAdminClient)
-        .flatMap(adminClient ->
-            ClusterUtil.toMono(adminClient.describeConfigs(resources,
-                    new DescribeConfigsOptions().includeSynonyms(true)).all())
-                .map(config -> config.entrySet()
-                    .stream()
-                    .collect(Collectors.toMap(
-                        c -> Integer.valueOf(c.getKey().name()),
-                        c -> List.copyOf(c.getValue().entries())
-                    ))
-                ))
-        .onErrorResume(UnsupportedVersionException.class, (e) -> Mono.just(new HashMap<>()));
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.loadBrokersConfig(brokersIds));
   }
 
   private Mono<List<ConfigEntry>> loadBrokersConfig(
@@ -87,22 +67,23 @@ public class BrokerServiceImpl implements BrokerService {
   @Override
   public Flux<BrokerDTO> getBrokers(KafkaCluster cluster) {
     return adminClientService
-        .getOrCreateAdminClient(cluster)
-        .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
-            .map(n -> n.stream().map(node -> {
+        .get(cluster)
+        .flatMap(ReactiveAdminClient::describeCluster)
+        .map(description -> description.getNodes().stream()
+            .map(node -> {
               BrokerDTO broker = new BrokerDTO();
               broker.setId(node.id());
               broker.setHost(node.host());
               return broker;
-            }).collect(Collectors.toList())))
+            }).collect(Collectors.toList()))
         .flatMapMany(Flux::fromIterable);
   }
 
   @Override
   public Mono<Node> getController(KafkaCluster cluster) {
     return adminClientService
-        .getOrCreateAdminClient(cluster)
-        .map(ExtendedAdminClient::getAdminClient)
-        .flatMap(adminClient -> ClusterUtil.toMono(adminClient.describeCluster().controller()));
+        .get(cluster)
+        .flatMap(ReactiveAdminClient::describeCluster)
+        .map(ReactiveAdminClient.ClusterDescription::getController);
   }
 }

+ 2 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -19,7 +19,6 @@ import com.provectus.kafka.ui.model.ConsumerGroupDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -310,13 +309,8 @@ public class ClusterService {
   public Mono<Void> deleteConsumerGroupById(String clusterName,
                                             String groupId) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> adminClientService.getOrCreateAdminClient(cluster)
-            .map(ExtendedAdminClient::getAdminClient)
-            .flatMap(adminClient ->
-                ClusterUtil.toMono(
-                    adminClient.deleteConsumerGroups(List.of(groupId)).all()
-                )
-            )
+        .map(cluster -> adminClientService.get(cluster)
+            .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)))
             .onErrorResume(this::reThrowCustomException)
         )
         .orElse(Mono.empty());

+ 87 - 212
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -8,7 +8,6 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
 import com.provectus.kafka.ui.model.CleanupPolicy;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
@@ -31,7 +30,6 @@ import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
 import com.provectus.kafka.ui.util.JmxMetricsName;
 import com.provectus.kafka.ui.util.JmxMetricsValueName;
-import com.provectus.kafka.ui.util.MapUtil;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,24 +44,13 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AlterConfigOp;
-import org.apache.kafka.clients.admin.Config;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.ConsumerGroupListing;
-import org.apache.kafka.clients.admin.DescribeConfigsOptions;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -71,7 +58,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -95,8 +81,6 @@ import reactor.util.function.Tuples;
 @Log4j2
 public class KafkaService {
 
-  private static final ListTopicsOptions LIST_TOPICS_OPTIONS =
-      new ListTopicsOptions().listInternal(true);
   private final ZookeeperService zookeeperService;
   private final JmxClusterUtil jmxClusterUtil;
   private final ClustersStorage clustersStorage;
@@ -122,18 +106,17 @@ public class KafkaService {
 
   @SneakyThrows
   public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
-    return adminClientService.getOrCreateAdminClient(cluster)
+    return adminClientService.get(cluster)
         .flatMap(
-            ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
+            ac -> ac.getClusterVersion().flatMap(
                 version ->
-                    getClusterMetrics(ac.getAdminClient())
-                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
+                    getClusterMetrics(ac)
+                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac))
                         .flatMap(clusterMetrics ->
-                            getTopicsData(ac.getAdminClient()).flatMap(it -> {
+                            getTopicsData(ac).flatMap(it -> {
                                   if (cluster.getDisableLogDirsCollection() == null
                                       || !cluster.getDisableLogDirsCollection()) {
-                                    return updateSegmentMetrics(
-                                        ac.getAdminClient(), clusterMetrics, it
+                                    return updateSegmentMetrics(ac, clusterMetrics, it
                                     );
                                   } else {
                                     return emptySegmentMetrics(clusterMetrics, it);
@@ -250,16 +233,16 @@ public class KafkaService {
   }
 
   @SneakyThrows
-  private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
-    return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
-        .flatMap(topics -> getTopicsData(adminClient, topics).collectList());
+  private Mono<List<InternalTopic>> getTopicsData(ReactiveAdminClient client) {
+    return client.listTopics(true)
+        .flatMap(topics -> getTopicsData(client, topics).collectList());
   }
 
-  private Flux<InternalTopic> getTopicsData(AdminClient adminClient, Collection<String> topics) {
+  private Flux<InternalTopic> getTopicsData(ReactiveAdminClient client, Collection<String> topics) {
     final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
-        loadTopicsConfig(adminClient, topics);
+        loadTopicsConfig(client, topics);
 
-    return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
+    return client.describeTopics(topics)
         .map(m -> m.values().stream()
             .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
         .flatMap(internalTopics -> configsMono
@@ -268,104 +251,72 @@ public class KafkaService {
   }
 
 
-  private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
-    return ClusterUtil.toMono(client.describeCluster().nodes())
-        .flatMap(brokers ->
-            ClusterUtil.toMono(client.describeCluster().controller()).map(
-                c -> {
-                  InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
-                      InternalClusterMetrics.builder();
-                  metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
-                  return metricsBuilder.build();
-                }
-            )
-        );
-  }
-
-  @SneakyThrows
-  private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
-    return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(),
-        newTopic.name());
+  private Mono<InternalClusterMetrics> getClusterMetrics(ReactiveAdminClient client) {
+    return client.describeCluster().map(desc ->
+        InternalClusterMetrics.builder()
+            .brokerCount(desc.getNodes().size())
+            .activeControllers(desc.getController() != null ? 1 : 0)
+            .build()
+    );
   }
 
   @SneakyThrows
-  public Mono<InternalTopic> createTopic(AdminClient adminClient,
+  public Mono<InternalTopic> createTopic(ReactiveAdminClient adminClient,
                                          Mono<TopicCreationDTO> topicCreation) {
-    return topicCreation.flatMap(
-        topicData -> {
-          NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(),
-              topicData.getReplicationFactor().shortValue());
-          newTopic.configs(topicData.getConfigs());
-          return createTopic(adminClient, newTopic).map(v -> topicData);
-        })
+    return topicCreation.flatMap(topicData ->
+            adminClient.createTopic(
+                topicData.getName(),
+                topicData.getPartitions(),
+                topicData.getReplicationFactor().shortValue(),
+                topicData.getConfigs()
+            ).thenReturn(topicData)
+        )
         .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
-        .flatMap(
-            topicData ->
-                getTopicsData(adminClient, Collections.singleton(topicData.getName()))
-                    .next()
-        ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")));
+        .flatMap(topicData -> getUpdatedTopic(adminClient, topicData.getName()))
+        .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")));
   }
 
   public Mono<InternalTopic> createTopic(
       KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
+    return adminClientService.get(cluster).flatMap(ac -> createTopic(ac, topicCreation));
   }
 
   public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .map(ExtendedAdminClient::getAdminClient)
-        .flatMap(adminClient ->
-                ClusterUtil.toMono(adminClient.deleteTopics(List.of(topicName)).all())
-        );
+    return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName));
   }
 
   @SneakyThrows
   private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
-      AdminClient adminClient, Collection<String> topicNames) {
-    List<ConfigResource> resources = topicNames.stream()
-        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
-        .collect(Collectors.toList());
-
-    return ClusterUtil.toMono(adminClient.describeConfigs(resources,
-        new DescribeConfigsOptions().includeSynonyms(true)).all())
+      ReactiveAdminClient client, Collection<String> topicNames) {
+    return client.getTopicsConfig(topicNames)
         .map(configs ->
             configs.entrySet().stream().collect(Collectors.toMap(
-                c -> c.getKey().name(),
-                c -> c.getValue().entries().stream()
+                Map.Entry::getKey,
+                c -> c.getValue().stream()
                     .map(ClusterUtil::mapToInternalTopicConfig)
                     .collect(Collectors.toList()))));
   }
 
-  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
-      KafkaCluster cluster) {
-    return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac ->
-        ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
-            .flatMap(s ->
-                getConsumerGroupsInternal(
-                    cluster,
-                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
-            )
-    );
+  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(KafkaCluster cluster) {
+    return adminClientService.get(cluster).flatMap(ac ->
+        ac.listConsumerGroups()
+            .flatMap(groupIds -> getConsumerGroupsInternal(cluster, groupIds)));
   }
 
-  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
-      KafkaCluster cluster, List<String> groupIds) {
-
-    return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac ->
-        ClusterUtil.toMono(
-            ac.getAdminClient().describeConsumerGroups(groupIds).all()
-        ).map(Map::values)
-    ).flatMap(descriptions ->
-        Flux.fromIterable(descriptions)
-            .parallel()
-            .flatMap(d ->
-                groupMetadata(cluster, d.groupId())
-                    .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
-            )
-            .sequential()
-            .collectList()
-    );
+  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(KafkaCluster cluster,
+                                                                     List<String> groupIds) {
+    return adminClientService.get(cluster).flatMap(ac ->
+        ac.describeConsumerGroups(groupIds)
+            .map(Map::values)
+            .flatMap(descriptions ->
+                Flux.fromIterable(descriptions)
+                    .parallel()
+                    .flatMap(d ->
+                        ac.listConsumerGroupOffsets(d.groupId())
+                            .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
+                    )
+                    .sequential()
+                    .collectList()));
   }
 
   public Mono<List<InternalConsumerGroup>> getConsumerGroups(
@@ -392,15 +343,6 @@ public class KafkaService {
     );
   }
 
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
-                                                                    String consumerGroupId) {
-    return adminClientService.getOrCreateAdminClient(cluster).map(ac ->
-        ac.getAdminClient()
-            .listConsumerGroupOffsets(consumerGroupId)
-            .partitionsToOffsetAndMetadata()
-    ).flatMap(ClusterUtil::toMono).map(MapUtil::removeNullValues);
-  }
-
   public Map<TopicPartition, Long> topicPartitionsEndOffsets(
       KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
     try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
@@ -427,47 +369,17 @@ public class KafkaService {
   }
 
   @SneakyThrows
-  public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
+  public Mono<InternalTopic> updateTopic(KafkaCluster cluster,
+                                         String topicName,
                                          TopicUpdateDTO topicUpdate) {
-    ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .flatMap(ac -> {
-          if (ac.getSupportedFeatures()
-              .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
-            return incrementalAlterConfig(topicUpdate, topicCr, ac)
-                .flatMap(c -> getUpdatedTopic(ac, topicName));
-          } else {
-            return alterConfig(topicUpdate, topicCr, ac)
-                .flatMap(c -> getUpdatedTopic(ac, topicName));
-          }
-        });
-  }
-
-  private Mono<InternalTopic> getUpdatedTopic(ExtendedAdminClient ac, String topicName) {
-    return getTopicsData(ac.getAdminClient())
-        .map(s -> s.stream()
-            .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
-  }
-
-  private Mono<String> incrementalAlterConfig(TopicUpdateDTO topicUpdate, ConfigResource topicCr,
-                                              ExtendedAdminClient ac) {
-    List<AlterConfigOp> listOp = topicUpdate.getConfigs().entrySet().stream()
-        .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()),
-            AlterConfigOp.OpType.SET))).collect(Collectors.toList());
-    return ClusterUtil.toMono(
-        ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp))
-            .all(), topicCr.name());
+    return adminClientService.get(cluster)
+        .flatMap(ac ->
+            ac.updateTopicConfig(topicName,
+                topicUpdate.getConfigs()).then(getUpdatedTopic(ac, topicName)));
   }
 
-  @SuppressWarnings("deprecation")
-  private Mono<String> alterConfig(TopicUpdateDTO topicUpdate, ConfigResource topicCr,
-                                   ExtendedAdminClient ac) {
-    List<ConfigEntry> configEntries = topicUpdate.getConfigs().entrySet().stream()
-        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
-        .collect(Collectors.toList());
-    Config config = new Config(configEntries);
-    Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config);
-    return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name());
+  private Mono<InternalTopic> getUpdatedTopic(ReactiveAdminClient ac, String topicName) {
+    return getTopicsData(ac, List.of(topicName)).next();
   }
 
   private InternalTopic mergeWithStats(InternalTopic topic,
@@ -520,18 +432,12 @@ public class KafkaService {
     );
   }
 
-  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac,
+  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(ReactiveAdminClient ac,
                                                             InternalClusterMetrics clusterMetrics,
                                                             List<InternalTopic> internalTopics) {
-    List<String> names =
-        internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
-    return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
-        ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes ->
-
-            ClusterUtil.toMono(
-                ac.describeLogDirs(
-                    nodes.stream().map(Node::id).collect(Collectors.toList())).all()
-                ).map(log -> {
+    return ac.describeCluster().flatMap(
+        clusterDescription ->
+                ac.describeLogDirs().map(log -> {
                   final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
                       log.entrySet().stream().flatMap(b ->
                           b.getValue().entrySet().stream().flatMap(topicMap ->
@@ -598,7 +504,6 @@ public class KafkaService {
                       )
                       .internalTopicWithSegmentSize(resultTopics).build();
                 })
-        )
     );
   }
 
@@ -612,15 +517,15 @@ public class KafkaService {
   }
 
   private Mono<InternalClusterMetrics> fillJmxMetrics(InternalClusterMetrics internalClusterMetrics,
-                                                      String clusterName, AdminClient ac) {
+                                                      String clusterName, ReactiveAdminClient ac) {
     return fillBrokerMetrics(internalClusterMetrics, clusterName, ac)
         .map(this::calculateClusterMetrics);
   }
 
   private Mono<InternalClusterMetrics> fillBrokerMetrics(
-      InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
-    return ClusterUtil.toMono(ac.describeCluster().nodes())
-        .flatMapIterable(nodes -> nodes)
+      InternalClusterMetrics internalClusterMetrics, String clusterName, ReactiveAdminClient ac) {
+    return ac.describeCluster()
+        .flatMapIterable(desc -> desc.getNodes())
         .map(broker ->
             Map.of(broker.id(), InternalBrokerMetrics.builder()
                 .metrics(getJmxMetric(clusterName, broker)).build())
@@ -699,14 +604,7 @@ public class KafkaService {
   }
 
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
-    var records = offsets.entrySet().stream()
-        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .map(ExtendedAdminClient::getAdminClient)
-        .flatMap(ac ->
-            ClusterUtil.toMono(ac.deleteRecords(records).all())
-        );
+    return adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets));
   }
 
   public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
@@ -754,19 +652,11 @@ public class KafkaService {
     return headers;
   }
 
-  private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
-                                                      String topicName,
-                                                      Map<String, NewPartitions> newPartitionsMap
-  ) {
-    return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
-        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
-  }
-
   public Mono<InternalTopic> increaseTopicPartitions(
       KafkaCluster cluster,
       String topicName,
       PartitionsIncreaseDTO partitionsIncrease) {
-    return adminClientService.getOrCreateAdminClient(cluster)
+    return adminClientService.get(cluster)
         .flatMap(ac -> {
           Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
           Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
@@ -787,18 +677,18 @@ public class KafkaService {
               topicName,
               NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
           );
-          return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
+          return ac.createPartitions(newPartitionsMap)
+              .then(getUpdatedTopic(ac, topicName));
         });
   }
 
   private Mono<InternalTopic> changeReplicationFactor(
-      AdminClient adminClient,
+      ReactiveAdminClient adminClient,
       String topicName,
       Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
   ) {
-    return ClusterUtil.toMono(adminClient
-        .alterPartitionReassignments(reassignments).all(), topicName)
-        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
+    return adminClient.alterPartitionReassignments(reassignments)
+        .then(getUpdatedTopic(adminClient, topicName));
   }
 
   /**
@@ -808,7 +698,7 @@ public class KafkaService {
       KafkaCluster cluster,
       String topicName,
       ReplicationFactorChangeDTO replicationFactorChange) {
-    return adminClientService.getOrCreateAdminClient(cluster)
+    return adminClientService.get(cluster)
         .flatMap(ac -> {
           Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
           Integer requested = replicationFactorChange.getTotalReplicationFactor();
@@ -825,7 +715,7 @@ public class KafkaService {
                     String.format("Requested replication factor %s more than brokers count %s.",
                         requested, brokersCount)));
           }
-          return changeReplicationFactor(ac.getAdminClient(), topicName,
+          return changeReplicationFactor(ac, topicName,
               getPartitionsReassignments(cluster, topicName,
                   replicationFactorChange));
         });
@@ -833,15 +723,14 @@ public class KafkaService {
 
   public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
       KafkaCluster cluster, List<Integer> reqBrokers) {
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .map(admin -> {
+    return adminClientService.get(cluster)
+        .flatMap(admin -> {
           List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
           if (reqBrokers != null && !reqBrokers.isEmpty()) {
             brokers.retainAll(reqBrokers);
           }
-          return admin.getAdminClient().describeLogDirs(brokers);
+          return admin.describeLogDirs(brokers);
         })
-        .flatMap(result -> ClusterUtil.toMono(result.all()))
         .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
           log.error("Error during fetching log dirs", e);
           return Mono.just(new HashMap<>());
@@ -949,20 +838,18 @@ public class KafkaService {
 
   public Mono<Void> updateBrokerLogDir(KafkaCluster cluster, Integer broker,
                                        BrokerLogdirUpdateDTO brokerLogDir) {
-    return adminClientService.getOrCreateAdminClient(cluster)
+    return adminClientService.get(cluster)
         .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
   }
 
-  private Mono<Void> updateBrokerLogDir(ExtendedAdminClient adminMono,
+  private Mono<Void> updateBrokerLogDir(ReactiveAdminClient admin,
                                         BrokerLogdirUpdateDTO b,
                                         Integer broker) {
 
     Map<TopicPartitionReplica, String> req = Map.of(
         new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
         b.getLogDir());
-    return Mono.just(adminMono)
-        .map(admin -> admin.getAdminClient().alterReplicaLogDirs(req))
-        .flatMap(result -> ClusterUtil.toMono(result.all()))
+    return  admin.alterReplicaLogDirs(req)
         .onErrorResume(UnknownTopicOrPartitionException.class,
             e -> Mono.error(new TopicOrPartitionNotFoundException()))
         .onErrorResume(LogDirNotFoundException.class,
@@ -974,20 +861,8 @@ public class KafkaService {
                                              Integer broker,
                                              String name,
                                              String value) {
-    return adminClientService.getOrCreateAdminClient(cluster)
-        .flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value));
-  }
-
-  private Mono<Void> updateBrokerConfigByName(ExtendedAdminClient admin,
-                                              Integer broker,
-                                              String name,
-                                              String value) {
-    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(broker));
-    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
-
-    return Mono.just(admin)
-        .map(a -> a.getAdminClient().incrementalAlterConfigs(Map.of(cr, List.of(op))))
-        .flatMap(result -> ClusterUtil.toMono(result.all()))
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.updateBrokerConfigByName(broker, name, value))
         .onErrorResume(InvalidRequestException.class,
             e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
         .doOnError(log::error);

+ 63 - 102
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java

@@ -1,28 +1,23 @@
 package com.provectus.kafka.ui.service;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.provectus.kafka.ui.util.ClusterUtil.toMono;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.kafka.common.ConsumerGroupState.DEAD;
 import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
 
+import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
@@ -37,71 +32,74 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 public class OffsetsResetService {
 
-  private final KafkaService kafkaService;
   private final AdminClientService adminClientService;
 
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToEarliest(
+  public Mono<Void> resetToEarliest(
       KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
     return checkGroupCondition(cluster, group)
-        .flatMap(g -> {
-          try (var consumer = getConsumer(cluster, group)) {
-            var targetPartitions = getTargetPartitions(consumer, topic, partitions);
-            var offsets = consumer.beginningOffsets(targetPartitions);
-            return commitOffsets(consumer, offsets);
-          }
-        });
+        .flatMap(ac ->
+            offsets(ac, topic, partitions, OffsetSpec.earliest())
+                .flatMap(offsets -> resetOffsets(ac, group, offsets)));
   }
 
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToLatest(
-      KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
-    return checkGroupCondition(cluster, group).flatMap(
-        g -> {
-          try (var consumer = getConsumer(cluster, group)) {
-            var targetPartitions = getTargetPartitions(consumer, topic, partitions);
-            var offsets = consumer.endOffsets(targetPartitions);
-            return commitOffsets(consumer, offsets);
-          }
-        }
+  private Mono<Map<TopicPartition, Long>> offsets(ReactiveAdminClient client,
+                                                  String topic,
+                                                  @Nullable Collection<Integer> partitions,
+                                                  OffsetSpec spec) {
+    if (partitions == null) {
+      return client.listOffsets(topic, spec);
+    }
+    return client.listOffsets(
+        partitions.stream().map(idx -> new TopicPartition(topic, idx)).collect(toSet()),
+        spec
     );
   }
 
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToTimestamp(
+  public Mono<Void> resetToLatest(
+      KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
+    return checkGroupCondition(cluster, group)
+        .flatMap(ac ->
+            offsets(ac, topic, partitions, OffsetSpec.latest())
+                .flatMap(offsets -> resetOffsets(ac, group, offsets)));
+  }
+
+  public Mono<Void> resetToTimestamp(
       KafkaCluster cluster, String group, String topic, Collection<Integer> partitions,
       long targetTimestamp) {
-    return checkGroupCondition(cluster, group).flatMap(
-        g -> {
-          try (var consumer = getConsumer(cluster, group)) {
-            var targetPartitions = getTargetPartitions(consumer, topic, partitions);
-            var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp);
-            return commitOffsets(consumer, offsets);
-          }
-        }
-    );
+    return checkGroupCondition(cluster, group)
+        .flatMap(ac ->
+            offsets(ac, topic, partitions, OffsetSpec.forTimestamp(targetTimestamp))
+                .flatMap(
+                    foundOffsets -> offsets(ac, topic, partitions, OffsetSpec.latest())
+                        .map(endOffsets -> editTsOffsets(foundOffsets, endOffsets))
+                )
+                .flatMap(offsets -> resetOffsets(ac, group, offsets))
+        );
   }
 
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToOffsets(
+  public Mono<Void> resetToOffsets(
       KafkaCluster cluster, String group, String topic, Map<Integer, Long> targetOffsets) {
+    Preconditions.checkNotNull(targetOffsets);
+    var partitionOffsets = targetOffsets.entrySet().stream()
+        .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
     return checkGroupCondition(cluster, group).flatMap(
-        g -> {
-          try (var consumer = getConsumer(cluster, group)) {
-            var offsets = targetOffsets.entrySet().stream()
-                .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
-            offsets = editOffsetsIfNeeded(consumer, offsets);
-            return commitOffsets(consumer, offsets);
-          }
-        }
+        ac ->
+            ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest())
+                .flatMap(earliest ->
+                    ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest())
+                        .map(latest -> editOffsetsBounds(partitionOffsets, earliest, latest))
+                        .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit)))
     );
   }
 
-  private Mono<ConsumerGroupDescription> checkGroupCondition(KafkaCluster cluster, String groupId) {
-    return adminClientService.getOrCreateAdminClient(cluster)
+  private Mono<ReactiveAdminClient> checkGroupCondition(KafkaCluster cluster, String groupId) {
+    return adminClientService.get(cluster)
         .flatMap(ac ->
             // we need to call listConsumerGroups() to check group existence, because
             // describeConsumerGroups() will return consumer group even if it doesn't exist
-            toMono(ac.getAdminClient().listConsumerGroups().all())
-                .filter(cgs -> cgs.stream().anyMatch(g -> g.groupId().equals(groupId)))
-                .flatMap(cgs -> toMono(
-                    ac.getAdminClient().describeConsumerGroups(List.of(groupId)).all()))
+            ac.listConsumerGroups()
+                .filter(cgs -> cgs.stream().anyMatch(g -> g.equals(groupId)))
+                .flatMap(cgs -> ac.describeConsumerGroups(List.of(groupId)))
                 .filter(cgs -> cgs.containsKey(groupId))
                 .map(cgs -> cgs.get(groupId))
                 .flatMap(cg -> {
@@ -116,47 +114,18 @@ public class OffsetsResetService {
                         )
                     );
                   }
-                  return Mono.just(cg);
+                  return Mono.just(ac);
                 })
                 .switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found")))
         );
   }
 
-  private Map<TopicPartition, Long> offsetsByTimestamp(Consumer<?, ?> consumer,
-                                                       Set<TopicPartition> partitions,
-                                                       long timestamp) {
-    Map<TopicPartition, OffsetAndTimestamp> timestampedOffsets = consumer
-        .offsetsForTimes(partitions.stream().collect(toMap(p -> p, p -> timestamp)));
-
-    var foundOffsets = timestampedOffsets.entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
-
+  private Map<TopicPartition, Long> editTsOffsets(Map<TopicPartition, Long> foundTsOffsets,
+                                                  Map<TopicPartition, Long> endOffsets) {
     // for partitions where we didnt find offset by timestamp, we use end offsets
-    Set<TopicPartition> endOffsets = new HashSet<>(partitions);
-    endOffsets.removeAll(foundOffsets.keySet());
-    foundOffsets.putAll(consumer.endOffsets(endOffsets));
-
-    return foundOffsets;
-  }
-
-  private Set<TopicPartition> getTargetPartitions(Consumer<?, ?> consumer, String topic,
-                                                  Collection<Integer> partitions) {
-    var allPartitions = allTopicPartitions(consumer, topic);
-    if (partitions == null || partitions.isEmpty()) {
-      return allPartitions;
-    } else {
-      return partitions.stream()
-          .map(idx -> new TopicPartition(topic, idx))
-          .peek(tp -> checkArgument(allPartitions.contains(tp), "Invalid partition %s", tp))
-          .collect(toSet());
-    }
-  }
-
-  private Set<TopicPartition> allTopicPartitions(Consumer<?, ?> consumer, String topic) {
-    return consumer.partitionsFor(topic).stream()
-        .map(info -> new TopicPartition(topic, info.partition()))
-        .collect(toSet());
+    Map<TopicPartition, Long> result = new HashMap<>(endOffsets);
+    result.putAll(foundTsOffsets);
+    return result;
   }
 
   /**
@@ -164,10 +133,9 @@ public class OffsetsResetService {
    * fail we reset offset to either earliest or latest offsets (To follow logic from
    * kafka.admin.ConsumerGroupCommand.scala)
    */
-  private Map<TopicPartition, Long> editOffsetsIfNeeded(Consumer<?, ?> consumer,
-                                                        Map<TopicPartition, Long> offsetsToCheck) {
-    var earliestOffsets = consumer.beginningOffsets(offsetsToCheck.keySet());
-    var latestOffsets = consumer.endOffsets(offsetsToCheck.keySet());
+  private Map<TopicPartition, Long> editOffsetsBounds(Map<TopicPartition, Long> offsetsToCheck,
+                                                      Map<TopicPartition, Long> earliestOffsets,
+                                                      Map<TopicPartition, Long> latestOffsets) {
     var result = new HashMap<TopicPartition, Long>();
     offsetsToCheck.forEach((tp, offset) -> {
       if (earliestOffsets.get(tp) > offset) {
@@ -184,17 +152,10 @@ public class OffsetsResetService {
     return result;
   }
 
-  private Mono<Map<TopicPartition, OffsetAndMetadata>> commitOffsets(
-      Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets
-  ) {
-    var toCommit = offsets.entrySet().stream()
-        .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-    consumer.commitSync(toCommit);
-    return Mono.just(toCommit);
-  }
-
-  private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) {
-    return kafkaService.createConsumer(cluster, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
+  private Mono<Void> resetOffsets(ReactiveAdminClient adminClient,
+                                  String groupId,
+                                  Map<TopicPartition, Long> offsets) {
+    return adminClient.alterConsumerGroupOffsets(groupId, offsets);
   }
 
 }

+ 311 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -0,0 +1,311 @@
+package com.provectus.kafka.ui.service;
+
+import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.util.MapUtil;
+import com.provectus.kafka.ui.util.NumberUtil;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.clients.admin.DescribeConfigsOptions;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import reactor.core.publisher.Mono;
+
+
+@Log4j2
+@RequiredArgsConstructor
+public class ReactiveAdminClient implements Closeable {
+
+  private enum SupportedFeature {
+    INCREMENTAL_ALTER_CONFIGS,
+    ALTER_CONFIGS
+  }
+
+  @Value
+  public static class ClusterDescription {
+    Node controller;
+    String clusterId;
+    Collection<Node> nodes;
+    Set<AclOperation> authorizedOperations;
+  }
+
+  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
+    return getClusterVersionImpl(adminClient)
+        .map(ver ->
+            new ReactiveAdminClient(
+                adminClient,
+                Set.of(getSupportedUpdateFeatureForVersion(ver))));
+  }
+
+  private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) {
+    float version = NumberUtil.parserClusterVersion(versionStr);
+    return version <= 2.3f
+        ? SupportedFeature.ALTER_CONFIGS
+        : SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
+  }
+
+  //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
+  private static <T> Mono<T> toMono(KafkaFuture<T> future) {
+    return Mono.create(sink -> future.whenComplete((res, ex) -> {
+      if (ex != null) {
+        sink.error(ex);
+      } else {
+        sink.success(res);
+      }
+    }));
+  }
+
+  //---------------------------------------------------------------------------------
+
+  private final AdminClient client;
+  private final Set<SupportedFeature> features;
+
+  public Mono<Set<String>> listTopics(boolean listInternal) {
+    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
+  }
+
+  public Mono<Void> deleteTopic(String topicName) {
+    return toMono(client.deleteTopics(List.of(topicName)).all());
+  }
+
+  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
+    List<ConfigResource> resources = topicNames.stream()
+        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+        .collect(toList());
+
+    return toMono(
+        client.describeConfigs(
+            resources,
+            new DescribeConfigsOptions().includeSynonyms(true)
+        ).all())
+        .map(config -> config.entrySet().stream()
+            .collect(toMap(
+                c -> c.getKey().name(),
+                c -> new ArrayList<>(c.getValue().entries()))));
+  }
+
+  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
+    List<ConfigResource> resources = brokerIds.stream()
+        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
+        .collect(toList());
+    return toMono(client.describeConfigs(resources).all())
+        .map(config -> config.entrySet().stream()
+            .collect(toMap(
+                c -> Integer.valueOf(c.getKey().name()),
+                c -> new ArrayList<>(c.getValue().entries()))));
+  }
+
+  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
+    return toMono(client.describeTopics(topics).all());
+  }
+
+  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
+    return describeCluster()
+        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
+        .flatMap(this::describeLogDirs);
+  }
+
+  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
+      Collection<Integer> brokerIds) {
+    return toMono(client.describeLogDirs(brokerIds).all());
+  }
+
+  public Mono<ClusterDescription> describeCluster() {
+    var r = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true));
+    var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
+    return Mono.create(sink -> all.whenComplete((res, ex) -> {
+      if (ex != null) {
+        sink.error(ex);
+      } else {
+        try {
+          sink.success(
+              new ClusterDescription(
+                  getUninterruptibly(r.controller()),
+                  getUninterruptibly(r.clusterId()),
+                  getUninterruptibly(r.nodes()),
+                  getUninterruptibly(r.authorizedOperations())
+              )
+          );
+        } catch (ExecutionException e) {
+          // can't be here, because all futures already completed
+        }
+      }
+    }));
+  }
+
+  private static Mono<String> getClusterVersionImpl(AdminClient client) {
+    return toMono(client.describeCluster().controller()).flatMap(controller ->
+        toMono(client.describeConfigs(
+                List.of(new ConfigResource(
+                    ConfigResource.Type.BROKER, String.valueOf(controller.id()))))
+            .all()
+            .thenApply(configs ->
+                configs.values().stream()
+                    .map(Config::entries)
+                    .flatMap(Collection::stream)
+                    .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
+                    .findFirst().map(ConfigEntry::value)
+                    .orElse("1.0-UNKNOWN")
+            )));
+  }
+
+  public Mono<String> getClusterVersion() {
+    return getClusterVersionImpl(client);
+  }
+
+  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
+    return toMono(client.deleteConsumerGroups(groupIds).all());
+  }
+
+  public Mono<Void> createTopic(String name,
+                                int numPartitions,
+                                short replicationFactor,
+                                Map<String, String> configs) {
+    return toMono(client.createTopics(
+        List.of(new NewTopic(name, numPartitions, replicationFactor).configs(configs))).all());
+  }
+
+  public Mono<Void> alterPartitionReassignments(
+      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
+    return toMono(client.alterPartitionReassignments(reassignments).all());
+  }
+
+  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
+    return toMono(client.createPartitions(newPartitionsMap).all());
+  }
+
+  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
+    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
+      return incrementalAlterConfig(topicName, configs);
+    } else {
+      return alterConfig(topicName, configs);
+    }
+  }
+
+  public Mono<List<String>> listConsumerGroups() {
+    return toMono(client.listConsumerGroups().all())
+        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
+  }
+
+  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(List<String> groupIds) {
+    return toMono(client.describeConsumerGroups(groupIds).all());
+  }
+
+  public Mono<Map<TopicPartition, OffsetAndMetadata>> listConsumerGroupOffsets(String groupId) {
+    return toMono(client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata())
+        .map(MapUtil::removeNullValues);
+  }
+
+  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
+    return toMono(client.alterConsumerGroupOffsets(
+            groupId,
+            offsets.entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
+        .all());
+  }
+
+  public Mono<Map<TopicPartition, Long>> listOffsets(String topic,
+                                                     OffsetSpec offsetSpec) {
+    return topicPartitions(topic).flatMap(tps -> listOffsets(tps, offsetSpec));
+  }
+
+  public Mono<Map<TopicPartition, Long>> listOffsets(Set<TopicPartition> partitions,
+                                                     OffsetSpec offsetSpec) {
+    return toMono(
+        client.listOffsets(partitions.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
+        .map(offsets -> offsets.entrySet()
+            .stream()
+            // filtering partitions for which offsets were not found
+            .filter(e -> e.getValue().offset() >= 0)
+            .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
+  }
+
+  private Mono<Set<TopicPartition>> topicPartitions(String topic) {
+    return toMono(client.describeTopics(List.of(topic)).all())
+        .map(r -> r.values().stream()
+            .findFirst()
+            .stream()
+            .flatMap(d -> d.partitions().stream())
+            .map(p -> new TopicPartition(topic, p.partition()))
+            .collect(Collectors.toSet())
+        );
+  }
+
+  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
+    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
+    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
+    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
+  }
+
+  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
+    var records = offsets.entrySet().stream()
+        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
+        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+    return toMono(client.deleteRecords(records).all());
+  }
+
+  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
+  }
+
+  private Mono<Void> incrementalAlterConfig(String topicName, Map<String, String> configs) {
+    var config = configs.entrySet().stream()
+        .flatMap(cfg -> Stream.of(
+            new AlterConfigOp(
+                new ConfigEntry(
+                    cfg.getKey(),
+                    cfg.getValue()),
+                AlterConfigOp.OpType.SET)))
+        .collect(toList());
+    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
+    return toMono(client.incrementalAlterConfigs(Map.of(topicResource, config)).all());
+  }
+
+  @SuppressWarnings("deprecation")
+  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
+    List<ConfigEntry> configEntries = configs.entrySet().stream()
+        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
+        .collect(toList());
+    Config config = new Config(configEntries);
+    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
+    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+}

+ 0 - 69
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java

@@ -8,7 +8,6 @@ import com.provectus.kafka.ui.model.ConsumerGroupDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalPartition;
@@ -24,7 +23,6 @@ import java.time.OffsetDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,48 +31,21 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.Mono;
 
 @Log4j2
 public class ClusterUtil {
 
-  private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
-
   private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
 
-  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
-    return Mono.create(sink -> future.whenComplete((res, ex) -> {
-      if (ex != null) {
-        sink.error(ex);
-      } else {
-        sink.success(res);
-      }
-    }));
-  }
-
-  public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
-    return Mono.create(sink -> future.whenComplete((res, ex) -> {
-      if (ex != null) {
-        sink.error(ex);
-      } else {
-        sink.success(topicName);
-      }
-    }));
-  }
-
   public static InternalConsumerGroup convertToInternalConsumerGroup(
       ConsumerGroupDescription description, Map<TopicPartition, OffsetAndMetadata> offsets) {
 
@@ -334,46 +305,6 @@ public class ClusterUtil {
     }
   }
 
-  public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(
-      AdminClient adminClient) {
-    return getClusterVersion(adminClient)
-        .map(ClusterUtil::getSupportedUpdateFeature)
-        .map(Collections::singleton);
-  }
-
-  private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(String version) {
-    try {
-      final String[] parts = version.split("\\.");
-      if (parts.length > 2) {
-        version = parts[0] + "." + parts[1];
-      }
-      return Float.parseFloat(version.split("-")[0]) <= 2.3f
-          ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS :
-          ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
-    } catch (Exception e) {
-      log.error("Conversion clusterVersion {} to float value failed", version);
-      throw e;
-    }
-  }
-
-  public static Mono<String> getClusterVersion(AdminClient adminClient) {
-    return ClusterUtil.toMono(adminClient.describeCluster().controller())
-        .map(Node::id)
-        .map(id -> Collections
-            .singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
-        .map(brokerCR -> adminClient.describeConfigs(brokerCR).all())
-        .flatMap(ClusterUtil::toMono)
-        .map(ClusterUtil::getClusterVersion);
-  }
-
-  public static String getClusterVersion(Map<ConfigResource, Config> configs) {
-    return configs.values().stream()
-        .map(Config::entries)
-        .flatMap(Collection::stream)
-        .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
-        .findFirst().map(ConfigEntry::value).orElse("1.0-UNKNOWN");
-  }
-
 
   public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
     return streamOfMaps

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java

@@ -1,7 +1,9 @@
 package com.provectus.kafka.ui.util;
 
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.math.NumberUtils;
 
+@Log4j2
 public class NumberUtil {
 
   private NumberUtil() {
@@ -10,4 +12,17 @@ public class NumberUtil {
   public static boolean isNumeric(Object value) {
     return value != null && NumberUtils.isCreatable(value.toString());
   }
+
+  public static float parserClusterVersion(String version) {
+    try {
+      final String[] parts = version.split("\\.");
+      if (parts.length > 2) {
+        version = parts[0] + "." + parts[1];
+      }
+      return Float.parseFloat(version.split("-")[0]);
+    } catch (Exception e) {
+      log.error("Conversion clusterVersion {} to float value failed", version);
+      throw e;
+    }
+  }
 }

+ 5 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java

@@ -61,19 +61,21 @@ public class ReadOnlyModeTests extends AbstractBaseTest {
             .name(topicName)
             .partitions(1)
             .replicationFactor(1)
-            .configs(Map.of())
         )
         .exchange()
         .expectStatus()
         .isOk();
+
     webTestClient.patch()
         .uri("/api/clusters/{clusterName}/topics/{topicName}", LOCAL, topicName)
         .bodyValue(new TopicUpdateDTO()
-            .configs(Map.of())
+            .configs(Map.of("cleanup.policy", "compact"))
         )
         .exchange()
         .expectStatus()
-        .isOk();
+        .isOk()
+        .expectBody()
+        .jsonPath("$.cleanUpPolicy").isEqualTo("COMPACT");
   }
 
   @Test

+ 11 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java

@@ -19,11 +19,13 @@ import java.util.stream.Stream;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterEach;
@@ -46,17 +48,13 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
   private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID();
   private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID();
 
-  private KafkaService kafkaService;
   private OffsetsResetService offsetsResetService;
 
   @BeforeEach
   void init() {
     AdminClientServiceImpl adminClientService = new AdminClientServiceImpl();
-    BrokerService brokerService = new BrokerServiceImpl(adminClientService);
-    FeatureService featureService = new FeatureServiceImpl(brokerService);
     adminClientService.setClientTimeout(5_000);
-    kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService);
-    offsetsResetService = new OffsetsResetService(kafkaService, adminClientService);
+    offsetsResetService = new OffsetsResetService(adminClientService);
 
     createTopic(new NewTopic(topic, PARTITIONS, (short) 1));
     createConsumerGroup();
@@ -228,7 +226,14 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
   }
 
   private Consumer<?, ?> groupConsumer() {
-    return kafkaService.createConsumer(CLUSTER, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
+    Properties props = new Properties();
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.getBootstrapServers());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    return new KafkaConsumer<>(props);
   }
 
 }

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -200,7 +200,7 @@ public class SendAndReadTests extends AbstractBaseTest {
         .withKeySchema(AVRO_SCHEMA_PRIMITIVE_STRING)
         .withValueSchema(AVRO_SCHEMA_PRIMITIVE_INT)
         .withMsgToSend(
-            new CreateTopicMessage()
+            new CreateTopicMessageDTO()
                 .key("\"some string\"")
                 .content("123")
         )