diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index a6092e8447..0a07d32256 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -202,6 +202,18 @@
spring-security-ldap
+
+
+ org.codehaus.groovy
+ groovy-jsr223
+ ${groovy.version}
+
+
+ org.codehaus.groovy
+ groovy-json
+ ${groovy.version}
+
+
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
index a7abe364b8..de626778d9 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
@@ -44,14 +45,14 @@ public class MessagesController extends AbstractController implements MessagesAp
@Override
public Mono>> getTopicMessages(
- String clusterName, String topicName, @Valid SeekTypeDTO seekType, @Valid List seekTo,
- @Valid Integer limit, @Valid String q, @Valid SeekDirectionDTO seekDirection,
- ServerWebExchange exchange) {
+ String clusterName, String topicName, SeekTypeDTO seekType, List seekTo,
+ Integer limit, String q, MessageFilterTypeDTO filterQueryType,
+ SeekDirectionDTO seekDirection, ServerWebExchange exchange) {
return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
.map(position ->
ResponseEntity.ok(
messagesService.loadMessages(
- getCluster(clusterName), topicName, position, q, limit)
+ getCluster(clusterName), topicName, position, q, filterQueryType, limit)
)
);
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
new file mode 100644
index 0000000000..ee14f1b66d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
@@ -0,0 +1,91 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import groovy.json.JsonSlurper;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import javax.script.CompiledScript;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl;
+
+@Slf4j
+public class MessageFilters {
+
+ private static GroovyScriptEngineImpl GROOVY_ENGINE;
+
+ public static Predicate createMsgFilter(String query, MessageFilterTypeDTO type) {
+ switch (type) {
+ case STRING_CONTAINS:
+ return containsStringFilter(query);
+ case GROOVY_SCRIPT:
+ return groovyScriptFilter(query);
+ default:
+ throw new IllegalStateException("Unknown query type: " + type);
+ }
+ }
+
+ static Predicate containsStringFilter(String string) {
+ return msg -> StringUtils.contains(msg.getKey(), string)
+ || StringUtils.contains(msg.getContent(), string);
+ }
+
+ static Predicate groovyScriptFilter(String script) {
+ var compiledScript = compileScript(script);
+ var jsonSlurper = new JsonSlurper();
+ return msg -> {
+ var bindings = getGroovyEngine().createBindings();
+ bindings.put("partition", msg.getPartition());
+ bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
+ bindings.put("keyAsText", msg.getKey());
+ bindings.put("valueAsText", msg.getContent());
+ bindings.put("headers", msg.getHeaders());
+ bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
+ bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
+ try {
+ var result = compiledScript.eval(bindings);
+ if (result instanceof Boolean) {
+ return (Boolean) result;
+ }
+ return false;
+ } catch (Exception e) {
+ log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e);
+ return false;
+ }
+ };
+ }
+
+ @Nullable
+ private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable String str) {
+ if (str == null) {
+ return null;
+ }
+ try {
+ return parser.parseText(str);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static synchronized GroovyScriptEngineImpl getGroovyEngine() {
+ // it is pretty heavy object, so initializing it on-demand
+ if (GROOVY_ENGINE == null) {
+ GROOVY_ENGINE = (GroovyScriptEngineImpl)
+ new ScriptEngineManager().getEngineByName("groovy");
+ }
+ return GROOVY_ENGINE;
+ }
+
+ private static CompiledScript compileScript(String script) {
+ try {
+ return getGroovyEngine().compile(script);
+ } catch (ScriptException e) {
+ throw new ValidationException("Script syntax error: " + e.getMessage());
+ }
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index 04998d4bb5..504f088ab9 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -2,13 +2,14 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
@@ -20,10 +21,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -35,7 +38,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.stereotype.Service;
-import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
@@ -131,6 +133,7 @@ public class MessagesService {
public Flux loadMessages(KafkaCluster cluster, String topic,
ConsumerPosition consumerPosition, String query,
+ MessageFilterTypeDTO filterQueryType,
Integer limit) {
int recordsLimit = Optional.ofNullable(limit)
.map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
@@ -153,21 +156,26 @@ public class MessagesService {
);
}
return Flux.create(emitter)
- .filter(m -> filterTopicMessage(m, query))
+ .filter(getMsgFilter(query, filterQueryType))
.takeWhile(new FilterTopicMessageEvents(recordsLimit))
.subscribeOn(Schedulers.elastic())
.share();
}
- private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
- if (StringUtils.isEmpty(query)
- || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
- return true;
+ private Predicate getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
+ if (StringUtils.isEmpty(query)) {
+ return evt -> true;
}
-
- final TopicMessageDTO msg = message.getMessage();
- return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query))
- || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query));
+ filterQueryType = Optional.ofNullable(filterQueryType)
+ .orElse(MessageFilterTypeDTO.STRING_CONTAINS);
+ var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
+ return evt -> {
+ // we only apply filter for message events
+ if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
+ return messageFilter.test(evt.getMessage());
+ }
+ return true;
+ };
}
}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java
new file mode 100644
index 0000000000..ded921bfcd
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java
@@ -0,0 +1,173 @@
+package com.provectus.kafka.ui.emitter;
+
+import static com.provectus.kafka.ui.emitter.MessageFilters.containsStringFilter;
+import static com.provectus.kafka.ui.emitter.MessageFilters.groovyScriptFilter;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import java.time.OffsetDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class MessageFiltersTest {
+
+ @Nested
+ class StringContainsFilter {
+
+ Predicate filter = containsStringFilter("abC");
+
+ @Test
+ void returnsTrueWhenStringContainedInKeyOrContentOrInBoth() {
+ assertTrue(
+ filter.test(msg().key("contains abCd").content("some str"))
+ );
+
+ assertTrue(
+ filter.test(msg().key("some str").content("contains abCd"))
+ );
+
+ assertTrue(
+ filter.test(msg().key("contains abCd").content("contains abCd"))
+ );
+ }
+
+ @Test
+ void returnsFalseOtherwise() {
+ assertFalse(
+ filter.test(msg().key("some str").content("some str"))
+ );
+
+ assertFalse(
+ filter.test(msg().key(null).content(null))
+ );
+
+ assertFalse(
+ filter.test(msg().key("aBc").content("AbC"))
+ );
+ }
+
+ }
+
+ @Nested
+ class GroovyScriptFilter {
+
+ @Test
+ void throwsExceptionOnInvalidGroovySyntax() {
+ assertThrows(ValidationException.class,
+ () -> groovyScriptFilter("this is invalid groovy syntax = 1"));
+ }
+
+ @Test
+ void canCheckPartition() {
+ var f = groovyScriptFilter("partition == 1");
+ assertTrue(f.test(msg().partition(1)));
+ assertFalse(f.test(msg().partition(0)));
+ }
+
+ @Test
+ void canCheckTimestampMs() {
+ var ts = OffsetDateTime.now();
+ var f = groovyScriptFilter("timestampMs == " + ts.toInstant().toEpochMilli());
+ assertTrue(f.test(msg().timestamp(ts)));
+ assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS))));
+ }
+
+ @Test
+ void canCheckValueAsText() {
+ var f = groovyScriptFilter("valueAsText == 'some text'");
+ assertTrue(f.test(msg().content("some text")));
+ assertFalse(f.test(msg().content("some other text")));
+ }
+
+ @Test
+ void canCheckKeyAsText() {
+ var f = groovyScriptFilter("keyAsText == 'some text'");
+ assertTrue(f.test(msg().key("some text")));
+ assertFalse(f.test(msg().key("some other text")));
+ }
+
+ @Test
+ void canCheckKeyAsJsonObjectIfItCanBeParsedToJson() {
+ var f = groovyScriptFilter("key.name.first == 'user1'");
+ assertTrue(f.test(msg().key("{ \"name\" : { \"first\" : \"user1\" } }")));
+ assertFalse(f.test(msg().key("{ \"name\" : { \"first\" : \"user2\" } }")));
+ }
+
+ @Test
+ void keySetToNullIfKeyCantBeParsedToJson() {
+ var f = groovyScriptFilter("key == null");
+ assertTrue(f.test(msg().key("not json")));
+ assertFalse(f.test(msg().key("{ \"k\" : \"v\" }")));
+ }
+
+ @Test
+ void canCheckValueAsJsonObjectIfItCanBeParsedToJson() {
+ var f = groovyScriptFilter("value.name.first == 'user1'");
+ assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+ assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+ }
+
+ @Test
+ void valueSetToNullIfKeyCantBeParsedToJson() {
+ var f = groovyScriptFilter("value == null");
+ assertTrue(f.test(msg().content("not json")));
+ assertFalse(f.test(msg().content("{ \"k\" : \"v\" }")));
+ }
+
+ @Test
+ void canRunMultiStatementScripts() {
+ var f = groovyScriptFilter("def name = value.name.first \n return name == 'user1' ");
+ assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+ assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+
+ f = groovyScriptFilter("def name = value.name.first; return name == 'user1' ");
+ assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+ assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+
+ f = groovyScriptFilter("def name = value.name.first; name == 'user1' ");
+ assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+ assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+ }
+
+
+ @Test
+ void filterSpeedIsAtLeast10kPerSec() {
+ var f = groovyScriptFilter("value.name.first == 'user1' && keyAsText.startsWith('a') ");
+
+ List toFilter = new ArrayList<>();
+ for (int i = 0; i < 5_000; i++) {
+ String name = i % 2 == 0 ? "user1" : RandomStringUtils.randomAlphabetic(10);
+ String randString = RandomStringUtils.randomAlphabetic(30);
+ String jsonContent = String.format(
+ "{ \"name\" : { \"randomStr\": \"%s\", \"first\" : \"%s\"} }",
+ randString, name);
+ toFilter.add(msg().content(jsonContent).key(randString));
+ }
+ // first iteration for warmup
+ toFilter.stream().filter(f).count();
+
+ long before = System.currentTimeMillis();
+ long matched = toFilter.stream().filter(f).count();
+ long took = System.currentTimeMillis() - before;
+
+ assertThat(took).isLessThan(500);
+ assertThat(matched).isGreaterThan(0);
+ }
+ }
+
+ private TopicMessageDTO msg() {
+ return new TopicMessageDTO()
+ .timestamp(OffsetDateTime.now())
+ .partition(1);
+ }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
index 864c191248..84edb4518e 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
@@ -547,6 +547,7 @@ public class SendAndReadTests extends AbstractBaseTest {
SeekDirectionDTO.FORWARD
),
null,
+ null,
1
).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
.map(TopicMessageEventDTO::getMessage)
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 6fd9c4efa6..9436dc8c9e 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -500,6 +500,10 @@ paths:
in: query
schema:
type: string
+ - name: filterQueryType
+ in: query
+ schema:
+ $ref: "#/components/schemas/MessageFilterType"
- name: seekDirection
in: query
schema:
@@ -2088,6 +2092,12 @@ components:
- OFFSET
- TIMESTAMP
+ MessageFilterType:
+ type: string
+ enum:
+ - STRING_CONTAINS
+ - GROOVY_SCRIPT
+
SeekDirection:
type: string
enum:
diff --git a/pom.xml b/pom.xml
index 77f9345a89..b08a67743a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
2.21.0
3.19.0
4.7.1
+ 3.0.9
..//kafka-ui-react-app/src/generated-sources