@@ -5,6 +5,7 @@ import lombok.extern.log4j.Log4j2;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -15,10 +16,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 
 import reactor.core.publisher.Flux;
@@ -30,18 +32,21 @@ import reactor.core.scheduler.Schedulers;
 @RequiredArgsConstructor
 public class ConsumingService {
 
-
-    // TODO: make this configurable
-    private static final int BATCH_SIZE = 20;
+    private static final int MAX_RECORD_LIMIT = 100;
+    private static final int DEFAULT_RECORD_LIMIT = 20;
+    private static final int MAX_POLLS_COUNT = 30;
 
     private final KafkaService kafkaService;
 
-    public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic) {
-        RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic);
+    public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
+        int recordsLimit = Optional.ofNullable(limit)
+                .map(s -> Math.min(s, MAX_RECORD_LIMIT))
+                .orElse(DEFAULT_RECORD_LIMIT);
+        RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
         return Flux.create(emitter::emit)
                 .subscribeOn(Schedulers.boundedElastic())
                 .map(ClusterUtil::mapToTopicMessage)
-                .limitRequest(BATCH_SIZE);
+                .limitRequest(recordsLimit);
     }
 
     @RequiredArgsConstructor
@@ -52,11 +57,14 @@ public class ConsumingService {
         private final KafkaService kafkaService;
         private final KafkaCluster cluster;
         private final String topic;
+        private final ConsumerPosition consumerPosition;
 
         public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
             try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-                assignPartitions(consumer, topic);
-                while (!sink.isCancelled()) {
+                assignPartitions(consumer);
+                seekOffsets(consumer);
+                int pollsCount = 0;
+                while (!sink.isCancelled() && ++pollsCount <= MAX_POLLS_COUNT) {
                     ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
                     log.info("{} records polled", records.count());
                     records.iterator()
@@ -68,16 +76,50 @@ public class ConsumingService {
             }
         }
 
-        private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
-            List<TopicPartition> partitions = Optional.ofNullable(cluster.getTopics().get(topicName))
-                    .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
+        private List<TopicPartition> getRequestedPartitions() {
+            Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+
+            return Optional.ofNullable(cluster.getTopics().get(topic))
+                    .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
                     .getPartitions().stream()
-                    .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
+                    .filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
+                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
                     .collect(Collectors.toList());
+        }
+
+        private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
+            List<TopicPartition> partitions = getRequestedPartitions();
 
             consumer.assign(partitions);
-            // TODO: seek to requested offsets
-            consumer.seekToBeginning(partitions);
+        }
+
+        private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
+            SeekType seekType = consumerPosition.getSeekType();
+            switch (seekType) {
+                case OFFSET:
+                    consumerPosition.getSeekTo().forEach((partition, offset) -> {
+                        TopicPartition topicPartition = new TopicPartition(topic, partition);
+                        consumer.seek(topicPartition, offset);
+                    });
+                    break;
+                case TIMESTAMP:
+                    Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
+                                    Map.Entry::getValue
+                            ));
+                    consumer.offsetsForTimes(timestampsToSearch).entrySet().stream()
+                            // offsetsForTimes maps a partition to null when it has no message at or after the timestamp
+                            .filter(entry -> entry.getValue() != null)
+                            .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
+                    break;
+                case BEGINNING:
+                    List<TopicPartition> partitions = getRequestedPartitions();
+                    consumer.seekToBeginning(partitions);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown seekType: " + seekType);
+            }
         }
     }
 }
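
Note: ConsumerPosition and SeekType are referenced but not defined in this diff. The sketch below is a hypothetical reconstruction, inferred purely from how RecordEmitter uses them (getSeekType(), getSeekTo(), and the OFFSET/TIMESTAMP/BEGINNING constants); the actual classes in the repository may differ, and SeekType in particular may be generated from the project's OpenAPI spec.

    import java.util.Map;

    // Hypothetical sketch, inferred from usage in RecordEmitter above.
    public enum SeekType { BEGINNING, OFFSET, TIMESTAMP }

    public class ConsumerPosition {
        private final SeekType seekType;
        // partition number -> target offset (for OFFSET) or epoch-millis timestamp (for TIMESTAMP);
        // an empty map means "all partitions" (see getRequestedPartitions)
        private final Map<Integer, Long> seekTo;

        public ConsumerPosition(SeekType seekType, Map<Integer, Long> seekTo) {
            this.seekType = seekType;
            this.seekTo = seekTo;
        }

        public SeekType getSeekType() { return seekType; }
        public Map<Integer, Long> getSeekTo() { return seekTo; }
    }

With these assumed types, a caller could, for example, read up to 50 messages starting at offset 42 of partition 0 via loadMessages(cluster, "my-topic", new ConsumerPosition(SeekType.OFFSET, Map.of(0, 42L)), 50).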