SeekOperations.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package com.provectus.kafka.ui.emitter;
  2. import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
  3. import static java.util.Objects.requireNonNull;
  4. import com.google.common.annotations.VisibleForTesting;
  5. import com.provectus.kafka.ui.model.ConsumerPosition;
  6. import com.provectus.kafka.ui.model.PollingModeDTO;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. import lombok.AccessLevel;
  10. import lombok.RequiredArgsConstructor;
  11. import org.apache.kafka.clients.consumer.Consumer;
  12. import org.apache.kafka.common.TopicPartition;
  13. @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
  14. class SeekOperations {
  15. private final Consumer<?, ?> consumer;
  16. private final OffsetsInfo offsetsInfo;
  17. private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
  18. public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
  19. OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
  20. ? new OffsetsInfo(consumer, consumerPosition.topic())
  21. : new OffsetsInfo(consumer, consumerPosition.partitions());
  22. var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
  23. return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
  24. }
  25. public void assignAndSeekNonEmptyPartitions() {
  26. consumer.assign(offsetsForSeek.keySet());
  27. offsetsForSeek.forEach(consumer::seek);
  28. }
  29. Map<TopicPartition, Long> getBeginOffsets() {
  30. return offsetsInfo.getBeginOffsets();
  31. }
  32. boolean assignedPartitionsFullyPolled() {
  33. return offsetsInfo.assignedPartitionsFullyPolled();
  34. }
  35. // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
  36. Map<TopicPartition, Long> getOffsetsForSeek() {
  37. return offsetsForSeek;
  38. }
  39. /**
  40. * Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria.
  41. */
  42. @VisibleForTesting
  43. static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
  44. OffsetsInfo offsetsInfo,
  45. ConsumerPosition position) {
  46. return switch (position.pollingMode()) {
  47. case TAILING -> consumer.endOffsets(offsetsInfo.allTargetPartitions());
  48. case LATEST -> consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
  49. case EARLIEST -> consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
  50. case FROM_OFFSET, TO_OFFSET -> fixOffsets(offsetsInfo, requireNonNull(position.offsets()));
  51. case FROM_TIMESTAMP, TO_TIMESTAMP ->
  52. offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, requireNonNull(position.timestamp()));
  53. };
  54. }
  55. private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
  56. ConsumerPosition.Offsets positionOffset) {
  57. var offsets = new HashMap<TopicPartition, Long>();
  58. if (positionOffset.offset() != null) {
  59. offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
  60. } else {
  61. offsets.putAll(requireNonNull(positionOffset.tpOffsets()));
  62. offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  63. }
  64. Map<TopicPartition, Long> result = new HashMap<>();
  65. offsets.forEach((tp, targetOffset) -> {
  66. long endOffset = offsetsInfo.getEndOffsets().get(tp);
  67. long beginningOffset = offsetsInfo.getBeginOffsets().get(tp);
  68. // fixing offsets with min - max bounds
  69. if (targetOffset > endOffset) {
  70. targetOffset = endOffset;
  71. } else if (targetOffset < beginningOffset) {
  72. targetOffset = beginningOffset;
  73. }
  74. result.put(tp, targetOffset);
  75. });
  76. return result;
  77. }
  78. private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
  79. PollingModeDTO pollingMode,
  80. OffsetsInfo offsetsInfo,
  81. Long timestamp) {
  82. Map<TopicPartition, Long> timestamps = new HashMap<>();
  83. offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
  84. Map<TopicPartition, Long> result = new HashMap<>();
  85. consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
  86. if (offsetAndTimestamp == null) {
  87. if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
  88. // if no offset was returned this means that *all* timestamps are lower
  89. // than target timestamp. Is case of TO_OFFSET mode we need to read from the ending of tp
  90. result.put(tp, offsetsInfo.getEndOffsets().get(tp));
  91. }
  92. } else {
  93. result.put(tp, offsetAndTimestamp.offset());
  94. }
  95. });
  96. return result;
  97. }
  98. }