// OffsetsSeek.java (5.1 KB)
  1. package com.provectus.kafka.ui.util;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import com.provectus.kafka.ui.model.SeekType;
  4. import java.util.Collection;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.stream.Collectors;
  8. import lombok.extern.log4j.Log4j2;
  9. import org.apache.kafka.clients.consumer.Consumer;
  10. import org.apache.kafka.clients.consumer.ConsumerRecord;
  11. import org.apache.kafka.common.TopicPartition;
  12. import org.apache.kafka.common.utils.Bytes;
  13. import reactor.util.function.Tuple2;
  14. import reactor.util.function.Tuples;
  15. @Log4j2
  16. public abstract class OffsetsSeek {
  17. protected final String topic;
  18. protected final ConsumerPosition consumerPosition;
  19. protected OffsetsSeek(String topic, ConsumerPosition consumerPosition) {
  20. this.topic = topic;
  21. this.consumerPosition = consumerPosition;
  22. }
  23. public ConsumerPosition getConsumerPosition() {
  24. return consumerPosition;
  25. }
  26. public Map<TopicPartition, Long> getPartitionsOffsets(Consumer<Bytes, Bytes> consumer) {
  27. SeekType seekType = consumerPosition.getSeekType();
  28. List<TopicPartition> partitions = getRequestedPartitions(consumer);
  29. log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
  30. Map<TopicPartition, Long> offsets;
  31. switch (seekType) {
  32. case OFFSET:
  33. offsets = offsetsFromPositions(consumer, partitions);
  34. break;
  35. case TIMESTAMP:
  36. offsets = offsetsForTimestamp(consumer);
  37. break;
  38. case BEGINNING:
  39. offsets = offsetsFromBeginning(consumer, partitions);
  40. break;
  41. default:
  42. throw new IllegalArgumentException("Unknown seekType: " + seekType);
  43. }
  44. return offsets;
  45. }
  46. public WaitingOffsets waitingOffsets(Consumer<Bytes, Bytes> consumer,
  47. Collection<TopicPartition> partitions) {
  48. return new WaitingOffsets(topic, consumer, partitions);
  49. }
  50. public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
  51. final Map<TopicPartition, Long> partitionsOffsets = getPartitionsOffsets(consumer);
  52. consumer.assign(partitionsOffsets.keySet());
  53. partitionsOffsets.forEach(consumer::seek);
  54. log.info("Assignment: {}", consumer.assignment());
  55. return waitingOffsets(consumer, partitionsOffsets.keySet());
  56. }
  57. public List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
  58. Map<TopicPartition, Long> partitionPositions = consumerPosition.getSeekTo();
  59. return consumer.partitionsFor(topic).stream()
  60. .filter(
  61. p -> partitionPositions.isEmpty()
  62. || partitionPositions.containsKey(new TopicPartition(p.topic(), p.partition()))
  63. ).map(p -> new TopicPartition(p.topic(), p.partition()))
  64. .collect(Collectors.toList());
  65. }
  66. protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
  67. Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
  68. protected abstract Map<TopicPartition, Long> offsetsForTimestamp(
  69. Consumer<Bytes, Bytes> consumer);
  70. protected abstract Map<TopicPartition, Long> offsetsFromPositions(
  71. Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
  72. public static class WaitingOffsets {
  73. private final Map<Integer, Long> endOffsets; // partition number -> offset
  74. private final Map<Integer, Long> beginOffsets; // partition number -> offset
  75. private final String topic;
  76. public WaitingOffsets(String topic, Consumer<?, ?> consumer,
  77. Collection<TopicPartition> partitions) {
  78. this.topic = topic;
  79. var allBeginningOffsets = consumer.beginningOffsets(partitions);
  80. var allEndOffsets = consumer.endOffsets(partitions);
  81. this.endOffsets = allEndOffsets.entrySet().stream()
  82. .filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue()))
  83. .map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1))
  84. .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
  85. this.beginOffsets = this.endOffsets.keySet().stream()
  86. .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p))))
  87. .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
  88. }
  89. public List<TopicPartition> topicPartitions() {
  90. return this.endOffsets.keySet().stream()
  91. .map(p -> new TopicPartition(topic, p))
  92. .collect(Collectors.toList());
  93. }
  94. public void markPolled(int partition) {
  95. endOffsets.remove(partition);
  96. beginOffsets.remove(partition);
  97. }
  98. public void markPolled(ConsumerRecord<?, ?> rec) {
  99. Long endWaiting = endOffsets.get(rec.partition());
  100. if (endWaiting != null && endWaiting <= rec.offset()) {
  101. endOffsets.remove(rec.partition());
  102. }
  103. Long beginWaiting = beginOffsets.get(rec.partition());
  104. if (beginWaiting != null && beginWaiting >= rec.offset()) {
  105. beginOffsets.remove(rec.partition());
  106. }
  107. }
  108. public boolean endReached() {
  109. return endOffsets.isEmpty();
  110. }
  111. public boolean beginReached() {
  112. return beginOffsets.isEmpty();
  113. }
  114. public Map<Integer, Long> getEndOffsets() {
  115. return endOffsets;
  116. }
  117. public Map<Integer, Long> getBeginOffsets() {
  118. return beginOffsets;
  119. }
  120. }
  121. }