iliax 2 years ago
parent
commit
4be46ec520

+ 4 - 9
documentation/compose/kafka-ui.yaml

@@ -20,11 +20,6 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
-      KAFKA_CLUSTERS_1_NAME: secondLocal
-      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
-      KAFKA_CLUSTERS_1_METRICS_PORT: 9998
-      KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
-      DYNAMIC_CONFIG_ENABLED: 'true'
 
   kafka0:
     image: confluentinc/cp-kafka:7.2.1
@@ -45,7 +40,7 @@ services:
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
       KAFKA_PROCESS_ROLES: 'broker,controller'
       KAFKA_NODE_ID: 1
-      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093,2@kafka1:29093'
       KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
@@ -62,7 +57,7 @@ services:
       - "9093:9092"
       - "9998:9998"
     environment:
-      KAFKA_BROKER_ID: 1
+      KAFKA_BROKER_ID: 2
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
@@ -72,8 +67,8 @@ services:
       KAFKA_JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998
       KAFKA_PROCESS_ROLES: 'broker,controller'
-      KAFKA_NODE_ID: 1
-      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
+      KAFKA_NODE_ID: 2
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@kafka1:29093,1@kafka0:29093'
       KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

+ 50 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -11,7 +11,10 @@ import com.provectus.kafka.ui.api.MessagesApi;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterIdDTO;
+import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -70,6 +73,7 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
   }
 
+  @Deprecated
   @Override
   public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
                                                                            String topicName,
@@ -182,4 +186,50 @@ public class MessagesController extends AbstractController implements MessagesAp
             .map(ResponseEntity::ok)
     );
   }
+
+
+  @Override
+  public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
+                                                                             PollingModeDTO mode,
+                                                                             @Nullable List<Integer> partitions,
+                                                                             @Nullable Integer limit,
+                                                                             @Nullable String query,
+                                                                             @Nullable String filterId,
+                                                                             @Nullable String offsetString,
+                                                                             @Nullable Long ts,
+                                                                             @Nullable String ks,
+                                                                             @Nullable String vs,
+                                                                             ServerWebExchange exchange) {
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
+
+    int recordsLimit =
+        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
+
+    return validateAccess.then(
+        Mono.just(
+            ResponseEntity.ok(
+                messagesService.loadMessagesV2(
+                    getCluster(clusterName), topicName, positions, q, filterQueryType,
+                    recordsLimit, seekDirection, keySerde, valueSerde)
+            )
+        )
+    );
+  }
+
+   interface PollingMode {
+    static PollingMode create(PollingModeDTO mode, @Nullable String offsetString, @Nullable Long timestamp) {
+      return null;
+    }
+  }
+
+  @Override
+  public Mono<ResponseEntity<Flux<MessageFilterIdDTO>>> registerFilter(String clusterName, String topicName,
+                                                                       Mono<MessageFilterRegistrationDTO> messageFilterRegistrationDTO,
+                                                                       ServerWebExchange exchange) {
+    return null;
+  }
 }

+ 61 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
@@ -196,6 +197,66 @@ public class MessagesService {
         .map(throttleUiPublish(seekDirection));
   }
 
+  public Flux<TopicMessageEventDTO> loadMessagesV2(KafkaCluster cluster,
+                                                   String topic,
+                                                   PollingModeDTO pollingMode,
+                                                   @Nullable String query,
+                                                   @Nullable String filterId,
+                                                   int limit,
+                                                   @Nullable String keySerde,
+                                                   @Nullable String valueSerde) {
+    return withExistingTopic(cluster, topic)
+        .flux()
+        .publishOn(Schedulers.boundedElastic())
+        .flatMap(td -> loadMessagesImplV2(cluster, topic, consumerPosition, query,
+            filterQueryType, limit, seekDirection, keySerde, valueSerde));
+  }
+
+  private Flux<TopicMessageEventDTO> loadMessagesImplV2(KafkaCluster cluster,
+                                                        String topic,
+                                                        ConsumerPosition consumerPosition,
+                                                        @Nullable String query,
+                                                        MessageFilterTypeDTO filterQueryType,
+                                                        int limit,
+                                                        SeekDirectionDTO seekDirection,
+                                                        @Nullable String keySerde,
+                                                        @Nullable String valueSerde) {
+
+    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
+    ConsumerRecordDeserializer recordDeserializer =
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
+      emitter = new ForwardRecordEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
+      emitter = new BackwardRecordEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          limit,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    } else {
+      emitter = new TailingEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    }
+    MessageFilterStats filterStats = new MessageFilterStats();
+    return Flux.create(emitter)
+        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
+        .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .map(getDataMasker(cluster, topic))
+        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
+        .map(throttleUiPublish(seekDirection));
+  }
+
   private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
       SeekDirectionDTO seekDirection, int limit) {
     return seekDirection == SeekDirectionDTO.TAILING

+ 12 - 8
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -797,6 +797,7 @@ paths:
         - name: m
           in: query
           description: Messages polling mode
+          required: true
           schema:
             $ref: "#/components/schemas/PollingMode"
         - name: p
@@ -821,18 +822,11 @@ paths:
           description: filter id, that was registered beforehand
           schema:
             type: string
-        - name: off
-          in: query
-          description: offset to read from / to
-          schema:
-            type: integer
-            format: int64
         - name: offs
           in: query
           description: partition offsets to read from / to. Format is "p1:off1,p2:off2,..."
           schema:
-            type: integer
-            format: int64
+            type: string
         - name: ts
           in: query
           description: timestamp (in ms) to read from / to
@@ -2576,6 +2570,7 @@ components:
             - MESSAGE
             - CONSUMING
             - DONE
+            - CURSOR
             - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
@@ -2583,6 +2578,8 @@ components:
           $ref: "#/components/schemas/TopicMessagePhase"
         consuming:
           $ref: "#/components/schemas/TopicMessageConsuming"
+        cursor:
+          $ref: "#/components/schemas/TopicMessageNextPageCursor"
 
     TopicMessagePhase:
       type: object
@@ -2612,6 +2609,13 @@ components:
         filterApplyErrors:
           type: integer
 
+    TopicMessageNextPageCursor:
+      type: object
+      properties:
+        offsetsString:
+          type: string
+        pollingMode:
+          $ref: "#/components/schemas/PollingMode"
 
     TopicMessage:
       type: object