浏览代码

new tests

iliax 2 年之前
父节点
当前提交
a17afc0422

+ 12 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -4,10 +4,10 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -49,15 +49,21 @@ public class ForwardRecordEmitter extends AbstractEmitter {
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
         emptyPolls.count(records);
-        trackOffsetsAfterPoll(consumer);
-
         log.debug("{} records polled", records.count());
 
-        for (ConsumerRecord<Bytes, Bytes> msg : records) {
-          sendMessage(sink, msg);
+        for (TopicPartition tp : records.partitions()) {
+          for (ConsumerRecord<Bytes, Bytes> record : records.records(tp)) {
+            // checking if send limit reached - if so, we will skip some
+            // of already polled records (and we don't need to track their offsets) - they
+            // should be present on next page, polled by cursor
+            if (!isSendLimitReached()) {
+              sendMessage(sink, record);
+              cursor.trackOffset(tp, record.offset() + 1);
+            }
+          }
         }
       }
-      sendFinishStatsAndCompleteSink(sink, seekOperations.assignedPartitionsFullyPolled() ? null : cursor);
+      sendFinishStatsAndCompleteSink(sink, !isSendLimitReached() ? null : cursor);
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");
@@ -68,8 +74,4 @@ public class ForwardRecordEmitter extends AbstractEmitter {
     }
   }
 
-  private void trackOffsetsAfterPoll(Consumer<Bytes, Bytes> consumer) {
-    consumer.assignment().forEach(tp -> cursor.trackOffset(tp, consumer.position(tp)));
-  }
-
 }

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -56,6 +56,7 @@ public class MessagesService {
 
   private static final int DEFAULT_MAX_PAGE_SIZE = 500;
   private static final int DEFAULT_PAGE_SIZE = 100;
+
   // limiting UI messages rate to 20/sec in tailing mode
   private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
 

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.provectus.kafka.ui.emitter.Cursor;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.RandomStringUtils;
 
@@ -22,4 +24,8 @@ public class PollingCursorsStorage {
     return id;
   }
 
+  @VisibleForTesting
+  public Map<String, Cursor> asMap() {
+    return cursorsCache.asMap();
+  }
 }

+ 181 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java

@@ -0,0 +1,181 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.service.PollingCursorsStorage;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+
+public class CursorTest extends AbstractIntegrationTest {
+
+  static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID();
+
+  static final int MSGS_IN_PARTITION = 20;
+  static final int PAGE_SIZE = 11;
+
+  @BeforeAll
+  static void setup() {
+    createTopic(new NewTopic(TOPIC, 1, (short) 1));
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < MSGS_IN_PARTITION; i++) {
+        producer.send(new ProducerRecord<>(TOPIC, "msg_" + i));
+      }
+    }
+  }
+
+  @AfterAll
+  static void cleanup() {
+    deleteTopic(TOPIC);
+  }
+
+  @Test
+  void backwardEmitter() {
+    var cursorsStorage = new PollingCursorsStorage();
+    var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null);
+
+    var cursor = new Cursor.Tracking(
+        createRecordsDeserializer(),
+        consumerPosition,
+        m -> true,
+        PAGE_SIZE,
+        cursorsStorage::register
+    );
+
+    var emitter = createBackwardEmitter(consumerPosition, cursor);
+    verifyMessagesEmitted(emitter);
+    assertCursor(
+        cursorsStorage,
+        PollingModeDTO.TO_OFFSET,
+        offsets -> assertThat(offsets)
+            .hasSize(1)
+            .containsEntry(new TopicPartition(TOPIC, 0), 9L)
+    );
+  }
+
+  @Test
+  void forwardEmitter() {
+    var cursorsStorage = new PollingCursorsStorage();
+    var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null);
+
+    var cursor = new Cursor.Tracking(
+        createRecordsDeserializer(),
+        consumerPosition,
+        m -> true,
+        PAGE_SIZE,
+        cursorsStorage::register
+    );
+
+    var emitter = createForwardEmitter(consumerPosition, cursor);
+    verifyMessagesEmitted(emitter);
+    assertCursor(
+        cursorsStorage,
+        PollingModeDTO.FROM_OFFSET,
+        offsets -> assertThat(offsets)
+            .hasSize(1)
+            .containsEntry(new TopicPartition(TOPIC, 0), 11L)
+    );
+  }
+
+  private void assertCursor(PollingCursorsStorage storage,
+                            PollingModeDTO expectedMode,
+                            Consumer<Map<TopicPartition, Long>> offsetsAssert) {
+    Cursor registeredCursor = storage.asMap().values().stream().findFirst().orElse(null);
+    assertThat(registeredCursor).isNotNull();
+    assertThat(registeredCursor.limit()).isEqualTo(PAGE_SIZE);
+    assertThat(registeredCursor.deserializer()).isNotNull();
+    assertThat(registeredCursor.filter()).isNotNull();
+
+    var cursorPosition = registeredCursor.consumerPosition();
+    assertThat(cursorPosition).isNotNull();
+    assertThat(cursorPosition.topic()).isEqualTo(TOPIC);
+    assertThat(cursorPosition.partitions()).isEqualTo(List.of());
+    assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
+
+    offsetsAssert.accept(cursorPosition.offsets().tpOffsets());
+  }
+
+  private void verifyMessagesEmitted(AbstractEmitter emitter) {
+    StepVerifier.create(
+            Flux.create(emitter)
+                .filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+                .map(e -> e.getMessage().getContent())
+        )
+        .expectNextCount(PAGE_SIZE)
+        .verifyComplete();
+  }
+
+  private BackwardRecordEmitter createBackwardEmitter(ConsumerPosition position, Cursor.Tracking cursor) {
+    return new BackwardRecordEmitter(
+        this::createConsumer,
+        position,
+        PAGE_SIZE,
+        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        PollingSettings.createDefault(),
+        cursor
+    );
+  }
+
+  private ForwardRecordEmitter createForwardEmitter(ConsumerPosition position, Cursor.Tracking cursor) {
+    return new ForwardRecordEmitter(
+        this::createConsumer,
+        position,
+        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        PollingSettings.createDefault(),
+        cursor
+    );
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+    final Map<String, ? extends Serializable> map = Map.of(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
+        ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, PAGE_SIZE - 1, // to check multiple polls
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+    );
+    Properties props = new Properties();
+    props.putAll(map);
+    return new KafkaConsumer<>(props);
+  }
+
+  private static ConsumerRecordDeserializer createRecordsDeserializer() {
+    Serde s = new StringSerde();
+    s.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
+    return new ConsumerRecordDeserializer(
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.KEY),
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.VALUE),
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.KEY),
+        s.deserializer(null, Serde.Target.VALUE)
+    );
+  }
+
+}

+ 88 - 20
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
@@ -10,11 +12,17 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.springframework.beans.factory.annotation.Autowired;
 import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
@@ -29,6 +37,8 @@ class MessagesServiceTest extends AbstractIntegrationTest {
 
   KafkaCluster cluster;
 
+  Set<String> createdTopics = new HashSet<>();
+
   @BeforeEach
   void init() {
     cluster = applicationContext
@@ -37,6 +47,11 @@ class MessagesServiceTest extends AbstractIntegrationTest {
         .get();
   }
 
+  @AfterEach
+  void deleteCreatedTopics() {
+    createdTopics.forEach(MessagesServiceTest::deleteTopic);
+  }
+
   @Test
   void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
@@ -64,31 +79,84 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void maskingAppliedOnConfiguredClusters() throws Exception {
     String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
+    createTopicWithCleanup(new NewTopic(testTopic, 1, (short) 1));
+
     try (var producer = KafkaTestProducer.forKafka(kafka)) {
-      createTopic(new NewTopic(testTopic, 1, (short) 1));
       producer.send(testTopic, "message1");
       producer.send(testTopic, "message2").get();
+    }
+
+    Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
+            cluster,
+            testTopic,
+            new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
+            null,
+            null,
+            100,
+            StringSerde.name(),
+            StringSerde.name()
+        ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(TopicMessageEventDTO::getMessage);
+
+    // both messages should be masked
+    StepVerifier.create(msgsFlux)
+        .expectNextMatches(msg -> msg.getContent().equals("***"))
+        .expectNextMatches(msg -> msg.getContent().equals("***"))
+        .verifyComplete();
+  }
+
+  @ParameterizedTest
+  @CsvSource({"EARLIEST", "LATEST"})
+  void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingModeDTO mode) {
+    String testTopic = MessagesServiceTest.class.getSimpleName() + UUID.randomUUID();
+    createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));
 
-      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
-          cluster,
-          testTopic,
-          new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
-          null,
-          null,
-          100,
-          StringSerde.name(),
-          StringSerde.name()
-      ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
-          .map(TopicMessageEventDTO::getMessage);
-
-      // both messages should be masked
-      StepVerifier.create(msgsFlux)
-          .expectNextMatches(msg -> msg.getContent().equals("***"))
-          .expectNextMatches(msg -> msg.getContent().equals("***"))
-          .verifyComplete();
-    } finally {
-      deleteTopic(testTopic);
+    int msgsToGenerate = 100;
+    int pageSize = (msgsToGenerate / 2) + 1;
+
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < msgsToGenerate; i++) {
+        producer.send(testTopic, "message_" + i);
+      }
     }
+
+    var cursorIdCatcher = new AtomicReference<String>();
+    Flux<String> msgsFlux = messagesService.loadMessages(
+            cluster, testTopic,
+            new ConsumerPosition(mode, testTopic, List.of(), null, null),
+            null, null, pageSize, StringSerde.name(), StringSerde.name())
+        .doOnNext(evt -> {
+          if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
+            assertThat(evt.getCursor()).isNotNull();
+            cursorIdCatcher.set(evt.getCursor().getId());
+          }
+        })
+        .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(evt -> evt.getMessage().getContent());
+
+    StepVerifier.create(msgsFlux)
+        .expectNextCount(pageSize)
+        .verifyComplete();
+
+    assertThat(cursorIdCatcher.get()).isNotNull();
+
+    Flux<String> remainingMsgs = messagesService.loadMessages(cluster, testTopic, cursorIdCatcher.get())
+        .doOnNext(evt -> {
+          if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
+            assertThat(evt.getCursor()).isNull();
+          }
+        })
+        .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(evt -> evt.getMessage().getContent());
+
+    StepVerifier.create(remainingMsgs)
+        .expectNextCount(msgsToGenerate - pageSize)
+        .verifyComplete();
+  }
+
+  private void createTopicWithCleanup(NewTopic newTopic) {
+    createTopic(newTopic);
+    createdTopics.add(newTopic.name());
   }
 
 }