package com.provectus.kafka.ui.util;

import com.provectus.kafka.ui.model.ConsumerPosition;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * {@link OffsetsSeek} variant that resolves, per partition, an offset bounded by the
 * partition's end offset and falls back to the end offset when no position was requested.
 *
 * <p>NOTE(review): judging by the class name these offsets are the upper bounds for reading
 * a topic backward; how they are consumed is decided by code outside this class.
 */
@Slf4j
public class OffsetsSeekBackward extends OffsetsSeek {

  private final int maxMessages;

  /**
   * Creates the seek strategy.
   *
   * @param topic            passed through to {@link OffsetsSeek}
   * @param consumerPosition passed through to {@link OffsetsSeek}; its {@code getSeekTo()} map
   *                         is read by the position and timestamp lookups below
   * @param maxMessages      total message budget used by {@link #msgsPerPartition(int)}
   */
  public OffsetsSeekBackward(String topic, ConsumerPosition consumerPosition, int maxMessages) {
    super(topic, consumerPosition);
    this.maxMessages = maxMessages;
  }

  /**
   * Splits the configured {@code maxMessages} evenly across partitions, rounding up.
   *
   * @param partitionsSize number of partitions to split across
   * @return {@code ceil(maxMessages / partitionsSize)}
   */
  public int msgsPerPartition(int partitionsSize) {
    return msgsPerPartition(maxMessages, partitionsSize);
  }

  /**
   * Splits {@code awaitingMessages} evenly across partitions, rounding up.
   *
   * <p>The division is done in floating point, so {@code partitionsSize == 0} does not throw:
   * the result saturates to {@link Integer#MAX_VALUE} for a positive {@code awaitingMessages}
   * and is {@code 0} when both arguments are zero.
   *
   * @param awaitingMessages number of messages still to be distributed
   * @param partitionsSize   number of partitions to split across
   * @return {@code ceil(awaitingMessages / partitionsSize)} narrowed to {@code int}
   */
  public int msgsPerPartition(long awaitingMessages, int partitionsSize) {
    return (int) Math.ceil((double) awaitingMessages / partitionsSize);
  }

  /**
   * Resolves offsets using the explicit positions from {@code consumerPosition.getSeekTo()}.
   *
   * @param consumer   consumer used to query beginning and end offsets
   * @param partitions partitions to resolve
   * @return offset per partition, see {@link #findOffsets(Consumer, Map, Collection)}
   */
  protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
                                                           List<TopicPartition> partitions) {
    return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions);
  }

  /**
   * Resolves offsets when no explicit positions are requested; with an empty seek map every
   * partition is mapped to its end offset.
   *
   * @param consumer   consumer used to query beginning and end offsets
   * @param partitions partitions to resolve
   * @return offset per partition, see {@link #findOffsets(Consumer, Map, Collection)}
   */
  protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
                                                           List<TopicPartition> partitions) {
    return findOffsets(consumer, Map.of(), partitions);
  }

  /**
   * Treats the values of {@code consumerPosition.getSeekTo()} as timestamps, translates them to
   * offsets via {@link Consumer#offsetsForTimes(Map)} and resolves the final offsets for the
   * partitions that produced a result.
   *
   * @param consumer consumer used for the timestamp and offset lookups
   * @return offset per partition, see {@link #findOffsets(Consumer, Map, Collection)}
   * @throws IllegalArgumentException if no partition yielded an offset for its timestamp
   */
  protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
    Map<TopicPartition, Long> timestampsToSearch =
        consumerPosition.getSeekTo().entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                Map.Entry::getValue
            ));

    // Partitions for which the lookup returned null are dropped here.
    Map<TopicPartition, Long> offsetsForTimestamps =
        consumer.offsetsForTimes(timestampsToSearch)
            .entrySet().stream()
            .filter(e -> e.getValue() != null)
            .map(v -> Tuples.of(v.getKey(), v.getValue().offset()))
            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));

    if (offsetsForTimestamps.isEmpty()) {
      throw new IllegalArgumentException("No offsets were found for requested timestamps");
    }

    log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps);

    return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet());
  }

  /**
   * Extension point that currently delegates straight to
   * {@link #findOffsets(Consumer, Map, Collection)}.
   *
   * @param consumer   consumer used to query beginning and end offsets
   * @param seekTo     requested offset per partition
   * @param partitions partitions to resolve
   * @return offset per partition
   */
  protected Map<TopicPartition, Long> findOffsetsInt(
      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
      List<TopicPartition> partitions) {
    return findOffsets(consumer, seekTo, partitions);
  }

  /**
   * Computes the resulting offset for every partition.
   *
   * <ul>
   *   <li>A partition listed in {@code seekTo} is kept only if both its beginning and end
   *       offsets are known, it is non-empty ({@code beginning < end}) and the requested offset
   *       is strictly greater than the beginning offset; the requested offset is then capped at
   *       the end offset.</li>
   *   <li>A partition listed in {@code seekTo} that fails that check is left out of the
   *       result.</li>
   *   <li>A partition from {@code partitions} that is not listed in {@code seekTo} is mapped to
   *       the end offset reported by the consumer.</li>
   * </ul>
   *
   * @param consumer   consumer used to query beginning and end offsets
   * @param seekTo     requested offset per partition; may be empty
   * @param partitions partitions whose offsets are queried
   * @return mutable map of partition to resolved offset
   */
  protected Map<TopicPartition, Long> findOffsets(
      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
      Collection<TopicPartition> partitions) {

    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

    final Map<TopicPartition, Long> seekMap = new HashMap<>();
    final Set<TopicPartition> emptyPartitions = new HashSet<>();

    for (Map.Entry<TopicPartition, Long> entry : seekTo.entrySet()) {
      final TopicPartition partition = entry.getKey();
      final Long requestedOffset = entry.getValue();
      final Long endOffset = endOffsets.get(partition);
      final Long beginningOffset = beginningOffsets.get(partition);

      final boolean hasReadableRange = beginningOffset != null
          && endOffset != null
          && beginningOffset < endOffset
          && requestedOffset > beginningOffset;

      if (hasReadableRange) {
        // Never position past the end of the partition.
        seekMap.put(partition, Math.min(requestedOffset, endOffset));
      } else {
        emptyPartitions.add(partition);
      }
    }

    // Partitions without an explicit request fall back to their end offset.
    final Set<TopicPartition> waiting = new HashSet<>(partitions);
    waiting.removeAll(emptyPartitions);
    waiting.removeAll(seekMap.keySet());

    for (TopicPartition topicPartition : waiting) {
      seekMap.put(topicPartition, endOffsets.get(topicPartition));
    }

    return seekMap;
  }
}