|
@@ -14,11 +14,16 @@ import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
|
|
|
import com.provectus.kafka.ui.model.SeekDirectionDTO;
|
|
|
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
|
|
|
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
|
|
|
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
|
|
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
|
|
import com.provectus.kafka.ui.serde.api.Serde;
|
|
|
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
|
|
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.OffsetDateTime;
|
|
|
+import java.time.ZoneOffset;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
@@ -81,6 +86,40 @@ public class MessagesService {
|
|
|
.switchIfEmpty(Mono.error(new TopicNotFoundException()));
|
|
|
}
|
|
|
|
|
|
+ public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
|
|
|
+ Predicate<TopicMessageDTO> predicate;
|
|
|
+ try {
|
|
|
+ predicate = MessageFilters.createMsgFilter(
|
|
|
+ execData.getFilterCode(),
|
|
|
+ MessageFilterTypeDTO.GROOVY_SCRIPT
|
|
|
+ );
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
|
|
|
+ return new SmartFilterTestExecutionResultDTO()
|
|
|
+ .error("Compilation error : " + e.getMessage());
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ var result = predicate.test(
|
|
|
+ new TopicMessageDTO()
|
|
|
+ .key(execData.getKey())
|
|
|
+ .content(execData.getValue())
|
|
|
+ .headers(execData.getHeaders())
|
|
|
+ .offset(execData.getOffset())
|
|
|
+ .partition(execData.getPartition())
|
|
|
+ .timestamp(
|
|
|
+ Optional.ofNullable(execData.getTimestampMs())
|
|
|
+ .map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
|
|
|
+ .orElse(null))
|
|
|
+ );
|
|
|
+ return new SmartFilterTestExecutionResultDTO()
|
|
|
+ .result(result);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.info("Smart filter {} execution error", execData, e);
|
|
|
+ return new SmartFilterTestExecutionResultDTO()
|
|
|
+ .error("Execution error : " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
|
|
|
List<Integer> partitionsToInclude) {
|
|
|
return withExistingTopic(cluster, topicName)
|