
Data masking (#2850)

Data masking:
1. properties & mapping added to ClustersProperties
2. DataMasking provides a function that performs masking for a specified topic & target (see the usage sketch after the changed-files list)
3. Masking policies implemented: MASK, REMOVE, REPLACE
Ilya Kuramshin, 2 years ago
parent
commit 20ecc74dd9
18 files changed, 847 additions and 8 deletions
  1. .github/workflows/documentation.yaml (+1 -1)
  2. documentation/guides/DataMasking.md (+123 -0)
  3. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java (+15 -0)
  4. kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java (+5 -0)
  5. kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java (+2 -0)
  6. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java (+11 -7)
  7. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java (+17 -0)
  8. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java (+92 -0)
  9. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java (+94 -0)
  10. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java (+39 -0)
  11. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java (+43 -0)
  12. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java (+64 -0)
  13. kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java (+3 -0)
  14. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java (+41 -0)
  15. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/DataMaskingTest.java (+89 -0)
  16. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/MaskTest.java (+68 -0)
  17. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java (+72 -0)
  18. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java (+68 -0)
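
Taken together, the changes below add a `DataMasking` component that is built from `ClustersProperties.Masking` entries and resolves a masking function per topic and serde target. The following is a minimal usage sketch assembled from the classes added in this diff; the class name `DataMaskingUsageSketch`, the topic name, and the sample JSON are made up for illustration:

```java
// Illustrative sketch only, based on the classes introduced in this commit.
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.masking.DataMasking;
import java.util.List;
import java.util.function.UnaryOperator;

class DataMaskingUsageSketch {
  public static void main(String[] args) {
    // One REPLACE policy applied to values of topics matching "org-events-.*"
    var policy = new ClustersProperties.Masking();
    policy.setType(ClustersProperties.Masking.Type.REPLACE);
    policy.setFields(List.of("companyName"));
    policy.setTopicValuesPattern("org-events-.*");

    DataMasking masking = DataMasking.create(List.of(policy));

    // The masking function is resolved per topic and serde target (KEY or VALUE)
    UnaryOperator<String> valueMasker =
        masking.getMaskingFunction("org-events-2022", Serde.Target.VALUE);

    // For JSON input, the configured fields are replaced with the default "***DATA_MASKED***"
    System.out.println(valueMasker.apply("{\"companyName\":\"ACME\",\"total\":42}"));
    // -> {"companyName":"***DATA_MASKED***","total":42}
  }
}
```

This mirrors how `MessagesService.getDataMasker` (below) applies `cluster.getMasking().getMaskingFunction(...)` to message keys and values before they are emitted.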

+ 1 - 1
.github/workflows/documentation.yaml

@@ -18,6 +18,6 @@ jobs:
         uses: urlstechie/urlchecker-action@0.0.33
         with:
           exclude_patterns: localhost,127.0.,192.168.
-          exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/
+          exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/,http://main-schema-registry:8081,http://schema-registry:8081,http://another-yet-schema-registry:8081,http://another-schema-registry:8081
           print_all: false
           file_types: .md

+ 123 - 0
documentation/guides/DataMasking.md

@@ -0,0 +1,123 @@
+# Topics data masking
+
+You can configure kafka-ui to mask sensitive data shown on the Messages page.
+
+Several masking policies are supported:
+
+### REMOVE
+For JSON objects, the target fields are removed; for non-JSON input, the string "null" is returned.
+```yaml
+- type: REMOVE
+  fields: [ "id", "name" ]
+  ...
+```
+
+Examples:
+```
+{ "id": 1234, "name": { "first": "James" }, "age": 30 } 
+ ->
+{ "age": 30 } 
+```
+```
+non-json string -> null
+```
+
+### REPLACE
+For JSON objects, the target fields' values are replaced with the specified replacement string (`***DATA_MASKED***` by default). Note: if a target field's value is an object, the replacement is applied to all of its fields recursively (see the example).
+
+```yaml
+- type: REPLACE
+  fields: [ "id", "name" ]
+  replacement: "***"  #optional, "***DATA_MASKED***" by default
+  ...
+```
+
+Examples:
+```
+{ "id": 1234, "name": { "first": "James", "last": "Bond" }, "age": 30 } 
+ ->
+{ "id": "***", "name": { "first": "***", "last": "***" }, "age": 30 } 
+```
+```
+non-json string -> ***
+```
+
+### MASK
+Masks the target fields' values with the specified masking characters, recursively (spaces and line separators are kept as-is).
+The `pattern` array specifies which symbols are used to replace upper-case characters, lower-case characters, digits, and other symbols, respectively.
+
+```yaml
+- type: MASK
+  fields: [ "id", "name" ]
+  pattern: ["A", "a", "N", "_"]   # optional, default is ["X", "x", "n", "-"]
+  ...
+```
+
+Examples:
+```
+{ "id": 1234, "name": { "first": "James", "last": "Bond!" }, "age": 30 } 
+ ->
+{ "id": "NNNN", "name": { "first": "Aaaaa", "last": "Aaaa_" }, "age": 30 } 
+```
+```
+Some string! -> Aaaa aaaaaa_
+```
+
+----
+
+For each policy, if `fields` is not specified, the policy is applied to all of the object's fields, or to the whole string if the input is not a JSON object.
+
+You can specify which masks are applied to a topic's keys and/or values. Multiple policies are applied if the topic matches more than one policy's pattern.
+
+YAML configuration example:
+```yaml
+kafka:
+  clusters:
+    - name: ClusterName
+      # Other Cluster configuration omitted ... 
+      masking:
+        - type: REMOVE
+          fields: [ "id" ]
+          topicKeysPattern: "events-with-ids-.*"
+          topicValuesPattern: "events-with-ids-.*"
+          
+        - type: REPLACE
+          fields: [ "companyName", "organizationName" ]
+          replacement: "***MASKED_ORG_NAME***"   #optional
+          topicValuesPattern: "org-events-.*"
+        
+        - type: MASK
+          fields: [ "name", "surname" ]
+          pattern: ["A", "a", "N", "_"]  #optional
+          topicValuesPattern: "user-states"
+
+        - type: MASK
+          topicValuesPattern: "very-secured-topic"
+```
+
+The same configuration using environment variables:
+```
+...
+KAFKA_CLUSTERS_0_MASKING_0_TYPE: REMOVE
+KAFKA_CLUSTERS_0_MASKING_0_FIELDS_0: "id"
+KAFKA_CLUSTERS_0_MASKING_0_TOPICKEYSPATTERN: "events-with-ids-.*"
+KAFKA_CLUSTERS_0_MASKING_0_TOPICVALUESPATTERN: "events-with-ids-.*"
+
+KAFKA_CLUSTERS_0_MASKING_1_TYPE: REPLACE
+KAFKA_CLUSTERS_0_MASKING_1_FIELDS_0: "companyName"
+KAFKA_CLUSTERS_0_MASKING_1_FIELDS_1: "organizationName"
+KAFKA_CLUSTERS_0_MASKING_1_REPLACEMENT: "***MASKED_ORG_NAME***"
+KAFKA_CLUSTERS_0_MASKING_1_TOPICVALUESPATTERN: "org-events-.*"
+
+KAFKA_CLUSTERS_0_MASKING_2_TYPE: MASK
+KAFKA_CLUSTERS_0_MASKING_2_FIELDS_0: "name"
+KAFKA_CLUSTERS_0_MASKING_2_FIELDS_1: "surname"
+KAFKA_CLUSTERS_0_MASKING_2_PATTERN_0: 'A'
+KAFKA_CLUSTERS_0_MASKING_2_PATTERN_1: 'a'
+KAFKA_CLUSTERS_0_MASKING_2_PATTERN_2: 'N'
+KAFKA_CLUSTERS_0_MASKING_2_PATTERN_3: '_'
+KAFKA_CLUSTERS_0_MASKING_2_TOPICVALUESPATTERN: "user-states"
+
+KAFKA_CLUSTERS_0_MASKING_3_TYPE: MASK
+KAFKA_CLUSTERS_0_MASKING_3_TOPICVALUESPATTERN: "very-secured-topic"
+```

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -38,6 +38,7 @@ public class ClustersProperties {
     List<SerdeConfig> serde = new ArrayList<>();
     String defaultKeySerde;
     String defaultValueSerde;
+    List<Masking> masking = new ArrayList<>();
   }
 
   @Data
@@ -92,6 +93,20 @@ public class ClustersProperties {
     String password;
   }
 
+  @Data
+  public static class Masking {
+    Type type;
+    List<String> fields = List.of(); //if empty - policy will be applied to all fields
+    List<String> pattern = List.of("X", "x", "n", "-"); //used when type=MASK
+    String replacement = "***DATA_MASKED***"; //used when type=REPLACE
+    String topicKeysPattern;
+    String topicValuesPattern;
+
+    public enum Type {
+      REMOVE, MASK, REPLACE
+    }
+  }
+
   @PostConstruct
   public void validateAndSetDefaults() {
     validateClusterNames();

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -34,6 +34,7 @@ import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Arrays;
 import java.util.Collections;
@@ -178,6 +179,10 @@ public interface ClusterMapper {
     return brokerDiskUsage;
   }
 
+  default DataMasking map(List<ClustersProperties.Masking> maskingProperties) {
+    return DataMasking.create(maskingProperties);
+  }
+
   @Named("setProperties")
   default Properties setProperties(Properties properties) {
     Properties copy = new Properties();

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model;
 
+import com.provectus.kafka.ui.service.masking.DataMasking;
 import java.util.List;
 import java.util.Properties;
 import lombok.AccessLevel;
@@ -21,4 +22,5 @@ public class KafkaCluster {
   private final boolean readOnly;
   private final boolean disableLogDirsCollection;
   private final MetricsConfig metricsConfig;
+  private final DataMasking masking;
 }

+ 11 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java

@@ -24,7 +24,7 @@ import org.springframework.stereotype.Component;
 @Component
 public class DeserializationService implements Closeable {
 
-  private final Map<KafkaCluster, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();
+  private final Map<String, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();
 
   public DeserializationService(Environment env,
                                 ClustersStorage clustersStorage,
@@ -32,15 +32,19 @@ public class DeserializationService implements Closeable {
     for (int i = 0; i < clustersProperties.getClusters().size(); i++) {
       var clusterProperties = clustersProperties.getClusters().get(i);
       var cluster = clustersStorage.getClusterByName(clusterProperties.getName()).get();
-      clusterSerdes.put(cluster, new ClusterSerdes(env, clustersProperties, i));
+      clusterSerdes.put(cluster.getName(), new ClusterSerdes(env, clustersProperties, i));
     }
   }
 
+  private ClusterSerdes getSerdesFor(KafkaCluster cluster) {
+    return clusterSerdes.get(cluster.getName());
+  }
+
   private Serde.Serializer getSerializer(KafkaCluster cluster,
                                          String topic,
                                          Serde.Target type,
                                          String serdeName) {
-    var serdes = this.clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
     var serde = serdes.serdeForName(serdeName)
         .orElseThrow(() -> new ValidationException(
             String.format("Serde %s not found", serdeName)));
@@ -55,7 +59,7 @@ public class DeserializationService implements Closeable {
                                                String topic,
                                                Serde.Target type,
                                                @Nullable String serdeName) {
-    var serdes = this.clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
     if (serdeName != null) {
       var serde = serdes.serdeForName(serdeName)
           .orElseThrow(() -> new ValidationException(String.format("Serde '%s' not found", serdeName)));
@@ -85,7 +89,7 @@ public class DeserializationService implements Closeable {
                                                     @Nullable String valueSerdeName) {
     var keySerde = getSerdeForDeserialize(cluster, topic, Serde.Target.KEY, keySerdeName);
     var valueSerde = getSerdeForDeserialize(cluster, topic, Serde.Target.VALUE, valueSerdeName);
-    var fallbackSerde = clusterSerdes.get(cluster).getFallbackSerde();
+    var fallbackSerde = getSerdesFor(cluster).getFallbackSerde();
     return new ConsumerRecordDeserializer(
         keySerde.getName(),
         keySerde.deserializer(topic, Serde.Target.KEY),
@@ -100,7 +104,7 @@ public class DeserializationService implements Closeable {
   public List<SerdeDescriptionDTO> getSerdesForSerialize(KafkaCluster cluster,
                                                          String topic,
                                                          Serde.Target serdeType) {
-    var serdes = clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
     var preferred = serdes.suggestSerdeForSerialize(topic, serdeType);
     var result = new ArrayList<SerdeDescriptionDTO>();
     result.add(toDto(preferred, topic, serdeType, true));
@@ -114,7 +118,7 @@ public class DeserializationService implements Closeable {
   public List<SerdeDescriptionDTO> getSerdesForDeserialize(KafkaCluster cluster,
                                                            String topic,
                                                            Serde.Target serdeType) {
-    var serdes = clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
     var preferred = serdes.suggestSerdeForDeserialize(topic, serdeType);
     var result = new ArrayList<SerdeDescriptionDTO>();
     result.add(toDto(preferred, topic, serdeType, true));

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.ResultSizeLimiter;
@@ -21,6 +22,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
@@ -177,6 +179,7 @@ public class MessagesService {
     return Flux.create(emitter)
         .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
         .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .map(getDataMasker(cluster, topic))
         .takeWhile(createTakeWhilePredicate(seekDirection, limit))
         .subscribeOn(Schedulers.boundedElastic())
         .share();
@@ -189,6 +192,20 @@ public class MessagesService {
         : new ResultSizeLimiter(limit);
   }
 
+  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
+    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
+    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
+    return evt -> {
+      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return evt;
+      }
+      return evt.message(
+        evt.getMessage()
+            .key(keyMasker.apply(evt.getMessage().getKey()))
+            .content(valMasker.apply(evt.getMessage().getContent())));
+    };
+  }
+
   private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
                                                        MessageFilterTypeDTO filterQueryType,
                                                        MessageFilterStats filterStats) {

+ 92 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java

@@ -0,0 +1,92 @@
+package com.provectus.kafka.ui.service.masking;
+
+import static java.util.stream.Collectors.toList;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import lombok.Value;
+import org.apache.commons.lang3.StringUtils;
+
+public class DataMasking {
+
+  private static final JsonMapper JSON_MAPPER = new JsonMapper();
+
+  @Value
+  static class Mask {
+    @Nullable
+    Pattern topicKeysPattern;
+    @Nullable
+    Pattern topicValuesPattern;
+
+    MaskingPolicy policy;
+
+    boolean shouldBeApplied(String topic, Serde.Target target) {
+      return target == Serde.Target.KEY
+          ? topicKeysPattern != null && topicKeysPattern.matcher(topic).matches()
+          : topicValuesPattern != null && topicValuesPattern.matcher(topic).matches();
+    }
+  }
+
+  private final List<Mask> masks;
+
+  public static DataMasking create(List<ClustersProperties.Masking> config) {
+    return new DataMasking(
+        config.stream().map(property -> {
+          Preconditions.checkNotNull(property.getType(), "masking type not specified");
+          Preconditions.checkArgument(
+              StringUtils.isNotEmpty(property.getTopicKeysPattern())
+                  || StringUtils.isNotEmpty(property.getTopicValuesPattern()),
+              "topicKeysPattern or topicValuesPattern (or both) should be set for masking policy");
+          return new Mask(
+              Optional.ofNullable(property.getTopicKeysPattern()).map(Pattern::compile).orElse(null),
+              Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
+              MaskingPolicy.create(property)
+          );
+        }).collect(toList()));
+  }
+
+  @VisibleForTesting
+  DataMasking(List<Mask> masks) {
+    this.masks = masks;
+  }
+
+  public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
+    var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
+    if (targetMasks.isEmpty()) {
+      return UnaryOperator.identity();
+    }
+    return inputStr -> {
+      if (inputStr == null) {
+        return null;
+      }
+      try {
+        JsonNode json = JSON_MAPPER.readTree(inputStr);
+        if (json.isContainerNode()) {
+          for (Mask targetMask : targetMasks) {
+            json = targetMask.policy.applyToJsonContainer((ContainerNode<?>) json);
+          }
+          return json.toString();
+        }
+      } catch (JsonProcessingException jsonException) {
+        //just ignore
+      }
+      // if we can't parse input as json or parsed json is not object/array
+      // we just apply first found policy
+      // (there is no need to apply all of them, because they will just override each other)
+      return targetMasks.get(0).policy.applyToString(inputStr);
+    };
+  }
+
+}

+ 94 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Mask.java

@@ -0,0 +1,94 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+class Mask extends MaskingPolicy {
+
+  private final UnaryOperator<String> masker;
+
+  Mask(List<String> fieldNames, List<String> maskingChars) {
+    super(fieldNames);
+    this.masker = createMasker(maskingChars);
+  }
+
+  @Override
+  public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
+    return (ContainerNode<?>) maskWithFieldsCheck(node);
+  }
+
+  @Override
+  public String applyToString(String str) {
+    return masker.apply(str);
+  }
+
+  private static UnaryOperator<String> createMasker(List<String> maskingChars) {
+    Preconditions.checkNotNull(maskingChars);
+    Preconditions.checkArgument(maskingChars.size() == 4, "mask pattern should contain 4 elements");
+    return input -> {
+      StringBuilder sb = new StringBuilder(input.length());
+      for (int i = 0; i < input.length(); i++) {
+        int cp = input.codePointAt(i);
+        switch (Character.getType(cp)) {
+          case Character.SPACE_SEPARATOR:
+          case Character.LINE_SEPARATOR:
+          case Character.PARAGRAPH_SEPARATOR:
+            sb.appendCodePoint(cp); // keeping separators as-is
+            break;
+          case Character.UPPERCASE_LETTER:
+            sb.append(maskingChars.get(0));
+            break;
+          case Character.LOWERCASE_LETTER:
+            sb.append(maskingChars.get(1));
+            break;
+          case Character.DECIMAL_DIGIT_NUMBER:
+            sb.append(maskingChars.get(2));
+            break;
+          default:
+            sb.append(maskingChars.get(3));
+        }
+      }
+      return sb.toString();
+    };
+  }
+
+  private JsonNode maskWithFieldsCheck(JsonNode node) {
+    if (node.isObject()) {
+      ObjectNode obj = ((ObjectNode) node).objectNode();
+      node.fields().forEachRemaining(f -> {
+        String fieldName = f.getKey();
+        JsonNode fieldVal = f.getValue();
+        if (fieldShouldBeMasked(fieldName)) {
+          obj.set(fieldName, maskNodeRecursively(fieldVal));
+        } else {
+          obj.set(fieldName, maskWithFieldsCheck(fieldVal));
+        }
+      });
+      return obj;
+    } else if (node.isArray()) {
+      ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
+      node.elements().forEachRemaining(e -> arr.add(maskWithFieldsCheck(e)));
+      return arr;
+    }
+    return node;
+  }
+
+  private JsonNode maskNodeRecursively(JsonNode node) {
+    if (node.isObject()) {
+      ObjectNode obj = ((ObjectNode) node).objectNode();
+      node.fields().forEachRemaining(f -> obj.set(f.getKey(), maskNodeRecursively(f.getValue())));
+      return obj;
+    } else if (node.isArray()) {
+      ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
+      node.elements().forEachRemaining(e -> arr.add(maskNodeRecursively(e)));
+      return arr;
+    }
+    return new TextNode(masker.apply(node.asText()));
+  }
+}

+ 39 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/MaskingPolicy.java

@@ -0,0 +1,39 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public abstract class MaskingPolicy {
+
+  public static MaskingPolicy create(ClustersProperties.Masking property) {
+    Preconditions.checkNotNull(property.getFields());
+    switch (property.getType()) {
+      case REMOVE:
+        return new Remove(property.getFields());
+      case REPLACE:
+        return new Replace(property.getFields(), property.getReplacement());
+      case MASK:
+        return new Mask(property.getFields(), property.getPattern());
+      default:
+        throw new IllegalStateException("Unknown policy type: " + property.getType());
+    }
+  }
+
+  //----------------------------------------------------------------
+
+  // empty list means policy will be applied to all fields
+  private final List<String> fieldNames;
+
+  protected boolean fieldShouldBeMasked(String fieldName) {
+    return fieldNames.isEmpty() || fieldNames.contains(fieldName);
+  }
+
+  public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node);
+
+  public abstract String applyToString(String str);
+
+}

+ 43 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Remove.java

@@ -0,0 +1,43 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.List;
+
+class Remove extends MaskingPolicy {
+
+  Remove(List<String> fieldNames) {
+    super(fieldNames);
+  }
+
+  @Override
+  public String applyToString(String str) {
+    return "null";
+  }
+
+  @Override
+  public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
+    return (ContainerNode<?>) removeFields(node);
+  }
+
+  private JsonNode removeFields(JsonNode node) {
+    if (node.isObject()) {
+      ObjectNode obj = ((ObjectNode) node).objectNode();
+      node.fields().forEachRemaining(f -> {
+        String fieldName = f.getKey();
+        JsonNode fieldVal = f.getValue();
+        if (!fieldShouldBeMasked(fieldName)) {
+          obj.set(fieldName, removeFields(fieldVal));
+        }
+      });
+      return obj;
+    } else if (node.isArray()) {
+      var arr = ((ArrayNode) node).arrayNode(node.size());
+      node.elements().forEachRemaining(e -> arr.add(removeFields(e)));
+      return arr;
+    }
+    return node;
+  }
+}

+ 64 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/policies/Replace.java

@@ -0,0 +1,64 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.base.Preconditions;
+import java.util.List;
+
+class Replace extends MaskingPolicy {
+
+  private final String replacement;
+
+  Replace(List<String> fieldNames, String replacementString) {
+    super(fieldNames);
+    this.replacement = Preconditions.checkNotNull(replacementString);
+  }
+
+  @Override
+  public String applyToString(String str) {
+    return replacement;
+  }
+
+  @Override
+  public ContainerNode<?> applyToJsonContainer(ContainerNode<?> node) {
+    return (ContainerNode<?>) replaceWithFieldsCheck(node);
+  }
+
+  private JsonNode replaceWithFieldsCheck(JsonNode node) {
+    if (node.isObject()) {
+      ObjectNode obj = ((ObjectNode) node).objectNode();
+      node.fields().forEachRemaining(f -> {
+        String fieldName = f.getKey();
+        JsonNode fieldVal = f.getValue();
+        if (fieldShouldBeMasked(fieldName)) {
+          obj.set(fieldName, replaceRecursive(fieldVal));
+        } else {
+          obj.set(fieldName, replaceWithFieldsCheck(fieldVal));
+        }
+      });
+      return obj;
+    } else if (node.isArray()) {
+      ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
+      node.elements().forEachRemaining(e -> arr.add(replaceWithFieldsCheck(e)));
+      return arr;
+    }
+    // if it is not an object or array - we have nothing to replace here
+    return node;
+  }
+
+  private JsonNode replaceRecursive(JsonNode node) {
+    if (node.isObject()) {
+      ObjectNode obj = ((ObjectNode) node).objectNode();
+      node.fields().forEachRemaining(f -> obj.set(f.getKey(), replaceRecursive(f.getValue())));
+      return obj;
+    } else if (node.isArray()) {
+      ArrayNode arr = ((ArrayNode) node).arrayNode(node.size());
+      node.elements().forEachRemaining(e -> arr.add(replaceRecursive(e)));
+      return arr;
+    }
+    return new TextNode(replacement);
+  }
+}

+ 3 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -66,6 +66,9 @@ public abstract class AbstractIntegrationTest {
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+      System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
+      System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
+      System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");
 
       System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
       System.setProperty("kafka.clusters.1.readOnly", "true");

+ 41 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -2,17 +2,27 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
+import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import java.util.List;
 import java.util.UUID;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
 
 class MessagesServiceTest extends AbstractIntegrationTest {
 
+  private static final String MASKED_TOPICS_PREFIX = "masking-test-";
   private static final String NON_EXISTING_TOPIC = UUID.randomUUID().toString();
 
   @Autowired
@@ -50,4 +60,35 @@ class MessagesServiceTest extends AbstractIntegrationTest {
         .verify();
   }
 
+  @Test
+  void maskingAppliedOnConfiguredClusters() {
+    String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      createTopic(new NewTopic(testTopic, 1, (short) 1));
+      producer.send(testTopic, "message1");
+      producer.send(testTopic, "message2");
+
+      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
+          cluster,
+          testTopic,
+          new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
+          null,
+          null,
+          100,
+          SeekDirectionDTO.FORWARD,
+          StringSerde.name(),
+          StringSerde.name()
+      ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+          .map(TopicMessageEventDTO::getMessage);
+
+      // both messages should be masked
+      StepVerifier.create(msgsFlux)
+          .expectNextMatches(msg -> msg.getContent().equals("***"))
+          .expectNextMatches(msg -> msg.getContent().equals("***"))
+          .verifyComplete();
+    } finally {
+      deleteTopic(testTopic);
+    }
+  }
+
 }

+ 89 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/DataMaskingTest.java

@@ -0,0 +1,89 @@
+package com.provectus.kafka.ui.service.masking;
+
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
+import java.util.List;
+import java.util.regex.Pattern;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class DataMaskingTest {
+
+  private static final String TOPIC = "test_topic";
+
+  private DataMasking masking;
+
+  private MaskingPolicy policy1;
+  private MaskingPolicy policy2;
+  private MaskingPolicy policy3;
+
+  @BeforeEach
+  void init() {
+    policy1 = spy(createMaskPolicy());
+    policy2 = spy(createMaskPolicy());
+    policy3 = spy(createMaskPolicy());
+
+    masking = new DataMasking(
+        List.of(
+            new DataMasking.Mask(Pattern.compile(TOPIC), null, policy1),
+            new DataMasking.Mask(null, Pattern.compile(TOPIC), policy2),
+            new DataMasking.Mask(null, Pattern.compile(TOPIC + "|otherTopic"), policy3)));
+  }
+
+  private MaskingPolicy createMaskPolicy() {
+    var props = new ClustersProperties.Masking();
+    props.setType(ClustersProperties.Masking.Type.REMOVE);
+    return MaskingPolicy.create(props);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "{\"some\": \"json\"}",
+      "[ {\"json\": \"array\"} ]"
+  })
+  @SneakyThrows
+  void appliesMasksToJsonContainerArgsBasedOnTopicPatterns(String jsonObjOrArr) {
+    var parsedJson = (ContainerNode<?>) new JsonMapper().readTree(jsonObjOrArr);
+
+    masking.getMaskingFunction(TOPIC, Serde.Target.KEY).apply(jsonObjOrArr);
+    verify(policy1).applyToJsonContainer(eq(parsedJson));
+    verifyNoInteractions(policy2, policy3);
+
+    reset(policy1, policy2, policy3);
+
+    masking.getMaskingFunction(TOPIC, Serde.Target.VALUE).apply(jsonObjOrArr);
+    verify(policy2).applyToJsonContainer(eq(parsedJson));
+    verify(policy3).applyToJsonContainer(eq(policy2.applyToJsonContainer(parsedJson)));
+    verifyNoInteractions(policy1);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "non json str",
+      "234",
+      "null"
+  })
+  void appliesFirstFoundMaskToStringArgsBasedOnTopicPatterns(String nonJsonObjOrArrString) {
+    masking.getMaskingFunction(TOPIC, Serde.Target.KEY).apply(nonJsonObjOrArrString);
+    verify(policy1).applyToString(eq(nonJsonObjOrArrString));
+    verifyNoInteractions(policy2, policy3);
+
+    reset(policy1, policy2, policy3);
+
+    masking.getMaskingFunction(TOPIC, Serde.Target.VALUE).apply(nonJsonObjOrArrString);
+    verify(policy2).applyToString(eq(nonJsonObjOrArrString));
+    verifyNoInteractions(policy1, policy3);
+  }
+
+}

+ 68 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/MaskTest.java

@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import java.util.List;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class MaskTest {
+
+  private static final List<String> TARGET_FIELDS = List.of("id", "name");
+  private static final List<String> PATTERN = List.of("X", "x", "n", "-");
+
+  @ParameterizedTest
+  @MethodSource
+  void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) {
+    Mask policy = new Mask(fields, PATTERN);
+    assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
+  }
+
+  private static Stream<Arguments> testApplyToJsonContainer() {
+    return Stream.of(
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
+            parse("{ \"id\": \"nnn\", \"name\": { \"first\": \"Xxxxx\", \"surname\": \"Xxxxnnn-\"}}")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
+            parse("[{ \"id\": \"nnn\", \"f2\": 234}, { \"name\": \"n-n\", \"f2\": 345} ]")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Xxxxnnn-\"}}")
+        ),
+        Arguments.of(
+            List.of(),
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
+            parse("{ \"outer\": { \"f1\": \"Xxxxx\", \"name\": \"Xxxxnnn-\"}}")
+        )
+    );
+  }
+
+  @ParameterizedTest
+  @CsvSource({
+      "Some string?!1, Xxxx xxxxxx--n",
+      "1.24343, n-nnnnn",
+      "null, xxxx"
+  })
+  void testApplyToString(String original, String expected) {
+    Mask policy = new Mask(List.of(), PATTERN);
+    assertThat(policy.applyToString(original)).isEqualTo(expected);
+  }
+
+  @SneakyThrows
+  private static JsonNode parse(String str) {
+    return new JsonMapper().readTree(str);
+  }
+}

+ 72 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/RemoveTest.java

@@ -0,0 +1,72 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import java.util.List;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class RemoveTest {
+
+  private static final List<String> TARGET_FIELDS = List.of("id", "name");
+
+  @ParameterizedTest
+  @MethodSource
+  void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?>  expected) {
+    var policy = new Remove(fields);
+    assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
+  }
+
+  private static Stream<Arguments> testApplyToJsonContainer() {
+    return Stream.of(
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
+            parse("{}")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
+            parse("[{ \"f2\": 234}, { \"f2\": 345} ]")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
+            parse("{ \"outer\": { \"f1\": \"James\"}}")
+        ),
+        Arguments.of(
+            List.of(),
+            parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
+            parse("{}")
+        ),
+        Arguments.of(
+            List.of(),
+            parse("[{ \"f1\": 123}, { \"f2\": \"1.2\"} ]"),
+            parse("[{}, {}]")
+        )
+    );
+  }
+
+  @SneakyThrows
+  private static JsonNode parse(String str) {
+    return new JsonMapper().readTree(str);
+  }
+
+  @ParameterizedTest
+  @CsvSource({
+      "Some string?!1, null",
+      "1.24343, null",
+      "null, null"
+  })
+  void testApplyToString(String original, String expected) {
+    var policy = new Remove(List.of());
+    assertThat(policy.applyToString(original)).isEqualTo(expected);
+  }
+}

+ 68 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/masking/policies/ReplaceTest.java

@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui.service.masking.policies;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import java.util.List;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class ReplaceTest {
+
+  private static final List<String> TARGET_FIELDS = List.of("id", "name");
+  private static final String REPLACEMENT_STRING = "***";
+
+  @ParameterizedTest
+  @MethodSource
+  void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?>  expected) {
+    var policy = new Replace(fields, REPLACEMENT_STRING);
+    assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
+  }
+
+  private static Stream<Arguments> testApplyToJsonContainer() {
+    return Stream.of(
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
+            parse("{ \"id\": \"***\", \"name\": { \"first\": \"***\", \"surname\": \"***\"}}")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
+            parse("[{ \"id\": \"***\", \"f2\": 234}, { \"name\": \"***\", \"f2\": 345} ]")
+        ),
+        Arguments.of(
+            TARGET_FIELDS,
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
+            parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"***\"}}")
+        ),
+        Arguments.of(
+            List.of(),
+            parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
+            parse("{ \"outer\": { \"f1\": \"***\", \"f2\": \"***\", \"inner\" : {\"if1\": \"***\"}}}}")
+        )
+    );
+  }
+
+  @SneakyThrows
+  private static JsonNode parse(String str) {
+    return new JsonMapper().readTree(str);
+  }
+
+  @ParameterizedTest
+  @CsvSource({
+      "Some string?!1, ***",
+      "1.24343, ***",
+      "null, ***"
+  })
+  void testApplyToString(String original, String expected) {
+    var policy = new Replace(List.of(), REPLACEMENT_STRING);
+    assertThat(policy.applyToString(original)).isEqualTo(expected);
+  }
+}