master merge

commit 5e5ac5ff86
parent 68ee46d789

3 changed files with 86 additions and 36 deletions
MessagesController.java

@@ -95,11 +95,12 @@ public class MessagesController extends AbstractController implements MessagesApi
                                          String valueSerde,
                                          String cursor,
                                          ServerWebExchange exchange) {
-    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .build());
+        .operationName("getTopicMessages")
+        .build();

     Flux<TopicMessageEventDTO> messagesFlux;
     if (cursor != null) {
@@ -116,7 +117,9 @@ public class MessagesController extends AbstractController implements MessagesApi
           valueSerde
       );
     }
-    return validateAccess.then(Mono.just(ResponseEntity.ok(messagesFlux)));
+    return accessControlService.validateAccess(context)
+        .then(Mono.just(ResponseEntity.ok(messagesFlux)))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }

   @Override
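The merge folds auditing into the endpoint: the access check now builds a named AccessContext that is reused by the audit call, and the response pipeline reports every terminal signal to the audit service. A minimal sketch of why doOnEach is the hook of choice (Reactor's Signal carries success and failure alike; the audit method below is an illustrative stand-in, not the project's actual AuditService API):

import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

class AuditSketch {
  // Illustrative stand-in for auditService.audit(context, sig).
  static void audit(String operation, Signal<?> sig) {
    if (sig.isOnError()) {
      System.out.println(operation + " failed: " + sig.getThrowable());
    } else if (sig.isOnComplete()) {
      System.out.println(operation + " succeeded");
    }
  }

  public static void main(String[] args) {
    Mono.just("response")
        .doOnEach(sig -> audit("getTopicMessages", sig)) // fires for onNext, onError, onComplete
        .subscribe();
  }
}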

MessagesService.java
@@ -17,13 +17,20 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -94,6 +101,37 @@ public class MessagesService {
         .switchIfEmpty(Mono.error(new TopicNotFoundException()));
   }

+  public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
+    Predicate<TopicMessageDTO> predicate;
+    try {
+      predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode());
+    } catch (Exception e) {
+      log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
+      return new SmartFilterTestExecutionResultDTO()
+          .error("Compilation error : " + e.getMessage());
+    }
+    try {
+      var result = predicate.test(
+          new TopicMessageDTO()
+              .key(execData.getKey())
+              .content(execData.getValue())
+              .headers(execData.getHeaders())
+              .offset(execData.getOffset())
+              .partition(execData.getPartition())
+              .timestamp(
+                  Optional.ofNullable(execData.getTimestampMs())
+                      .map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
+                      .orElse(null))
+      );
+      return new SmartFilterTestExecutionResultDTO()
+          .result(result);
+    } catch (Exception e) {
+      log.info("Smart filter {} execution error", execData, e);
+      return new SmartFilterTestExecutionResultDTO()
+          .error("Execution error : " + e.getMessage());
+    }
+  }
+
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
     return withExistingTopic(cluster, topicName)
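execSmartFilterTest, brought in from master, compiles the submitted Groovy code once, evaluates it against a message assembled from the request, and turns both compilation and execution failures into an error field instead of a thrown exception. A usage sketch (the fluent setter names follow the generated DTOs; the filterCode semantics depend on MessageFilters.groovyScriptFilter and are an assumption here):

// Hypothetical invocation from a caller's point of view.
var execData = new SmartFilterTestExecutionDTO()
    .filterCode("value != null")   // Groovy predicate to test-compile and run
    .key("k1")
    .value("v1")
    .partition(0)
    .offset(0L);
var result = MessagesService.execSmartFilterTest(execData);
// result.getResult() is the predicate outcome; result.getError() is non-null on failure.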
@@ -140,13 +178,7 @@ public class MessagesService {
         msg.getValueSerde().get()
     );

-    Properties properties = new Properties();
-    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
-    properties.putAll(cluster.getProperties());
-    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
+    try (KafkaProducer<byte[], byte[]> producer = createProducer(cluster, Map.of())) {
       ProducerRecord<byte[], byte[]> producerRecord = producerRecordCreator.create(
           topicDescription.name(),
           msg.getPartition(),
@@ -168,34 +200,26 @@ public class MessagesService {
       }
     }

-  private int fixPageSize(@Nullable Integer pageSize) {
-    return Optional.ofNullable(pageSize)
-        .filter(ps -> ps > 0 && ps <= maxPageSize)
-        .orElse(defaultPageSize);
-  }
-
-  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
-    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
-    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
-    return evt -> {
-      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        return evt;
-      }
-      return evt.message(
-          evt.getMessage()
-              .key(keyMasker.apply(evt.getMessage().getKey()))
-              .content(valMasker.apply(evt.getMessage().getContent())));
-    };
-  }
+  public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
+                                                             Map<String, Object> additionalProps) {
+    Properties properties = new Properties();
+    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
+    properties.putAll(cluster.getProperties());
+    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
+    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.putAll(additionalProps);
+    return new KafkaProducer<>(properties);
+  }

   public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
-                                                String topic,
-                                                ConsumerPosition consumerPosition,
-                                                @Nullable String containsStringFilter,
-                                                @Nullable String filterId,
-                                                @Nullable Integer limit,
-                                                @Nullable String keySerde,
-                                                @Nullable String valueSerde) {
+                                                 String topic,
+                                                 ConsumerPosition consumerPosition,
+                                                 @Nullable String containsStringFilter,
+                                                 @Nullable String filterId,
+                                                 @Nullable Integer limit,
+                                                 @Nullable String keySerde,
+                                                 @Nullable String valueSerde) {
     return loadMessages(
         cluster,
         topic,
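The producer construction removed from sendMessage is factored out above into the public static createProducer, with the additionalProps map applied last so callers can override any cluster default. A usage sketch (the override values and topic name are illustrative, and the surrounding cluster variable comes from the caller's context):

// Build a producer from the cluster's SSL/bootstrap defaults, layering caller overrides on top.
try (KafkaProducer<byte[], byte[]> producer =
         MessagesService.createProducer(cluster, Map.of(ProducerConfig.ACKS_CONFIG, "all"))) {
  producer.send(new ProducerRecord<>("some-topic", "k".getBytes(), "v".getBytes()));
}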
@@ -228,7 +252,7 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, fixPageSize(limit)));
+        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, limit));
   }

   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
@@ -271,6 +295,12 @@ public class MessagesService {
         .map(throttleUiPublish(consumerPosition.pollingMode()));
   }

+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
   public String registerMessageFilter(String groovyCode) {
     String saltedCode = groovyCode + SALT_FOR_HASHING;
     String filterId = Hashing.sha256()
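fixPageSize itself is unchanged by the move. Note that its filter sends every out-of-range value to defaultPageSize rather than clamping to the nearest bound:

// Assuming defaultPageSize = 25 and maxPageSize = 500 (illustrative values):
// fixPageSize(null) -> 25
// fixPageSize(0)    -> 25   (non-positive values rejected)
// fixPageSize(100)  -> 100
// fixPageSize(900)  -> 25   (over the cap falls back to the default, not to 500)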
@@ -283,6 +313,20 @@ public class MessagesService {
     return filterId;
   }

+  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
+    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
+    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
+    return evt -> {
+      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return evt;
+      }
+      return evt.message(
+          evt.getMessage()
+              .key(keyMasker.apply(evt.getMessage().getKey()))
+              .content(valMasker.apply(evt.getMessage().getContent())));
+    };
+  }
+
   private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
                                                   @Nullable String smartFilterId) {
     Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
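getDataMasker returns a per-topic transform that rewrites only MESSAGE-type events, masking key and content with the functions configured for the cluster; every other event type passes through untouched. A usage sketch (the event flux and topic name are placeholders, and applying it as a map step is an assumption about the surrounding pipeline):

// Applied over the event stream emitted by the message-loading pipeline.
UnaryOperator<TopicMessageEventDTO> masker = getDataMasker(cluster, "orders-topic");
Flux<TopicMessageEventDTO> masked = events.map(masker);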

MessagesServiceTest.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;

+import static com.provectus.kafka.ui.service.MessagesService.execSmartFilterTest;
 import static org.assertj.core.api.Assertions.assertThat;

 import com.provectus.kafka.ui.AbstractIntegrationTest;
@@ -8,12 +9,14 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
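Together with the static import of execSmartFilterTest, these imports point at a new smart-filter test case. A minimal sketch of what such a test can look like (the Groovy binding names used in filterCode are an assumption about the script context MessageFilters provides):

@Test
void execSmartFilterTestReturnsResultOfPredicateExecution() {
  var params = new SmartFilterTestExecutionDTO()
      .filterCode("key != null && value != null")  // assumed script bindings
      .key("k1")
      .value("v1");
  assertThat(execSmartFilterTest(params).getResult()).isTrue();
}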