iliax 2023-08-03 17:30:42 +04:00
parent 69ebd3d52b
commit 0a8d3f1ef4
2 changed files with 32 additions and 14 deletions

BackwardRecordEmitter.java

@@ -87,26 +87,21 @@ public class BackwardRecordEmitter
   private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
       TopicPartition tp,
-      long fromOffset,
-      long toOffset,
+      long fromOffset, //inclusive
+      long toOffset, //exclusive
       EnhancedConsumer consumer,
       FluxSink<TopicMessageEventDTO> sink
   ) {
     consumer.assign(Collections.singleton(tp));
     consumer.seek(tp, fromOffset);
     sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
     int desiredMsgsToPoll = (int) (toOffset - fromOffset);
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
-    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !sendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
-        && !emptyPolls.noDataEmptyPollsReached()) {
-      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords.count());
+        && consumer.position(tp) < toOffset) {
+      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
       log.debug("{} records polled from {}", polledRecords.count(), tp);
       var filteredRecords = polledRecords.records(tp).stream()
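
Note: the key change in this hunk is the loop guard. Instead of counting empty polls, the backward emitter stops once the consumer's position reaches the exclusive upper bound, which (unlike the record-count check alone) also terminates cleanly when offsets in the range are missing, e.g. after log compaction. A minimal standalone sketch of the same termination rule against a plain KafkaConsumer — EnhancedConsumer, sendLimitReached() and the emitter's poll(...) helper are project-specific, so plain consumer.poll(...) and the names below stand in for them:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.utils.Bytes;

    class BoundedPollSketch {
      // Sketch only: drains [fromOffset, toOffset) from one partition and stops
      // as soon as the consumer's position reaches toOffset - no empty-poll counting.
      static List<ConsumerRecord<Bytes, Bytes>> pollRange(Consumer<Bytes, Bytes> consumer,
                                                          TopicPartition tp,
                                                          long fromOffset,  // inclusive
                                                          long toOffset) {  // exclusive
        consumer.assign(Collections.singleton(tp));
        consumer.seek(tp, fromOffset);
        int desiredMsgsToPoll = (int) (toOffset - fromOffset);
        var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>(desiredMsgsToPoll);
        while (recordsToSend.size() < desiredMsgsToPoll && consumer.position(tp) < toOffset) {
          for (var rec : consumer.poll(Duration.ofMillis(200)).records(tp)) {
            if (rec.offset() < toOffset) {  // drop records past the requested range
              recordsToSend.add(rec);
            }
          }
        }
        return recordsToSend;
      }
    }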

ForwardRecordEmitter.java

@@ -1,11 +1,21 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
+import java.util.Properties;
+import java.util.Random;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -35,15 +45,11 @@ public class ForwardRecordEmitter
     var seekOperations = SeekOperations.create(consumer, position);
     seekOperations.assignAndSeekNonEmptyPartitions();
-    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !sendLimitReached()
-        && !seekOperations.assignedPartitionsFullyPolled()
-        && !emptyPolls.noDataEmptyPollsReached()) {
+        && !seekOperations.assignedPartitionsFullyPolled()) {
       sendPhase(sink, "Polling");
       var records = poll(sink, consumer);
-      emptyPolls.count(records.count());
       log.debug("{} records polled", records.count());
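
Note: same simplification as in BackwardRecordEmitter — with the empty-poll counter gone, forward progress is governed entirely by assignedPartitionsFullyPolled(). The real SeekOperations is project code not shown in this diff, but a hedged sketch of what a purely position-based "fully polled" check can look like (class and method names below are illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    class FullyPolledSketch {
      // True once the consumer's position has reached the end offset of every
      // assigned partition, so the poll loop can stop without empty-poll heuristics.
      static boolean assignedPartitionsFullyPolled(Consumer<?, ?> consumer) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
        return endOffsets.entrySet().stream()
            .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
      }
    }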
@@ -61,4 +67,21 @@ public class ForwardRecordEmitter
       sink.error(e);
     }
   }
+  //  public static void main(String[] args) {
+  //    String topic = "test";
+  //
+  //    Properties properties = new Properties();
+  //    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+  //    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+  //    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+  //    try (var producer = new KafkaProducer<>(properties)) {
+  //      for (int i = 0; i < 10; i++) {
+  //        for (int j = 0; j < 30; j++) {
+  //          producer.send(new ProducerRecord<>(topic, (i + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)));
+  //        }
+  //      }
+  //    }
+  //  }
 }
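
Note: the commented-out main above (together with the new producer imports at the top of this file) looks like a local test-data helper left in for convenience. If it is ever meant to run, a self-contained version might look like the sketch below — "localhost:9092" and the topic "test" are taken from the snippet itself; randomAlphabetic(5) is swapped in for random(5), which can emit unprintable characters:

    import java.util.Properties;
    import org.apache.commons.lang3.RandomStringUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TestDataProducer {
      public static void main(String[] args) {
        String topic = "test";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 10 keys x 30 values each, mirroring the loops in the commented-out code.
        try (var producer = new KafkaProducer<String, String>(properties)) {
          for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 30; j++) {
              producer.send(new ProducerRecord<>(topic,
                  (i + 1) + "", "j=" + j + "-" + RandomStringUtils.randomAlphabetic(5)));
            }
          }
        }  // close() flushes all buffered sends before returning
      }
    }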