OffsetsSeekBackward.java 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package com.provectus.kafka.ui.util;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import java.util.Collection;
  4. import java.util.HashMap;
  5. import java.util.HashSet;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import java.util.stream.Collectors;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.apache.kafka.clients.consumer.Consumer;
  12. import org.apache.kafka.common.TopicPartition;
  13. import org.apache.kafka.common.utils.Bytes;
  14. import reactor.util.function.Tuple2;
  15. import reactor.util.function.Tuples;
  16. @Slf4j
  17. public class OffsetsSeekBackward extends OffsetsSeek {
  18. private final int maxMessages;
  19. public OffsetsSeekBackward(String topic,
  20. ConsumerPosition consumerPosition, int maxMessages) {
  21. super(topic, consumerPosition);
  22. this.maxMessages = maxMessages;
  23. }
  24. public int msgsPerPartition(int partitionsSize) {
  25. return msgsPerPartition(maxMessages, partitionsSize);
  26. }
  27. public int msgsPerPartition(long awaitingMessages, int partitionsSize) {
  28. return (int) Math.ceil((double) awaitingMessages / partitionsSize);
  29. }
  30. protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
  31. List<TopicPartition> partitions) {
  32. return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions);
  33. }
  34. protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
  35. List<TopicPartition> partitions) {
  36. return findOffsets(consumer, Map.of(), partitions);
  37. }
  38. protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
  39. Map<TopicPartition, Long> timestampsToSearch =
  40. consumerPosition.getSeekTo().entrySet().stream()
  41. .collect(Collectors.toMap(
  42. Map.Entry::getKey,
  43. Map.Entry::getValue
  44. ));
  45. Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
  46. .entrySet().stream()
  47. .filter(e -> e.getValue() != null)
  48. .map(v -> Tuples.of(v.getKey(), v.getValue().offset()))
  49. .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
  50. if (offsetsForTimestamps.isEmpty()) {
  51. throw new IllegalArgumentException("No offsets were found for requested timestamps");
  52. }
  53. log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps);
  54. return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet());
  55. }
  56. protected Map<TopicPartition, Long> findOffsetsInt(
  57. Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
  58. List<TopicPartition> partitions) {
  59. return findOffsets(consumer, seekTo, partitions);
  60. }
  61. protected Map<TopicPartition, Long> findOffsets(
  62. Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
  63. Collection<TopicPartition> partitions) {
  64. final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
  65. final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
  66. final Map<TopicPartition, Long> seekMap = new HashMap<>();
  67. final Set<TopicPartition> emptyPartitions = new HashSet<>();
  68. for (Map.Entry<TopicPartition, Long> entry : seekTo.entrySet()) {
  69. final Long endOffset = endOffsets.get(entry.getKey());
  70. final Long beginningOffset = beginningOffsets.get(entry.getKey());
  71. if (beginningOffset != null
  72. && endOffset != null
  73. && beginningOffset < endOffset
  74. && entry.getValue() > beginningOffset
  75. ) {
  76. final Long value;
  77. if (entry.getValue() > endOffset) {
  78. value = endOffset;
  79. } else {
  80. value = entry.getValue();
  81. }
  82. seekMap.put(entry.getKey(), value);
  83. } else {
  84. emptyPartitions.add(entry.getKey());
  85. }
  86. }
  87. Set<TopicPartition> waiting = new HashSet<>(partitions);
  88. waiting.removeAll(emptyPartitions);
  89. waiting.removeAll(seekMap.keySet());
  90. for (TopicPartition topicPartition : waiting) {
  91. seekMap.put(topicPartition, endOffsets.get(topicPartition));
  92. }
  93. return seekMap;
  94. }
  95. }