فهرست منبع

Merge branch 'master' into issues/3959

Roman Zabaluev 2 سال پیش
والد
کامیت
30eded9cc8

+ 3 - 0
.gitignore

@@ -31,6 +31,9 @@ build/
 .vscode/
 /kafka-ui-api/app/node
 
+### SDKMAN ###
+.sdkmanrc
+
 .DS_Store
 *.code-workspace
 

+ 1 - 1
kafka-ui-api/pom.xml

@@ -311,7 +311,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
-                <version>3.1.2</version>
+                <version>3.3.0</version>
                 <dependencies>
                     <dependency>
                         <groupId>com.puppycrawl.tools</groupId>

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -15,6 +15,8 @@ import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.SerdeUsageDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
@@ -70,6 +72,14 @@ public class MessagesController extends AbstractController implements MessagesAp
     ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
+      Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
+    return smartFilterTestExecutionDto
+        .map(MessagesService::execSmartFilterTest)
+        .map(ResponseEntity::ok);
+  }
+
   @Override
   public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
                                                                            String topicName,
@@ -188,4 +198,8 @@ public class MessagesController extends AbstractController implements MessagesAp
             .map(ResponseEntity::ok)
     );
   }
+
+
+
+
 }

+ 6 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java

@@ -50,7 +50,6 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import java.io.ByteArrayInputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -204,17 +203,13 @@ public class ProtobufFileSerde implements BuiltInSerde {
                        Map<String, Descriptor> keyMessageDescriptorMap) {
 
     static boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties) {
-      Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
       Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
       Optional<String> protobufFilesDir = kafkaClusterProperties.getProperty("protobufFilesDir", String.class);
-      return protobufFilesDir.isPresent()
-          || protobufFile.isPresent()
-          || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+      return protobufFilesDir.isPresent() || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
     }
 
     static Configuration create(PropertyResolver properties) {
       var protobufSchemas = loadSchemas(
-          properties.getProperty("protobufFile", String.class),
           properties.getListProperty("protobufFiles", String.class),
           properties.getProperty("protobufFilesDir", String.class)
       );
@@ -272,12 +267,11 @@ public class ProtobufFileSerde implements BuiltInSerde {
     }
 
     @VisibleForTesting
-    static Map<Path, ProtobufSchema> loadSchemas(Optional<String> protobufFile,
-                                                 Optional<List<String>> protobufFiles,
+    static Map<Path, ProtobufSchema> loadSchemas(Optional<List<String>> protobufFiles,
                                                  Optional<String> protobufFilesDir) {
       if (protobufFilesDir.isPresent()) {
-        if (protobufFile.isPresent() || protobufFiles.isPresent()) {
-          log.warn("protobufFile and protobufFiles properties will be ignored, since protobufFilesDir provided");
+        if (protobufFiles.isPresent()) {
+          log.warn("protobufFiles properties will be ignored, since protobufFilesDir provided");
         }
         List<ProtoFile> loadedFiles = new ProtoSchemaLoader(protobufFilesDir.get()).load();
         Map<String, ProtoFileElement> allPaths = loadedFiles.stream()
@@ -288,10 +282,8 @@ public class ProtobufFileSerde implements BuiltInSerde {
                 f -> new ProtobufSchema(f.toElement(), List.of(), allPaths)));
       }
       //Supporting for backward-compatibility. Normally, protobufFilesDir setting should be used
-      return Stream.concat(
-              protobufFile.stream(),
-              protobufFiles.stream().flatMap(Collection::stream)
-          )
+      return protobufFiles.stream()
+          .flatMap(Collection::stream)
           .distinct()
           .map(Path::of)
           .collect(Collectors.toMap(path -> path, path -> new ProtobufSchema(readFileAsString(path))));

+ 39 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -14,11 +14,16 @@ import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,6 +86,40 @@ public class MessagesService {
         .switchIfEmpty(Mono.error(new TopicNotFoundException()));
   }
 
+  public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
+    Predicate<TopicMessageDTO> predicate;
+    try {
+      predicate = MessageFilters.createMsgFilter(
+          execData.getFilterCode(),
+          MessageFilterTypeDTO.GROOVY_SCRIPT
+      );
+    } catch (Exception e) {
+      log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
+      return new SmartFilterTestExecutionResultDTO()
+          .error("Compilation error : " + e.getMessage());
+    }
+    try {
+      var result = predicate.test(
+          new TopicMessageDTO()
+              .key(execData.getKey())
+              .content(execData.getValue())
+              .headers(execData.getHeaders())
+              .offset(execData.getOffset())
+              .partition(execData.getPartition())
+              .timestamp(
+                  Optional.ofNullable(execData.getTimestampMs())
+                      .map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
+                      .orElse(null))
+      );
+      return new SmartFilterTestExecutionResultDTO()
+          .result(result);
+    } catch (Exception e) {
+      log.info("Smart filter {} execution error", execData, e);
+      return new SmartFilterTestExecutionResultDTO()
+          .error("Execution error : " + e.getMessage());
+    }
+  }
+
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
     return withExistingTopic(cluster, topicName)

+ 1 - 14
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

@@ -47,7 +47,6 @@ class ProtobufFileSerdeTest {
   @BeforeEach
   void setUp() throws Exception {
     Map<Path, ProtobufSchema> files = ProtobufFileSerde.Configuration.loadSchemas(
-        Optional.empty(),
         Optional.empty(),
         Optional.of(protoFilesDir())
     );
@@ -107,15 +106,6 @@ class ProtobufFileSerdeTest {
           .isFalse();
     }
 
-    @Test
-    void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
-      PropertyResolver resolver = mock(PropertyResolver.class);
-      when(resolver.getProperty("protobufFile", String.class))
-          .thenReturn(Optional.of("file.proto"));
-      assertThat(Configuration.canBeAutoConfigured(resolver))
-          .isTrue();
-    }
-
     @Test
     void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
       PropertyResolver resolver = mock(PropertyResolver.class);
@@ -193,13 +183,10 @@ class ProtobufFileSerdeTest {
     @Test
     void createConfigureFillsDescriptorMappingsWhenProtoFilesListProvided() throws Exception {
       PropertyResolver resolver = mock(PropertyResolver.class);
-      when(resolver.getProperty("protobufFile", String.class))
-          .thenReturn(Optional.of(
-              ResourceUtils.getFile("classpath:protobuf-serde/sensor.proto").getPath()));
-
       when(resolver.getListProperty("protobufFiles", String.class))
           .thenReturn(Optional.of(
               List.of(
+                  ResourceUtils.getFile("classpath:protobuf-serde/sensor.proto").getPath(),
                   ResourceUtils.getFile("classpath:protobuf-serde/address-book.proto").getPath())));
 
       when(resolver.getProperty("protobufMessageName", String.class))

+ 41 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -1,5 +1,8 @@
 package com.provectus.kafka.ui.service;
 
+import static com.provectus.kafka.ui.service.MessagesService.execSmartFilterTest;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
@@ -7,11 +10,13 @@ import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.junit.jupiter.api.BeforeEach;
@@ -91,4 +96,40 @@ class MessagesServiceTest extends AbstractIntegrationTest {
     }
   }
 
+  @Test
+  void execSmartFilterTestReturnsExecutionResult() {
+    var params = new SmartFilterTestExecutionDTO()
+        .filterCode("key != null && value != null && headers != null && timestampMs != null && offset != null")
+        .key("1234")
+        .value("{ \"some\" : \"value\" } ")
+        .headers(Map.of("h1", "hv1"))
+        .offset(12345L)
+        .timestampMs(System.currentTimeMillis())
+        .partition(1);
+    assertThat(execSmartFilterTest(params).getResult()).isTrue();
+
+    params.setFilterCode("return false");
+    assertThat(execSmartFilterTest(params).getResult()).isFalse();
+  }
+
+  @Test
+  void execSmartFilterTestReturnsErrorOnFilterApplyError() {
+    var result = execSmartFilterTest(
+        new SmartFilterTestExecutionDTO()
+            .filterCode("return 1/0")
+    );
+    assertThat(result.getResult()).isNull();
+    assertThat(result.getError()).containsIgnoringCase("execution error");
+  }
+
+  @Test
+  void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
+    var result = execSmartFilterTest(
+        new SmartFilterTestExecutionDTO()
+            .filterCode("this is invalid groovy syntax = 1")
+    );
+    assertThat(result.getResult()).isNull();
+    assertThat(result.getError()).containsIgnoringCase("Compilation error");
+  }
+
 }

+ 50 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -625,6 +625,25 @@ paths:
               schema:
                 $ref: '#/components/schemas/TopicSerdeSuggestion'
 
+  /api/smartfilters/testexecutions:
+    put:
+      tags:
+        - Messages
+      summary: executeSmartFilterTest
+      operationId: executeSmartFilterTest
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/SmartFilterTestExecution'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/SmartFilterTestExecutionResult'
+
 
   /api/clusters/{clusterName}/topics/{topicName}/messages:
     get:
@@ -2584,6 +2603,37 @@ components:
           items:
             $ref: '#/components/schemas/ConsumerGroup'
 
+    SmartFilterTestExecution:
+      type: object
+      required: [filterCode]
+      properties:
+        filterCode:
+          type: string
+        key:
+          type: string
+        value:
+          type: string
+        headers:
+          type: object
+          additionalProperties:
+            type: string
+        partition:
+          type: integer
+        offset:
+          type: integer
+          format: int64
+        timestampMs:
+          type: integer
+          format: int64
+
+    SmartFilterTestExecutionResult:
+      type: object
+      properties:
+        result:
+          type: boolean
+        error:
+          type: string
+
     CreateTopicMessage:
       type: object
       properties:

+ 1 - 1
kafka-ui-e2e-checks/pom.xml

@@ -267,7 +267,7 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-checkstyle-plugin</artifactId>
-                        <version>3.1.2</version>
+                        <version>3.3.0</version>
                         <dependencies>
                             <dependency>
                                 <groupId>com.puppycrawl.tools</groupId>