From 1699663bacb25c2e8dca7b3b778ce1d683ce41cf Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Wed, 9 Feb 2022 13:28:27 +0300 Subject: [PATCH] Smart filters: Groovy script messages filter implementation (reopened) (#1547) * groovy script messages filter added --- kafka-ui-api/pom.xml | 12 ++ .../ui/controller/MessagesController.java | 9 +- .../kafka/ui/emitter/MessageFilters.java | 91 +++++++++ .../kafka/ui/service/MessagesService.java | 30 +-- .../kafka/ui/emitter/MessageFiltersTest.java | 173 ++++++++++++++++++ .../kafka/ui/service/SendAndReadTests.java | 1 + .../main/resources/swagger/kafka-ui-api.yaml | 10 + pom.xml | 1 + 8 files changed, 312 insertions(+), 15 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index a6092e8447..0a07d32256 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -202,6 +202,18 @@ spring-security-ldap + + + org.codehaus.groovy + groovy-jsr223 + ${groovy.version} + + + org.codehaus.groovy + groovy-json + ${groovy.version} + + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index a7abe364b8..de626778d9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.MessagesApi; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; +import com.provectus.kafka.ui.model.MessageFilterTypeDTO; import com.provectus.kafka.ui.model.SeekDirectionDTO; import com.provectus.kafka.ui.model.SeekTypeDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; @@ -44,14 +45,14 @@ public class MessagesController extends AbstractController implements MessagesAp @Override public Mono>> getTopicMessages( - String clusterName, String topicName, @Valid SeekTypeDTO seekType, @Valid List seekTo, - @Valid Integer limit, @Valid String q, @Valid SeekDirectionDTO seekDirection, - ServerWebExchange exchange) { + String clusterName, String topicName, SeekTypeDTO seekType, List seekTo, + Integer limit, String q, MessageFilterTypeDTO filterQueryType, + SeekDirectionDTO seekDirection, ServerWebExchange exchange) { return parseConsumerPosition(topicName, seekType, seekTo, seekDirection) .map(position -> ResponseEntity.ok( messagesService.loadMessages( - getCluster(clusterName), topicName, position, q, limit) + getCluster(clusterName), topicName, position, q, filterQueryType, limit) ) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java new file mode 100644 index 0000000000..ee14f1b66d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java @@ -0,0 +1,91 @@ +package com.provectus.kafka.ui.emitter; + +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.MessageFilterTypeDTO; +import com.provectus.kafka.ui.model.TopicMessageDTO; +import groovy.json.JsonSlurper; +import java.util.function.Predicate; +import javax.annotation.Nullable; +import javax.script.CompiledScript; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl; + +@Slf4j +public class MessageFilters { + + private static GroovyScriptEngineImpl GROOVY_ENGINE; + + public static Predicate createMsgFilter(String query, MessageFilterTypeDTO type) { + switch (type) { + case STRING_CONTAINS: + return containsStringFilter(query); + case GROOVY_SCRIPT: + return groovyScriptFilter(query); + default: + throw new IllegalStateException("Unknown query type: " + type); + } + } + + static Predicate containsStringFilter(String string) { + return msg -> StringUtils.contains(msg.getKey(), string) + || StringUtils.contains(msg.getContent(), string); + } + + static Predicate groovyScriptFilter(String script) { + var compiledScript = compileScript(script); + var jsonSlurper = new JsonSlurper(); + return msg -> { + var bindings = getGroovyEngine().createBindings(); + bindings.put("partition", msg.getPartition()); + bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli()); + bindings.put("keyAsText", msg.getKey()); + bindings.put("valueAsText", msg.getContent()); + bindings.put("headers", msg.getHeaders()); + bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey())); + bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent())); + try { + var result = compiledScript.eval(bindings); + if (result instanceof Boolean) { + return (Boolean) result; + } + return false; + } catch (Exception e) { + log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e); + return false; + } + }; + } + + @Nullable + private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable String str) { + if (str == null) { + return null; + } + try { + return parser.parseText(str); + } catch (Exception e) { + return null; + } + } + + private static synchronized GroovyScriptEngineImpl getGroovyEngine() { + // it is pretty heavy object, so initializing it on-demand + if (GROOVY_ENGINE == null) { + GROOVY_ENGINE = (GroovyScriptEngineImpl) + new ScriptEngineManager().getEngineByName("groovy"); + } + return GROOVY_ENGINE; + } + + private static CompiledScript compileScript(String script) { + try { + return getGroovyEngine().compile(script); + } catch (ScriptException e) { + throw new ValidationException("Script syntax error: " + e.getMessage()); + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 04998d4bb5..504f088ab9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -2,13 +2,14 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; +import com.provectus.kafka.ui.emitter.MessageFilters; import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.MessageFilterTypeDTO; import com.provectus.kafka.ui.model.SeekDirectionDTO; -import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serde.DeserializationService; import com.provectus.kafka.ui.serde.RecordSerDe; @@ -20,10 +21,12 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -35,7 +38,6 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -131,6 +133,7 @@ public class MessagesService { public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, String query, + MessageFilterTypeDTO filterQueryType, Integer limit) { int recordsLimit = Optional.ofNullable(limit) .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)) @@ -153,21 +156,26 @@ public class MessagesService { ); } return Flux.create(emitter) - .filter(m -> filterTopicMessage(m, query)) + .filter(getMsgFilter(query, filterQueryType)) .takeWhile(new FilterTopicMessageEvents(recordsLimit)) .subscribeOn(Schedulers.elastic()) .share(); } - private boolean filterTopicMessage(TopicMessageEventDTO message, String query) { - if (StringUtils.isEmpty(query) - || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) { - return true; + private Predicate getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) { + if (StringUtils.isEmpty(query)) { + return evt -> true; } - - final TopicMessageDTO msg = message.getMessage(); - return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query)) - || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query)); + filterQueryType = Optional.ofNullable(filterQueryType) + .orElse(MessageFilterTypeDTO.STRING_CONTAINS); + var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType); + return evt -> { + // we only apply filter for message events + if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) { + return messageFilter.test(evt.getMessage()); + } + return true; + }; } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java new file mode 100644 index 0000000000..ded921bfcd --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java @@ -0,0 +1,173 @@ +package com.provectus.kafka.ui.emitter; + +import static com.provectus.kafka.ui.emitter.MessageFilters.containsStringFilter; +import static com.provectus.kafka.ui.emitter.MessageFilters.groovyScriptFilter; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.TopicMessageDTO; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class MessageFiltersTest { + + @Nested + class StringContainsFilter { + + Predicate filter = containsStringFilter("abC"); + + @Test + void returnsTrueWhenStringContainedInKeyOrContentOrInBoth() { + assertTrue( + filter.test(msg().key("contains abCd").content("some str")) + ); + + assertTrue( + filter.test(msg().key("some str").content("contains abCd")) + ); + + assertTrue( + filter.test(msg().key("contains abCd").content("contains abCd")) + ); + } + + @Test + void returnsFalseOtherwise() { + assertFalse( + filter.test(msg().key("some str").content("some str")) + ); + + assertFalse( + filter.test(msg().key(null).content(null)) + ); + + assertFalse( + filter.test(msg().key("aBc").content("AbC")) + ); + } + + } + + @Nested + class GroovyScriptFilter { + + @Test + void throwsExceptionOnInvalidGroovySyntax() { + assertThrows(ValidationException.class, + () -> groovyScriptFilter("this is invalid groovy syntax = 1")); + } + + @Test + void canCheckPartition() { + var f = groovyScriptFilter("partition == 1"); + assertTrue(f.test(msg().partition(1))); + assertFalse(f.test(msg().partition(0))); + } + + @Test + void canCheckTimestampMs() { + var ts = OffsetDateTime.now(); + var f = groovyScriptFilter("timestampMs == " + ts.toInstant().toEpochMilli()); + assertTrue(f.test(msg().timestamp(ts))); + assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS)))); + } + + @Test + void canCheckValueAsText() { + var f = groovyScriptFilter("valueAsText == 'some text'"); + assertTrue(f.test(msg().content("some text"))); + assertFalse(f.test(msg().content("some other text"))); + } + + @Test + void canCheckKeyAsText() { + var f = groovyScriptFilter("keyAsText == 'some text'"); + assertTrue(f.test(msg().key("some text"))); + assertFalse(f.test(msg().key("some other text"))); + } + + @Test + void canCheckKeyAsJsonObjectIfItCanBeParsedToJson() { + var f = groovyScriptFilter("key.name.first == 'user1'"); + assertTrue(f.test(msg().key("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().key("{ \"name\" : { \"first\" : \"user2\" } }"))); + } + + @Test + void keySetToNullIfKeyCantBeParsedToJson() { + var f = groovyScriptFilter("key == null"); + assertTrue(f.test(msg().key("not json"))); + assertFalse(f.test(msg().key("{ \"k\" : \"v\" }"))); + } + + @Test + void canCheckValueAsJsonObjectIfItCanBeParsedToJson() { + var f = groovyScriptFilter("value.name.first == 'user1'"); + assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }"))); + } + + @Test + void valueSetToNullIfKeyCantBeParsedToJson() { + var f = groovyScriptFilter("value == null"); + assertTrue(f.test(msg().content("not json"))); + assertFalse(f.test(msg().content("{ \"k\" : \"v\" }"))); + } + + @Test + void canRunMultiStatementScripts() { + var f = groovyScriptFilter("def name = value.name.first \n return name == 'user1' "); + assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }"))); + + f = groovyScriptFilter("def name = value.name.first; return name == 'user1' "); + assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }"))); + + f = groovyScriptFilter("def name = value.name.first; name == 'user1' "); + assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }"))); + } + + + @Test + void filterSpeedIsAtLeast10kPerSec() { + var f = groovyScriptFilter("value.name.first == 'user1' && keyAsText.startsWith('a') "); + + List toFilter = new ArrayList<>(); + for (int i = 0; i < 5_000; i++) { + String name = i % 2 == 0 ? "user1" : RandomStringUtils.randomAlphabetic(10); + String randString = RandomStringUtils.randomAlphabetic(30); + String jsonContent = String.format( + "{ \"name\" : { \"randomStr\": \"%s\", \"first\" : \"%s\"} }", + randString, name); + toFilter.add(msg().content(jsonContent).key(randString)); + } + // first iteration for warmup + toFilter.stream().filter(f).count(); + + long before = System.currentTimeMillis(); + long matched = toFilter.stream().filter(f).count(); + long took = System.currentTimeMillis() - before; + + assertThat(took).isLessThan(500); + assertThat(matched).isGreaterThan(0); + } + } + + private TopicMessageDTO msg() { + return new TopicMessageDTO() + .timestamp(OffsetDateTime.now()) + .partition(1); + } + +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 864c191248..84edb4518e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -547,6 +547,7 @@ public class SendAndReadTests extends AbstractBaseTest { SeekDirectionDTO.FORWARD ), null, + null, 1 ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) .map(TopicMessageEventDTO::getMessage) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 6fd9c4efa6..9436dc8c9e 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -500,6 +500,10 @@ paths: in: query schema: type: string + - name: filterQueryType + in: query + schema: + $ref: "#/components/schemas/MessageFilterType" - name: seekDirection in: query schema: @@ -2088,6 +2092,12 @@ components: - OFFSET - TIMESTAMP + MessageFilterType: + type: string + enum: + - STRING_CONTAINS + - GROOVY_SCRIPT + SeekDirection: type: string enum: diff --git a/pom.xml b/pom.xml index 77f9345a89..b08a67743a 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ 2.21.0 3.19.0 4.7.1 + 3.0.9 ..//kafka-ui-react-app/src/generated-sources