Selaa lähdekoodia

ReactiveAdminClient.SupportedFeature refactor

iliax 2 vuotta sitten
vanhempi
commit
86d931af75

+ 25 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.util.MapUtil;
 import com.provectus.kafka.ui.util.NumberUtil;
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -69,8 +70,24 @@ import reactor.util.function.Tuples;
 public class ReactiveAdminClient implements Closeable {
 
   private enum SupportedFeature {
-    INCREMENTAL_ALTER_CONFIGS,
-    ALTER_CONFIGS
+    INCREMENTAL_ALTER_CONFIGS(2.3f),
+    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);
+
+    private final float sinceVersion;
+
+    SupportedFeature(float sinceVersion) {
+      this.sinceVersion = sinceVersion;
+    }
+
+    static Set<SupportedFeature> forVersion(float kafkaVersion) {
+      return Arrays.stream(SupportedFeature.values())
+          .filter(f -> kafkaVersion >= f.sinceVersion)
+          .collect(Collectors.toSet());
+    }
+
+    static Set<SupportedFeature> defaultFeatures() {
+      return Set.of();
+    }
   }
 
   @Value
@@ -88,18 +105,15 @@ public class ReactiveAdminClient implements Closeable {
             new ReactiveAdminClient(
                 adminClient,
                 ver,
-                Set.of(getSupportedUpdateFeatureForVersion(ver))));
+                getSupportedUpdateFeaturesForVersion(ver)));
   }
 
-  private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) {
+  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
     try {
       float version = NumberUtil.parserClusterVersion(versionStr);
-      return version <= 2.3f
-          ? SupportedFeature.ALTER_CONFIGS
-          : SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
+      return SupportedFeature.forVersion(version);
     } catch (NumberFormatException e) {
-      log.info("Assuming non-incremental alter configs due to version parsing error");
-      return SupportedFeature.ALTER_CONFIGS;
+      return SupportedFeature.defaultFeatures();
     }
   }
 
@@ -149,11 +163,12 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
+    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
     // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
     return partitionCalls(
         topicNames,
         200,
-        part -> getTopicsConfigImpl(part, includeDoc),
+        part -> getTopicsConfigImpl(part, includeDocFixed),
         (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
     );
   }

+ 3 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java

@@ -3,10 +3,10 @@ package com.provectus.kafka.ui;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
 
-import com.provectus.kafka.ui.api.model.TopicConfig;
 import com.provectus.kafka.ui.model.BrokerConfigDTO;
 import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
+import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
@@ -206,12 +206,12 @@ public class KafkaConsumerTests extends AbstractIntegrationTest {
             .expectStatus()
             .isOk();
 
-    List<TopicConfig> configs = webTestClient.get()
+    List<TopicConfigDTO> configs = webTestClient.get()
             .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
             .exchange()
             .expectStatus()
             .isOk()
-            .expectBodyList(TopicConfig.class)
+            .expectBodyList(TopicConfigDTO.class)
             .returnResult()
             .getResponseBody();