SeekOperations.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package com.provectus.kafka.ui.emitter;
  2. import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
  3. import com.google.common.annotations.VisibleForTesting;
  4. import com.google.common.base.Preconditions;
  5. import com.provectus.kafka.ui.model.ConsumerPosition;
  6. import com.provectus.kafka.ui.model.PollingModeDTO;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. import lombok.AccessLevel;
  10. import lombok.RequiredArgsConstructor;
  11. import org.apache.kafka.clients.consumer.Consumer;
  12. import org.apache.kafka.common.TopicPartition;
  13. @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
  14. class SeekOperations {
  15. private final Consumer<?, ?> consumer;
  16. private final OffsetsInfo offsetsInfo;
  17. private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
  18. static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
  19. OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
  20. ? new OffsetsInfo(consumer, consumerPosition.topic())
  21. : new OffsetsInfo(consumer, consumerPosition.partitions());
  22. var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
  23. return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
  24. }
  25. void assignAndSeek() {
  26. consumer.assign(offsetsForSeek.keySet());
  27. offsetsForSeek.forEach(consumer::seek);
  28. }
  29. Map<TopicPartition, Long> getBeginOffsets() {
  30. return offsetsInfo.getBeginOffsets();
  31. }
  32. boolean assignedPartitionsFullyPolled() {
  33. return offsetsInfo.assignedPartitionsFullyPolled();
  34. }
  35. // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
  36. Map<TopicPartition, Long> getOffsetsForSeek() {
  37. return offsetsForSeek;
  38. }
  39. /**
  40. * Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria.
  41. */
  42. @VisibleForTesting
  43. static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
  44. OffsetsInfo offsetsInfo,
  45. ConsumerPosition position) {
  46. switch (position.pollingMode()) {
  47. case TAILING:
  48. return consumer.endOffsets(offsetsInfo.allTargetPartitions());
  49. case LATEST:
  50. return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
  51. case EARLIEST:
  52. return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
  53. case FROM_OFFSET, TO_OFFSET:
  54. Preconditions.checkNotNull(position.offsets());
  55. return fixOffsets(offsetsInfo, position.offsets());
  56. case FROM_TIMESTAMP, TO_TIMESTAMP:
  57. Preconditions.checkNotNull(position.timestamp());
  58. return offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, position.timestamp());
  59. default:
  60. throw new IllegalStateException();
  61. }
  62. }
  63. private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
  64. ConsumerPosition.Offsets positionOffset) {
  65. var offsets = new HashMap<TopicPartition, Long>();
  66. if (positionOffset.offset() != null) {
  67. offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
  68. } else {
  69. Preconditions.checkNotNull(positionOffset.tpOffsets());
  70. offsets.putAll(positionOffset.tpOffsets());
  71. offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  72. }
  73. Map<TopicPartition, Long> result = new HashMap<>();
  74. offsets.forEach((tp, targetOffset) -> {
  75. long endOffset = offsetsInfo.getEndOffsets().get(tp);
  76. long beginningOffset = offsetsInfo.getBeginOffsets().get(tp);
  77. // fixing offsets with min - max bounds
  78. if (targetOffset > endOffset) {
  79. targetOffset = endOffset;
  80. } else if (targetOffset < beginningOffset) {
  81. targetOffset = beginningOffset;
  82. }
  83. result.put(tp, targetOffset);
  84. });
  85. return result;
  86. }
  87. private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
  88. PollingModeDTO pollingMode,
  89. OffsetsInfo offsetsInfo,
  90. Long timestamp) {
  91. Map<TopicPartition, Long> timestamps = new HashMap<>();
  92. offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
  93. Map<TopicPartition, Long> result = new HashMap<>();
  94. consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
  95. if (offsetAndTimestamp == null) {
  96. if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
  97. // if no offset was returned this means that *all* timestamps are lower
  98. // than target timestamp. Is case of TO_OFFSET mode we need to read from the ending of tp
  99. result.put(tp, offsetsInfo.getEndOffsets().get(tp));
  100. }
  101. } else {
  102. result.put(tp, offsetAndTimestamp.offset());
  103. }
  104. });
  105. return result;
  106. }
  107. }