SeekOperations.java 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.annotations.VisibleForTesting;
  3. import com.google.common.base.Preconditions;
  4. import com.provectus.kafka.ui.model.ConsumerPosition;
  5. import com.provectus.kafka.ui.model.SeekTypeDTO;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.stream.Collectors;
  9. import javax.annotation.Nullable;
  10. import lombok.AccessLevel;
  11. import lombok.RequiredArgsConstructor;
  12. import org.apache.kafka.clients.consumer.Consumer;
  13. import org.apache.kafka.common.TopicPartition;
  14. @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
  15. public class SeekOperations {
  16. private final Consumer<?, ?> consumer;
  17. private final OffsetsInfo offsetsInfo;
  18. private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
  19. public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
  20. OffsetsInfo offsetsInfo;
  21. if (consumerPosition.getSeekTo() == null) {
  22. offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
  23. } else {
  24. offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
  25. }
  26. return new SeekOperations(
  27. consumer,
  28. offsetsInfo,
  29. getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
  30. );
  31. }
  32. public void assignAndSeekNonEmptyPartitions() {
  33. consumer.assign(offsetsForSeek.keySet());
  34. offsetsForSeek.forEach(consumer::seek);
  35. }
  36. public Map<TopicPartition, Long> getBeginOffsets() {
  37. return offsetsInfo.getBeginOffsets();
  38. }
  39. public Map<TopicPartition, Long> getEndOffsets() {
  40. return offsetsInfo.getEndOffsets();
  41. }
  42. public boolean assignedPartitionsFullyPolled() {
  43. return offsetsInfo.assignedPartitionsFullyPolled();
  44. }
  45. // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
  46. public Map<TopicPartition, Long> getOffsetsForSeek() {
  47. return offsetsForSeek;
  48. }
  49. /**
  50. * Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria.
  51. */
  52. @VisibleForTesting
  53. static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
  54. OffsetsInfo offsetsInfo,
  55. SeekTypeDTO seekType,
  56. @Nullable Map<TopicPartition, Long> seekTo) {
  57. switch (seekType) {
  58. case LATEST:
  59. return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
  60. case BEGINNING:
  61. return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
  62. case OFFSET:
  63. Preconditions.checkNotNull(seekTo);
  64. return fixOffsets(offsetsInfo, seekTo);
  65. case TIMESTAMP:
  66. Preconditions.checkNotNull(seekTo);
  67. return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
  68. default:
  69. throw new IllegalStateException();
  70. }
  71. }
  72. private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
  73. offsets = new HashMap<>(offsets);
  74. offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  75. Map<TopicPartition, Long> result = new HashMap<>();
  76. offsets.forEach((tp, targetOffset) -> {
  77. long endOffset = offsetsInfo.getEndOffsets().get(tp);
  78. long beginningOffset = offsetsInfo.getBeginOffsets().get(tp);
  79. // fixing offsets with min - max bounds
  80. if (targetOffset > endOffset) {
  81. targetOffset = endOffset;
  82. } else if (targetOffset < beginningOffset) {
  83. targetOffset = beginningOffset;
  84. }
  85. result.put(tp, targetOffset);
  86. });
  87. return result;
  88. }
  89. private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
  90. Map<TopicPartition, Long> timestamps) {
  91. timestamps = new HashMap<>(timestamps);
  92. timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  93. return consumer.offsetsForTimes(timestamps).entrySet().stream()
  94. .filter(e -> e.getValue() != null)
  95. .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
  96. }
  97. }