Data masking logic moved to ConsumerRecordDeserializer
This commit is contained in:
parent
1e9e7a0b11
commit
4d536297a6
5 changed files with 29 additions and 34 deletions
|
@ -1,6 +1,7 @@
|
|||
package com.provectus.kafka.ui.serdes;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
|
@ -8,6 +9,7 @@ import java.time.ZoneId;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.UnaryOperator;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@ -32,6 +34,8 @@ public class ConsumerRecordDeserializer {
|
|||
private final Serde.Deserializer fallbackKeyDeserializer;
|
||||
private final Serde.Deserializer fallbackValueDeserializer;
|
||||
|
||||
private final UnaryOperator<TopicMessageDTO> masker;
|
||||
|
||||
public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
|
||||
var message = new TopicMessageDTO();
|
||||
fillKey(message, rec);
|
||||
|
@ -47,20 +51,15 @@ public class ConsumerRecordDeserializer {
|
|||
message.setValueSize(getValueSize(rec));
|
||||
message.setHeadersSize(getHeadersSize(rec));
|
||||
|
||||
return message;
|
||||
return masker.apply(message);
|
||||
}
|
||||
|
||||
private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
||||
switch (timestampType) {
|
||||
case CREATE_TIME:
|
||||
return TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
|
||||
case LOG_APPEND_TIME:
|
||||
return TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
|
||||
case NO_TIMESTAMP_TYPE:
|
||||
return TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
|
||||
}
|
||||
private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
||||
return switch (timestampType) {
|
||||
case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
|
||||
case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
|
||||
case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
||||
};
|
||||
}
|
||||
|
||||
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
|
||||
|
|
|
@ -102,7 +102,8 @@ public class DeserializationService implements Closeable {
|
|||
valueSerde.deserializer(topic, Serde.Target.VALUE),
|
||||
fallbackSerde.getName(),
|
||||
fallbackSerde.deserializer(topic, Serde.Target.KEY),
|
||||
fallbackSerde.deserializer(topic, Serde.Target.VALUE)
|
||||
fallbackSerde.deserializer(topic, Serde.Target.VALUE),
|
||||
cluster.getMasking().getMaskerForTopic(topic)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -247,24 +247,9 @@ public class MessagesService {
|
|||
);
|
||||
};
|
||||
return Flux.create(emitter)
|
||||
.map(getDataMasker(cluster, topic))
|
||||
.map(throttleUiPublish(seekDirection));
|
||||
}
|
||||
|
||||
private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
|
||||
var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
|
||||
var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
|
||||
return evt -> {
|
||||
if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
|
||||
return evt;
|
||||
}
|
||||
return evt.message(
|
||||
evt.getMessage()
|
||||
.key(keyMasker.apply(evt.getMessage().getKey()))
|
||||
.content(valMasker.apply(evt.getMessage().getContent())));
|
||||
};
|
||||
}
|
||||
|
||||
private Predicate<TopicMessageDTO> getMsgFilter(String query,
|
||||
MessageFilterTypeDTO filterQueryType) {
|
||||
if (StringUtils.isEmpty(query)) {
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package com.provectus.kafka.ui.service.masking;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
|
@ -9,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
|
||||
import java.util.List;
|
||||
|
@ -54,7 +53,8 @@ public class DataMasking {
|
|||
Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
|
||||
MaskingPolicy.create(property)
|
||||
);
|
||||
}).collect(toList()));
|
||||
}).toList()
|
||||
);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -62,8 +62,17 @@ public class DataMasking {
|
|||
this.masks = masks;
|
||||
}
|
||||
|
||||
public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
|
||||
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
|
||||
public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {
|
||||
var keyMasker = getMaskingFunction(topic, Serde.Target.KEY);
|
||||
var valMasker = getMaskingFunction(topic, Serde.Target.VALUE);
|
||||
return msg -> msg
|
||||
.key(keyMasker.apply(msg.getKey()))
|
||||
.content(valMasker.apply(msg.getContent()));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
|
||||
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
|
||||
if (targetMasks.isEmpty()) {
|
||||
return UnaryOperator.identity();
|
||||
}
|
||||
|
|
|
@ -106,7 +106,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
s.deserializer(null, Serde.Target.VALUE),
|
||||
StringSerde.name(),
|
||||
s.deserializer(null, Serde.Target.KEY),
|
||||
s.deserializer(null, Serde.Target.VALUE)
|
||||
s.deserializer(null, Serde.Target.VALUE),
|
||||
msg -> msg
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue