package com.provectus.kafka.ui.util;

import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekType;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * Base class that positions a Kafka consumer on the partitions of a single topic
 * according to a {@link ConsumerPosition}.
 *
 * <p>Subclasses supply the actual offset resolution for each supported {@link SeekType}
 * through the three abstract {@code offsets...} methods.
 */
@Log4j2
public abstract class OffsetsSeek {
  protected final String topic;
  protected final ConsumerPosition consumerPosition;

  /**
   * Stores the topic and the requested position; no consumer interaction happens here.
   *
   * @param topic            name of the topic to position the consumer on
   * @param consumerPosition requested seek type and per-partition positions
   */
  protected OffsetsSeek(String topic, ConsumerPosition consumerPosition) {
    this.topic = topic;
    this.consumerPosition = consumerPosition;
  }

  /**
   * Returns the position this instance was created with.
   *
   * @return the consumer position passed to the constructor
   */
  public ConsumerPosition getConsumerPosition() {
    return consumerPosition;
  }

  /**
   * Resolves the offsets the consumer should be positioned at, dispatching on the
   * seek type of the configured {@link ConsumerPosition}.
   *
   * @param consumer consumer used to look up partitions and offsets
   * @return offsets to seek to, keyed by topic partition
   * @throws IllegalArgumentException if the seek type is not OFFSET, TIMESTAMP or BEGINNING
   */
  public Map<TopicPartition, Long> getPartitionsOffsets(Consumer<Bytes, Bytes> consumer) {
    SeekType seekType = consumerPosition.getSeekType();
    List<TopicPartition> partitions = getRequestedPartitions(consumer);
    log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
    switch (seekType) {
      case OFFSET:
        return offsetsFromPositions(consumer, partitions);
      case TIMESTAMP:
        // The timestamp variant does not receive the pre-filtered partition list.
        return offsetsForTimestamp(consumer);
      case BEGINNING:
        return offsetsFromBeginning(consumer, partitions);
      default:
        throw new IllegalArgumentException("Unknown seekType: " + seekType);
    }
  }

  /**
   * Creates a {@link WaitingOffsets} tracker for the given partitions of this topic.
   *
   * @param consumer   consumer used to query beginning and end offsets
   * @param partitions partitions to track
   * @return a new tracker
   */
  public WaitingOffsets waitingOffsets(Consumer<Bytes, Bytes> consumer,
                                       Collection<TopicPartition> partitions) {
    return new WaitingOffsets(topic, consumer, partitions);
  }

  /**
   * Assigns the consumer to the resolved partitions, seeks each of them to its resolved
   * offset and returns a tracker for those partitions.
   *
   * @param consumer consumer to assign and position
   * @return tracker of the offsets that still have to be polled
   */
  public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
    final Map<TopicPartition, Long> partitionsOffsets = getPartitionsOffsets(consumer);
    consumer.assign(partitionsOffsets.keySet());
    partitionsOffsets.forEach(consumer::seek);
    log.info("Assignment: {}", consumer.assignment());
    return waitingOffsets(consumer, partitionsOffsets.keySet());
  }

  /**
   * Lists the partitions of the topic that the configured position refers to.
   *
   * <p>When the position's seek-to map is empty every partition reported by the consumer
   * is returned; otherwise only the partitions present as keys in that map.
   *
   * @param consumer consumer used to list the topic's partitions
   * @return the selected topic partitions
   */
  public List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
    Map<TopicPartition, Long> partitionPositions = consumerPosition.getSeekTo();
    return consumer.partitionsFor(topic).stream()
        // Build the key once and reuse it for both the membership test and the result.
        .map(p -> new TopicPartition(p.topic(), p.partition()))
        .filter(tp -> partitionPositions.isEmpty() || partitionPositions.containsKey(tp))
        .collect(Collectors.toList());
  }

  /**
   * Resolves the offsets used when the seek type is BEGINNING.
   *
   * @param consumer   consumer to query
   * @param partitions partitions selected by {@link #getRequestedPartitions(Consumer)}
   * @return offsets to seek to, keyed by topic partition
   */
  protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);

  /**
   * Resolves the offsets used when the seek type is TIMESTAMP.
   *
   * @param consumer consumer to query
   * @return offsets to seek to, keyed by topic partition
   */
  protected abstract Map<TopicPartition, Long> offsetsForTimestamp(
      Consumer<Bytes, Bytes> consumer);

  /**
   * Resolves the offsets used when the seek type is OFFSET.
   *
   * @param consumer   consumer to query
   * @param partitions partitions selected by {@link #getRequestedPartitions(Consumer)}
   * @return offsets to seek to, keyed by topic partition
   */
  protected abstract Map<TopicPartition, Long> offsetsFromPositions(
      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);

  /**
   * Tracks, per partition number, the boundary offsets that have not been polled yet.
   *
   * <p>Partitions whose beginning offset equals their end offset are left out of both maps.
   * Entries are removed through the {@code markPolled} methods.
   */
  public static class WaitingOffsets {
    private final Map<Integer, Long> endOffsets; // partition number -> offset
    private final Map<Integer, Long> beginOffsets; // partition number -> offset
    private final String topic;

    /**
     * Queries the beginning and end offsets of the given partitions and records the
     * boundaries of every partition whose beginning and end offsets differ.
     *
     * @param topic      topic the partitions belong to
     * @param consumer   consumer used to query the offsets
     * @param partitions partitions to track
     */
    public WaitingOffsets(String topic, Consumer<Bytes, Bytes> consumer,
                          Collection<TopicPartition> partitions) {
      this.topic = topic;
      var allBeginningOffsets = consumer.beginningOffsets(partitions);
      var allEndOffsets = consumer.endOffsets(partitions);

      // Stored value is the reported end offset minus one.
      this.endOffsets = allEndOffsets.entrySet().stream()
          .filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue()))
          .map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1))
          .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));

      // Begin offsets are kept only for the partitions that made it into endOffsets.
      this.beginOffsets = this.endOffsets.keySet().stream()
          .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p))))
          .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
    }

    /**
     * Returns the partitions that still have an entry in the end-offsets map.
     *
     * @return topic partitions built from the remaining partition numbers
     */
    public List<TopicPartition> topicPartitions() {
      return this.endOffsets.keySet().stream()
          .map(p -> new TopicPartition(topic, p))
          .collect(Collectors.toList());
    }

    /**
     * Removes the partition from both the end and the begin offsets maps.
     *
     * @param partition partition number to drop
     */
    public void markPolled(int partition) {
      endOffsets.remove(partition);
      beginOffsets.remove(partition);
    }

    /**
     * Updates the tracker with a polled record: the record's partition is removed from the
     * end map when the record's offset is at or past the stored end value, and from the
     * begin map when the record's offset is at or before the stored begin value.
     *
     * @param rec record that has been polled
     */
    public void markPolled(ConsumerRecord<Bytes, Bytes> rec) {
      Long endWaiting = endOffsets.get(rec.partition());
      if (endWaiting != null && endWaiting <= rec.offset()) {
        endOffsets.remove(rec.partition());
      }
      Long beginWaiting = beginOffsets.get(rec.partition());
      if (beginWaiting != null && beginWaiting >= rec.offset()) {
        beginOffsets.remove(rec.partition());
      }
    }

    /**
     * Tells whether no end offsets remain.
     *
     * @return {@code true} when the end-offsets map is empty
     */
    public boolean endReached() {
      return endOffsets.isEmpty();
    }

    /**
     * Tells whether no begin offsets remain.
     *
     * @return {@code true} when the begin-offsets map is empty
     */
    public boolean beginReached() {
      return beginOffsets.isEmpty();
    }

    /**
     * Returns the internal end-offsets map itself, not a copy.
     *
     * @return partition number to stored end value
     */
    public Map<Integer, Long> getEndOffsets() {
      return endOffsets;
    }

    /**
     * Returns the internal begin-offsets map itself, not a copy.
     *
     * @return partition number to stored begin value
     */
    public Map<Integer, Long> getBeginOffsets() {
      return beginOffsets;
    }
  }
}