Forráskód Böngészése

ISSUE-2968: Deleting a config parameter of topic doesn't take effect (#3001)

ReactiveAdminClient::incrementalAlterConfig now replaces all topic configs, not only altering changed
Ilya Kuramshin 2 éve
szülő
commit
052d8af456

+ 25 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -32,6 +32,8 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
@@ -147,6 +149,7 @@ public class ReactiveAdminClient implements Closeable {
 
   //---------------------------------------------------------------------------------
 
+  @Getter(AccessLevel.PACKAGE) // visible for testing
   private final AdminClient client;
   private final String version;
   private final Set<SupportedFeature> features;
@@ -361,9 +364,14 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.createPartitions(newPartitionsMap).all());
   }
 
+
+  // NOTE: places whole current topic config with new one. Entries that were present in old config,
+  // but missed in new will be set to default
   public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
     if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
-      return incrementalAlterConfig(topicName, configs);
+      return getTopicsConfigImpl(List.of(topicName), false)
+          .map(conf -> conf.getOrDefault(topicName, List.of()))
+          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
     } else {
       return alterConfig(topicName, configs);
     }
@@ -499,17 +507,22 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
   }
 
-  private Mono<Void> incrementalAlterConfig(String topicName, Map<String, String> configs) {
-    var config = configs.entrySet().stream()
-        .flatMap(cfg -> Stream.of(
-            new AlterConfigOp(
-                new ConfigEntry(
-                    cfg.getKey(),
-                    cfg.getValue()),
-                AlterConfigOp.OpType.SET)))
-        .collect(toList());
-    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
-    return toMono(client.incrementalAlterConfigs(Map.of(topicResource, config)).all());
+  private Mono<Void> incrementalAlterConfig(String topicName,
+                                            List<ConfigEntry> currentConfigs,
+                                            Map<String, String> newConfigs) {
+    var configsToDelete = currentConfigs.stream()
+        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
+        .filter(e -> !newConfigs.containsKey(e.name()))
+        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));
+
+    var configsToSet = newConfigs.entrySet().stream()
+        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));
+
+    return toMono(client.incrementalAlterConfigs(
+        Map.of(
+            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
+            Stream.concat(configsToDelete, configsToSet).toList()
+        )).all());
   }
 
   @SuppressWarnings("deprecation")

+ 92 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -0,0 +1,92 @@
+package com.provectus.kafka.ui.service;
+
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigResource;
+import org.junit.function.ThrowingRunnable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.test.StepVerifier;
+
+class ReactiveAdminClientTest extends AbstractIntegrationTest {
+
+  private final List<ThrowingRunnable> clearings = new ArrayList<>();
+
+  private AdminClient adminClient;
+  private ReactiveAdminClient reactiveAdminClient;
+
+  @BeforeEach
+  void init() {
+    AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
+    ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
+    reactiveAdminClient = requireNonNull(adminClientService.get(clustersStorage.getClusterByName(LOCAL).get()).block());
+    adminClient = reactiveAdminClient.getClient();
+  }
+
+  @AfterEach
+  void tearDown() {
+    for (ThrowingRunnable clearing : clearings) {
+      try {
+        clearing.run();
+      } catch (Throwable th) {
+        //NOOP
+      }
+    }
+  }
+
+  @Test
+  void testUpdateTopicConfigs() throws Exception {
+    String topic = UUID.randomUUID().toString();
+    createTopics(new NewTopic(topic, 1, (short) 1));
+
+    var configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+
+    adminClient.incrementalAlterConfigs(
+        Map.of(
+            configResource,
+            List.of(
+                new AlterConfigOp(new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.SET),
+                new AlterConfigOp(new ConfigEntry("retention.bytes", "12345678"), AlterConfigOp.OpType.SET)
+            )
+        )
+    ).all().get();
+
+    StepVerifier.create(
+        reactiveAdminClient.updateTopicConfig(
+            topic,
+            Map.of(
+                "compression.type", "snappy", //changing existing config
+                "file.delete.delay.ms", "12345" // adding new one
+            )
+        )
+    ).expectComplete().verify();
+
+    Config config = adminClient.describeConfigs(List.of(configResource)).values().get(configResource).get();
+    assertThat(config.get("retention.bytes").value()).isNotEqualTo("12345678"); // wes reset to default
+    assertThat(config.get("compression.type").value()).isEqualTo("snappy");
+    assertThat(config.get("file.delete.delay.ms").value()).isEqualTo("12345");
+  }
+
+
+  @SneakyThrows
+  void createTopics(NewTopic... topics) {
+    adminClient.createTopics(List.of(topics)).all().get();
+    clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
+  }
+
+
+}