package com.provectus.kafka.ui.util; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.service.ConsumingService; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; @Log4j2 public abstract class OffsetsSeek { protected final String topic; protected final ConsumerPosition consumerPosition; public OffsetsSeek(String topic, ConsumerPosition consumerPosition) { this.topic = topic; this.consumerPosition = consumerPosition; } public WaitingOffsets assignAndSeek(Consumer consumer) { SeekType seekType = consumerPosition.getSeekType(); log.info("Positioning consumer for topic {} with {}", topic, consumerPosition); switch (seekType) { case OFFSET: assignAndSeekForOffset(consumer); break; case TIMESTAMP: assignAndSeekForTimestamp(consumer); break; case BEGINNING: assignAndSeekFromBeginning(consumer); break; default: throw new IllegalArgumentException("Unknown seekType: " + seekType); } log.info("Assignment: {}", consumer.assignment()); return new WaitingOffsets(topic, consumer); } protected List getRequestedPartitions(Consumer consumer) { Map partitionPositions = consumerPosition.getSeekTo(); return consumer.partitionsFor(topic).stream() .filter( p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition())) .map(p -> new TopicPartition(p.topic(), p.partition())) .collect(Collectors.toList()); } protected abstract void assignAndSeekFromBeginning(Consumer consumer); protected abstract void assignAndSeekForTimestamp(Consumer consumer); protected abstract void assignAndSeekForOffset(Consumer consumer); public static class WaitingOffsets { final Map offsets = new HashMap<>(); // partition number -> offset public WaitingOffsets(String topic, Consumer consumer) { var partitions = consumer.assignment().stream() .map(TopicPartition::partition) .collect(Collectors.toList()); ConsumingService.significantOffsets(consumer, topic, partitions) .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1)); } public void markPolled(ConsumerRecord rec) { Long waiting = offsets.get(rec.partition()); if (waiting != null && waiting <= rec.offset()) { offsets.remove(rec.partition()); } } public boolean endReached() { return offsets.isEmpty(); } } }