Explorar el Código

Smart filters: Groovy script messages filter implementation (reopened) (#1547)

* groovy script messages filter added
Ilya Kuramshin hace 3 años
padre
commit
1699663bac

+ 12 - 0
kafka-ui-api/pom.xml

@@ -202,6 +202,18 @@
             <artifactId>spring-security-ldap</artifactId>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-jsr223</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-json</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 5 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.MessagesApi;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
@@ -44,14 +45,14 @@ public class MessagesController extends AbstractController implements MessagesAp
 
   @Override
   public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(
-      String clusterName, String topicName, @Valid SeekTypeDTO seekType, @Valid List<String> seekTo,
-      @Valid Integer limit, @Valid String q, @Valid SeekDirectionDTO seekDirection,
-      ServerWebExchange exchange) {
+      String clusterName, String topicName, SeekTypeDTO seekType, List<String> seekTo,
+      Integer limit, String q, MessageFilterTypeDTO filterQueryType,
+      SeekDirectionDTO seekDirection, ServerWebExchange exchange) {
     return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
         .map(position ->
             ResponseEntity.ok(
                 messagesService.loadMessages(
-                    getCluster(clusterName), topicName, position, q, limit)
+                    getCluster(clusterName), topicName, position, q, filterQueryType, limit)
             )
         );
   }

+ 91 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java

@@ -0,0 +1,91 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import groovy.json.JsonSlurper;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import javax.script.CompiledScript;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl;
+
+@Slf4j
+public class MessageFilters {
+
+  private static GroovyScriptEngineImpl GROOVY_ENGINE;
+
+  public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
+    switch (type) {
+      case STRING_CONTAINS:
+        return containsStringFilter(query);
+      case GROOVY_SCRIPT:
+        return groovyScriptFilter(query);
+      default:
+        throw new IllegalStateException("Unknown query type: " + type);
+    }
+  }
+
+  static Predicate<TopicMessageDTO> containsStringFilter(String string) {
+    return msg -> StringUtils.contains(msg.getKey(), string)
+        || StringUtils.contains(msg.getContent(), string);
+  }
+
+  static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
+    var compiledScript = compileScript(script);
+    var jsonSlurper = new JsonSlurper();
+    return msg -> {
+      var bindings = getGroovyEngine().createBindings();
+      bindings.put("partition", msg.getPartition());
+      bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
+      bindings.put("keyAsText", msg.getKey());
+      bindings.put("valueAsText", msg.getContent());
+      bindings.put("headers", msg.getHeaders());
+      bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
+      bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
+      try {
+        var result = compiledScript.eval(bindings);
+        if (result instanceof Boolean) {
+          return (Boolean) result;
+        }
+        return false;
+      } catch (Exception e) {
+        log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e);
+        return false;
+      }
+    };
+  }
+
+  @Nullable
+  private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable String str) {
+    if (str == null) {
+      return null;
+    }
+    try {
+      return parser.parseText(str);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private static synchronized GroovyScriptEngineImpl getGroovyEngine() {
+    // it is pretty heavy object, so initializing it on-demand
+    if (GROOVY_ENGINE == null) {
+      GROOVY_ENGINE = (GroovyScriptEngineImpl)
+          new ScriptEngineManager().getEngineByName("groovy");
+    }
+    return GROOVY_ENGINE;
+  }
+
+  private static CompiledScript compileScript(String script) {
+    try {
+      return getGroovyEngine().compile(script);
+    } catch (ScriptException e) {
+      throw new ValidationException("Script syntax error: " + e.getMessage());
+    }
+  }
+
+}

+ 19 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -2,13 +2,14 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.serde.RecordSerDe;
@@ -20,10 +21,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -35,7 +38,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
-import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
@@ -131,6 +133,7 @@ public class MessagesService {
 
   public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                  ConsumerPosition consumerPosition, String query,
+                                                 MessageFilterTypeDTO filterQueryType,
                                                  Integer limit) {
     int recordsLimit = Optional.ofNullable(limit)
         .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
@@ -153,21 +156,26 @@ public class MessagesService {
       );
     }
     return Flux.create(emitter)
-        .filter(m -> filterTopicMessage(m, query))
+        .filter(getMsgFilter(query, filterQueryType))
         .takeWhile(new FilterTopicMessageEvents(recordsLimit))
         .subscribeOn(Schedulers.elastic())
         .share();
   }
 
-  private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
-    if (StringUtils.isEmpty(query)
-        || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
-      return true;
+  private Predicate<TopicMessageEventDTO> getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
+    if (StringUtils.isEmpty(query)) {
+      return evt -> true;
     }
-
-    final TopicMessageDTO msg = message.getMessage();
-    return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query))
-        || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query));
+    filterQueryType = Optional.ofNullable(filterQueryType)
+        .orElse(MessageFilterTypeDTO.STRING_CONTAINS);
+    var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
+    return evt -> {
+      // we only apply filter for message events
+      if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return messageFilter.test(evt.getMessage());
+      }
+      return true;
+    };
   }
 
 }

+ 173 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java

@@ -0,0 +1,173 @@
+package com.provectus.kafka.ui.emitter;
+
+import static com.provectus.kafka.ui.emitter.MessageFilters.containsStringFilter;
+import static com.provectus.kafka.ui.emitter.MessageFilters.groovyScriptFilter;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import java.time.OffsetDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class MessageFiltersTest {
+
+  @Nested
+  class StringContainsFilter {
+
+    Predicate<TopicMessageDTO> filter = containsStringFilter("abC");
+
+    @Test
+    void returnsTrueWhenStringContainedInKeyOrContentOrInBoth() {
+      assertTrue(
+          filter.test(msg().key("contains abCd").content("some str"))
+      );
+
+      assertTrue(
+          filter.test(msg().key("some str").content("contains abCd"))
+      );
+
+      assertTrue(
+          filter.test(msg().key("contains abCd").content("contains abCd"))
+      );
+    }
+
+    @Test
+    void returnsFalseOtherwise() {
+      assertFalse(
+          filter.test(msg().key("some str").content("some str"))
+      );
+
+      assertFalse(
+          filter.test(msg().key(null).content(null))
+      );
+
+      assertFalse(
+          filter.test(msg().key("aBc").content("AbC"))
+      );
+    }
+
+  }
+
+  @Nested
+  class GroovyScriptFilter {
+
+    @Test
+    void throwsExceptionOnInvalidGroovySyntax() {
+      assertThrows(ValidationException.class,
+          () -> groovyScriptFilter("this is invalid groovy syntax = 1"));
+    }
+
+    @Test
+    void canCheckPartition() {
+      var f = groovyScriptFilter("partition == 1");
+      assertTrue(f.test(msg().partition(1)));
+      assertFalse(f.test(msg().partition(0)));
+    }
+
+    @Test
+    void canCheckTimestampMs() {
+      var ts = OffsetDateTime.now();
+      var f = groovyScriptFilter("timestampMs == " + ts.toInstant().toEpochMilli());
+      assertTrue(f.test(msg().timestamp(ts)));
+      assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS))));
+    }
+
+    @Test
+    void canCheckValueAsText() {
+      var f = groovyScriptFilter("valueAsText == 'some text'");
+      assertTrue(f.test(msg().content("some text")));
+      assertFalse(f.test(msg().content("some other text")));
+    }
+
+    @Test
+    void canCheckKeyAsText() {
+      var f = groovyScriptFilter("keyAsText == 'some text'");
+      assertTrue(f.test(msg().key("some text")));
+      assertFalse(f.test(msg().key("some other text")));
+    }
+
+    @Test
+    void canCheckKeyAsJsonObjectIfItCanBeParsedToJson() {
+      var f = groovyScriptFilter("key.name.first == 'user1'");
+      assertTrue(f.test(msg().key("{ \"name\" : { \"first\" : \"user1\" } }")));
+      assertFalse(f.test(msg().key("{ \"name\" : { \"first\" : \"user2\" } }")));
+    }
+
+    @Test
+    void keySetToNullIfKeyCantBeParsedToJson() {
+      var f = groovyScriptFilter("key == null");
+      assertTrue(f.test(msg().key("not json")));
+      assertFalse(f.test(msg().key("{ \"k\" : \"v\" }")));
+    }
+
+    @Test
+    void canCheckValueAsJsonObjectIfItCanBeParsedToJson() {
+      var f = groovyScriptFilter("value.name.first == 'user1'");
+      assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+      assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+    }
+
+    @Test
+    void valueSetToNullIfKeyCantBeParsedToJson() {
+      var f = groovyScriptFilter("value == null");
+      assertTrue(f.test(msg().content("not json")));
+      assertFalse(f.test(msg().content("{ \"k\" : \"v\" }")));
+    }
+
+    @Test
+    void canRunMultiStatementScripts() {
+      var f = groovyScriptFilter("def name = value.name.first \n return name == 'user1' ");
+      assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+      assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+
+      f = groovyScriptFilter("def name = value.name.first; return name == 'user1' ");
+      assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+      assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+
+      f = groovyScriptFilter("def name = value.name.first; name == 'user1' ");
+      assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }")));
+      assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }")));
+    }
+
+
+    @Test
+    void filterSpeedIsAtLeast10kPerSec() {
+      var f = groovyScriptFilter("value.name.first == 'user1' && keyAsText.startsWith('a') ");
+
+      List<TopicMessageDTO> toFilter = new ArrayList<>();
+      for (int i = 0; i < 5_000; i++) {
+        String name = i % 2 == 0 ? "user1" : RandomStringUtils.randomAlphabetic(10);
+        String randString = RandomStringUtils.randomAlphabetic(30);
+        String jsonContent = String.format(
+            "{ \"name\" : {  \"randomStr\": \"%s\", \"first\" : \"%s\"} }",
+            randString, name);
+        toFilter.add(msg().content(jsonContent).key(randString));
+      }
+      // first iteration for warmup
+      toFilter.stream().filter(f).count();
+
+      long before = System.currentTimeMillis();
+      long matched = toFilter.stream().filter(f).count();
+      long took = System.currentTimeMillis() - before;
+
+      assertThat(took).isLessThan(500);
+      assertThat(matched).isGreaterThan(0);
+    }
+  }
+
+  private TopicMessageDTO msg() {
+    return new TopicMessageDTO()
+        .timestamp(OffsetDateTime.now())
+        .partition(1);
+  }
+
+}

+ 1 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -547,6 +547,7 @@ public class SendAndReadTests extends AbstractBaseTest {
                     SeekDirectionDTO.FORWARD
                 ),
                 null,
+                null,
                 1
             ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
             .map(TopicMessageEventDTO::getMessage)

+ 10 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -500,6 +500,10 @@ paths:
           in: query
           schema:
             type: string
+        - name: filterQueryType
+          in: query
+          schema:
+            $ref:  "#/components/schemas/MessageFilterType"
         - name: seekDirection
           in: query
           schema:
@@ -2088,6 +2092,12 @@ components:
         - OFFSET
         - TIMESTAMP
 
+    MessageFilterType:
+      type: string
+      enum:
+        - STRING_CONTAINS
+        - GROOVY_SCRIPT
+
     SeekDirection:
       type: string
       enum:

+ 1 - 0
pom.xml

@@ -40,6 +40,7 @@
         <mockito.version>2.21.0</mockito.version>
         <assertj.version>3.19.0</assertj.version>
         <antlr4-maven-plugin.version>4.7.1</antlr4-maven-plugin.version>
+        <groovy.version>3.0.9</groovy.version>
 
         <frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources
         </frontend-generated-sources-directory>