@@ -17,7 +17,11 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
@@ -100,10 +104,7 @@ public class MessagesService {
   public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
     Predicate<TopicMessageDTO> predicate;
     try {
-      predicate = MessageFilters.createMsgFilter(
-          execData.getFilterCode(),
-          MessageFilterTypeDTO.GROOVY_SCRIPT
-      );
+      predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode());
     } catch (Exception e) {
       log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
       return new SmartFilterTestExecutionResultDTO()
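
Note: groovyScriptFilter is the project's own helper and is not shown here. Purely as an illustration of the idea, the sketch below compiles a Groovy expression once and reuses it as a java.util.function.Predicate, using plain GroovyShell and a Map stand-in instead of TopicMessageDTO:

import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import java.util.Map;
import java.util.function.Predicate;

class GroovyFilterSketch {

  // Compile the script once; evaluate it per record with fresh bindings.
  // Not thread-safe (the parsed Script is shared), which is fine for a sketch.
  static Predicate<Map<String, Object>> groovyPredicate(String code) {
    Script parsed = new GroovyShell().parse(code);
    return msg -> {
      Binding binding = new Binding();
      binding.setVariable("key", msg.get("key"));
      binding.setVariable("value", msg.get("value"));
      parsed.setBinding(binding);
      return Boolean.TRUE.equals(parsed.run());
    };
  }

  public static void main(String[] args) {
    Predicate<Map<String, Object>> p = groovyPredicate("value != null && value.contains('err')");
    System.out.println(p.test(Map.of("key", "k1", "value", "error: boom"))); // true
    System.out.println(p.test(Map.of("key", "k2", "value", "ok")));          // false
  }
}

Compiling once and only rebinding variables per record keeps repeated evaluation cheap, which matters when the predicate runs against every polled message.
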
@@ -211,18 +212,47 @@ public class MessagesService {
     return new KafkaProducer<>(properties);
   }

-  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                 String topic,
                                                  ConsumerPosition consumerPosition,
-                                                 @Nullable String query,
-                                                 MessageFilterTypeDTO filterQueryType,
-                                                 @Nullable Integer pageSize,
-                                                 SeekDirectionDTO seekDirection,
+                                                 @Nullable String containsStringFilter,
+                                                 @Nullable String filterId,
+                                                 @Nullable Integer limit,
                                                  @Nullable String keySerde,
                                                  @Nullable String valueSerde) {
+    return loadMessages(
+        cluster,
+        topic,
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
+        consumerPosition,
+        getMsgFilter(containsStringFilter, filterId),
+        fixPageSize(limit)
+    );
+  }
+
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
+    Cursor cursor = cursorsStorage.getCursor(cursorId)
+        .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
+    return loadMessages(
+        cluster,
+        topic,
+        cursor.deserializer(),
+        cursor.consumerPosition(),
+        cursor.filter(),
+        cursor.limit()
+    );
+  }
+
+  private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                  String topic,
+                                                  ConsumerRecordDeserializer deserializer,
+                                                  ConsumerPosition consumerPosition,
+                                                  Predicate<TopicMessageDTO> filter,
+                                                  int limit) {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, fixPageSize(limit)));
+        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, limit));
   }

   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
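
The cursorsStorage consulted by the new cursor-based overload is not part of this hunk. A hypothetical, minimal in-memory registry with the same shape (random ids, bounded storage, explicit eviction, which is why the lookup above can legitimately fail) might look like:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the cursors storage: keeps the most recent cursors
// and drops the oldest once the bound is exceeded.
class CursorRegistrySketch<T> {

  private static final int MAX_CURSORS = 10_000;

  private final Map<String, T> cursors =
      new LinkedHashMap<String, T>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
          return size() > MAX_CURSORS;
        }
      };

  synchronized String register(T cursor) {
    String id = UUID.randomUUID().toString();
    cursors.put(id, cursor);
    return id;
  }

  synchronized Optional<T> get(String id) {
    return Optional.ofNullable(cursors.get(id));
  }
}
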
@@ -265,6 +295,12 @@ public class MessagesService {
         .map(throttleUiPublish(consumerPosition.pollingMode()));
   }

+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
   public String registerMessageFilter(String groovyCode) {
     String saltedCode = groovyCode + SALT_FOR_HASHING;
     String filterId = Hashing.sha256()
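
The fixPageSize clamp added above falls back to the configured default whenever the requested limit is absent, non-positive, or larger than the maximum. A standalone illustration with hypothetical values (DEFAULT_PAGE_SIZE = 25, MAX_PAGE_SIZE = 500; the real values come from configuration and are not shown in this diff):

import java.util.Optional;

class PageSizeSketch {

  // Hypothetical values; the service reads the real ones from its properties.
  static final int DEFAULT_PAGE_SIZE = 25;
  static final int MAX_PAGE_SIZE = 500;

  static int fixPageSize(Integer pageSize) {
    return Optional.ofNullable(pageSize)
        .filter(ps -> ps > 0 && ps <= MAX_PAGE_SIZE)
        .orElse(DEFAULT_PAGE_SIZE);
  }

  public static void main(String[] args) {
    System.out.println(fixPageSize(null));    // 25  (missing -> default)
    System.out.println(fixPageSize(0));       // 25  (non-positive -> default)
    System.out.println(fixPageSize(100));     // 100 (within bounds -> kept)
    System.out.println(fixPageSize(10_000));  // 25  (above max -> default)
  }
}
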
@@ -277,6 +313,20 @@ public class MessagesService {
     return filterId;
   }

+  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
+    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
+    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
+    return evt -> {
+      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return evt;
+      }
+      return evt.message(
+          evt.getMessage()
+              .key(keyMasker.apply(evt.getMessage().getKey()))
+              .content(valMasker.apply(evt.getMessage().getContent())));
+    };
+  }
+
   private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
                                                   @Nullable String smartFilterId) {
     Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
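
getDataMasker above wraps every MESSAGE event with the per-topic key and value masking functions configured on the cluster. Those functions are not shown in this diff; a hypothetical UnaryOperator<String> masker, just to illustrate the kind of function that gets applied:

import java.util.function.UnaryOperator;

class MaskerSketch {

  // Hypothetical masking function: redacts digit runs. The real functions are
  // built from the cluster's masking configuration per topic and target.
  static UnaryOperator<String> digitRedactingMasker() {
    return s -> s == null ? null : s.replaceAll("\\d", "*");
  }

  public static void main(String[] args) {
    UnaryOperator<String> keyMasker = digitRedactingMasker();
    UnaryOperator<String> valueMasker = digitRedactingMasker();
    System.out.println(keyMasker.apply("user-42"));               // user-**
    System.out.println(valueMasker.apply("{\"card\":\"4111\"}")); // {"card":"****"}
  }
}
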