소스 검색

master merge

iliax 1 년 전
부모
커밋
3b8548aa30

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import jakarta.annotation.Nullable;
 import java.time.Duration;
 import java.time.Instant;
 import javax.annotation.Nullable;

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -32,6 +33,10 @@ public record PolledRecords(int count,
     return records.iterator();
   }
 
+  public Set<TopicPartition> partitions() {
+    return records.partitions();
+  }
+
   private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
     int polledBytes = 0;
     for (ConsumerRecord<Bytes, Bytes> rec : recs) {

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -2,25 +2,23 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter {
+public class TailingEmitter extends AbstractEmitter
+    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
-  public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+  public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        ConsumerRecordDeserializer recordDeserializer,
+                        MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+    super(messagesProcessing, pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
@@ -28,7 +26,7 @@ public class TailingEmitter extends AbstractEmitter {
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       SeekOperations.create(consumer, consumerPosition)
           .assignAndSeek();
       while (!sink.isCancelled()) {

+ 6 - 14
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java

@@ -12,7 +12,7 @@ import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.service.PollingCursorsStorage;
-import java.io.Serializable;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -20,11 +20,8 @@ import java.util.UUID;
 import java.util.function.Consumer;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -172,17 +169,12 @@ class CursorTest extends AbstractIntegrationTest {
     );
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
-    final Map<String, ? extends Serializable> map = Map.of(
-        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
-        ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, PAGE_SIZE - 1, // to check multiple polls
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
-    );
+  private EnhancedConsumer createConsumer() {
     Properties props = new Properties();
-    props.putAll(map);
-    return new KafkaConsumer<>(props);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, PAGE_SIZE - 1); // to check multiple polls
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   private static ConsumerRecordDeserializer createRecordsDeserializer() {