Parcourir la source

BE: Fix sanitizing on KC null values config entries (#3937)

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin il y a 2 ans
Parent
commit
03b7d1bd60

+ 12 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java

@@ -5,11 +5,13 @@ import static java.util.regex.Pattern.CASE_INSENSITIVE;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
@@ -17,7 +19,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 @Component
-class KafkaConfigSanitizer  {
+class KafkaConfigSanitizer {
 
   private static final String SANITIZED_VALUE = "******";
 
@@ -65,10 +67,8 @@ class KafkaConfigSanitizer  {
         .collect(Collectors.toSet());
   }
 
-  public Object sanitize(String key, Object value) {
-    if (value == null) {
-      return null;
-    }
+  @Nullable
+  public Object sanitize(String key, @Nullable Object value) {
     for (Pattern pattern : sanitizeKeysPatterns) {
       if (pattern.matcher(key).matches()) {
         return SANITIZED_VALUE;
@@ -77,5 +77,12 @@ class KafkaConfigSanitizer  {
     return value;
   }
 
+  public Map<String, Object> sanitizeConnectorConfig(@Nullable Map<String, Object> original) {
+    var result = new HashMap<String, Object>(); //null-values supporting map!
+    if (original != null) {
+      original.forEach((k, v) -> result.put(k, sanitize(k, v)));
+    }
+    return result;
+  }
 
 }

+ 4 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -24,7 +24,6 @@ import com.provectus.kafka.ui.model.NewConnectorDTO;
 import com.provectus.kafka.ui.model.TaskDTO;
 import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
 import com.provectus.kafka.ui.util.ReactiveFailover;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -176,19 +175,14 @@ public class KafkaConnectService {
                         e -> emptyStatus(connectorName))
                     .map(connectorStatus -> {
                       var status = connectorStatus.getConnector();
-                      final Map<String, Object> obfuscatedConfig = connector.getConfig().entrySet()
-                          .stream()
-                          .collect(Collectors.toMap(
-                              Map.Entry::getKey,
-                              e -> kafkaConfigSanitizer.sanitize(e.getKey(), e.getValue())
-                          ));
-                      ConnectorDTO result = (ConnectorDTO) new ConnectorDTO()
+                      var sanitizedConfig = kafkaConfigSanitizer.sanitizeConnectorConfig(connector.getConfig());
+                      ConnectorDTO result = new ConnectorDTO()
                           .connect(connectName)
                           .status(kafkaConnectMapper.fromClient(status))
                           .type(connector.getType())
                           .tasks(connector.getTasks())
                           .name(connector.getName())
-                          .config(obfuscatedConfig);
+                          .config(sanitizedConfig);
 
                       if (connectorStatus.getTasks() != null) {
                         boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
@@ -217,12 +211,7 @@ public class KafkaConnectService {
                                                       String connectorName) {
     return api(cluster, connectName)
         .mono(c -> c.getConnectorConfig(connectorName))
-        .map(connectorConfig -> {
-          final Map<String, Object> obfuscatedMap = new HashMap<>();
-          connectorConfig.forEach((key, value) ->
-              obfuscatedMap.put(key, kafkaConfigSanitizer.sanitize(key, value)));
-          return obfuscatedMap;
-        });
+        .map(kafkaConfigSanitizer::sanitizeConnectorConfig);
   }
 
   public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,

+ 24 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KafkaConfigSanitizerTest.java

@@ -3,14 +3,16 @@ package com.provectus.kafka.ui.service;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.jupiter.api.Test;
 
 class KafkaConfigSanitizerTest {
 
   @Test
   void doNothingIfEnabledPropertySetToFalse() {
-    final var sanitizer = new KafkaConfigSanitizer(false, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(false, List.of());
     assertThat(sanitizer.sanitize("password", "secret")).isEqualTo("secret");
     assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("secret");
     assertThat(sanitizer.sanitize("database.password", "secret")).isEqualTo("secret");
@@ -18,7 +20,7 @@ class KafkaConfigSanitizerTest {
 
   @Test
   void obfuscateCredentials() {
-    final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(true, List.of());
     assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("consumer.sasl.jaas.config", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("producer.sasl.jaas.config", "secret")).isEqualTo("******");
@@ -36,7 +38,7 @@ class KafkaConfigSanitizerTest {
 
   @Test
   void notObfuscateNormalConfigs() {
-    final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(true, List.of());
     assertThat(sanitizer.sanitize("security.protocol", "SASL_SSL")).isEqualTo("SASL_SSL");
     final String[] bootstrapServer = new String[] {"test1:9092", "test2:9092"};
     assertThat(sanitizer.sanitize("bootstrap.servers", bootstrapServer)).isEqualTo(bootstrapServer);
@@ -52,4 +54,22 @@ class KafkaConfigSanitizerTest {
     assertThat(sanitizer.sanitize("database.password", "no longer credential"))
             .isEqualTo("no longer credential");
   }
+
+  @Test
+  void sanitizeConnectorConfigDoNotFailOnNullableValues() {
+    Map<String, Object> originalConfig = new HashMap<>();
+    originalConfig.put("password", "secret");
+    originalConfig.put("asIs", "normal");
+    originalConfig.put("nullVal", null);
+
+    var sanitizedConfig = new KafkaConfigSanitizer(true, List.of())
+        .sanitizeConnectorConfig(originalConfig);
+
+    assertThat(sanitizedConfig)
+        .hasSize(3)
+        .containsEntry("password", "******")
+        .containsEntry("asIs", "normal")
+        .containsEntry("nullVal", null);
+  }
+
 }