Przeglądaj źródła

Changed caching of extendedAdminClient instances

Roman Nedzvetskiy 5 lat temu
rodzic
commit
dc440c1519

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java

@@ -6,16 +6,16 @@ import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.admin.AdminClient;
 import reactor.core.publisher.Mono;
 
-import java.util.List;
+import java.util.Set;
 
 @Data
 @RequiredArgsConstructor
 public class ExtendedAdminClient {
 
     private final AdminClient adminClient;
-    private final List<SupportedFeatures> supportedFeatures;
+    private final Set<SupportedFeature> supportedFeatures;
 
-    public enum SupportedFeatures {
+    public enum SupportedFeature {
         INCREMENTAL_ALTER_CONFIGS,
         ALTER_CONFIGS
     }

+ 11 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -11,10 +11,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import reactor.core.publisher.Mono;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -140,27 +137,25 @@ public class ClusterUtil {
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
     }
 
-    public static Mono<List<ExtendedAdminClient.SupportedFeatures>> getSupportedFeatures(AdminClient adminClient) {
-        List<ExtendedAdminClient.SupportedFeatures> supportedFeatures = new ArrayList<>();
+    public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
         return ClusterUtil.toMono(adminClient.describeCluster().controller())
                 .map(Node::id)
                 .map(id -> Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
-                .flatMap(brokerCR -> ClusterUtil.toMono(adminClient.describeConfigs(brokerCR).all())
-                        .map(s -> {
-                            supportedFeatures.add(getSupportedUpdateFeature(s));
-                            return supportedFeatures;
-                        }));
+                .map(brokerCR -> adminClient.describeConfigs(brokerCR).all())
+                .flatMap(ClusterUtil::toMono)
+                .map(ClusterUtil::getSupportedUpdateFeature)
+                .map(Collections::singleton);
     }
 
-    private static ExtendedAdminClient.SupportedFeatures getSupportedUpdateFeature(Map<ConfigResource, Config> configs) {
+    private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Map<ConfigResource, Config> configs) {
         String version = configs.values().stream()
-                .map(en -> en.entries().stream()
-                        .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
-                        .findFirst().orElseThrow())
+                .map(Config::entries)
+                .flatMap(Collection::stream)
+                .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
                 .findFirst().orElseThrow().value();
         try {
             return Float.parseFloat(version.split("-")[0]) <= 2.3f
-                    ? ExtendedAdminClient.SupportedFeatures.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS;
+                    ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
         } catch (Exception e) {
             log.error("Conversion clusterVersion {} to float value failed", version);
             throw e;

+ 5 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -39,7 +39,7 @@ public class KafkaService {
     private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
 
     private final ZookeeperService zookeeperService;
-    private final Map<String, Mono<ExtendedAdminClient>> adminClientCache = new ConcurrentHashMap<>();
+    private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
@@ -183,10 +183,9 @@ public class KafkaService {
 
 
     public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
-        return adminClientCache.computeIfAbsent(
-                cluster.getName(),
-                (id) -> createAdminClient(cluster)
-        ).flatMap(this::isAdminClientConnected);
+        return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
+                .switchIfEmpty(createAdminClient(cluster))
+                .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
     }
 
     public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
@@ -197,10 +196,6 @@ public class KafkaService {
         return ExtendedAdminClient.extendedAdminClient(adminClient);
     }
 
-    private Mono<ExtendedAdminClient> isAdminClientConnected(ExtendedAdminClient adminClient) {
-        return getClusterId(adminClient.getAdminClient()).map( r -> adminClient);
-    }
-
 
 
     private Mono<TopicDescription> getTopicDescription(KafkaFuture<TopicDescription> entry, String topicName) {
@@ -250,7 +245,7 @@ public class KafkaService {
         ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
         return getOrCreateAdminClient(cluster)
                 .flatMap(ac -> {
-                    if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS)) {
+                    if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
                         return incrementalAlterConfig(topicFormData, topicCR, ac)
                                 .flatMap(c -> getUpdatedTopic(ac, topicName));
                     } else {