SeekOperations.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.annotations.VisibleForTesting;
  3. import com.google.common.base.Preconditions;
  4. import com.provectus.kafka.ui.model.ConsumerPosition;
  5. import com.provectus.kafka.ui.model.SeekTypeDTO;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.stream.Collectors;
  9. import javax.annotation.Nullable;
  10. import lombok.AccessLevel;
  11. import lombok.RequiredArgsConstructor;
  12. import org.apache.commons.lang3.mutable.MutableLong;
  13. import org.apache.kafka.clients.consumer.Consumer;
  14. import org.apache.kafka.common.TopicPartition;
  15. @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
  16. public class SeekOperations {
  17. private final Consumer<?, ?> consumer;
  18. private final OffsetsInfo offsetsInfo;
  19. private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
  20. public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
  21. OffsetsInfo offsetsInfo;
  22. if (consumerPosition.getSeekTo() == null) {
  23. offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
  24. } else {
  25. offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
  26. }
  27. return new SeekOperations(
  28. consumer,
  29. offsetsInfo,
  30. getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
  31. );
  32. }
  33. public void assignAndSeekNonEmptyPartitions() {
  34. consumer.assign(offsetsForSeek.keySet());
  35. offsetsForSeek.forEach(consumer::seek);
  36. }
  37. public Map<TopicPartition, Long> getBeginOffsets() {
  38. return offsetsInfo.getBeginOffsets();
  39. }
  40. public Map<TopicPartition, Long> getEndOffsets() {
  41. return offsetsInfo.getEndOffsets();
  42. }
  43. public boolean assignedPartitionsFullyPolled() {
  44. return offsetsInfo.assignedPartitionsFullyPolled();
  45. }
  46. // sum of (end - start) offsets for all partitions
  47. public long summaryOffsetsRange() {
  48. return offsetsInfo.summaryOffsetsRange();
  49. }
  50. // sum of differences between initial consumer seek and current consumer position (across all partitions)
  51. public long offsetsProcessedFromSeek() {
  52. MutableLong count = new MutableLong();
  53. offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
  54. return count.getValue();
  55. }
  56. // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
  57. public Map<TopicPartition, Long> getOffsetsForSeek() {
  58. return offsetsForSeek;
  59. }
  60. /**
  61. * Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria.
  62. */
  63. @VisibleForTesting
  64. static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
  65. OffsetsInfo offsetsInfo,
  66. SeekTypeDTO seekType,
  67. @Nullable Map<TopicPartition, Long> seekTo) {
  68. switch (seekType) {
  69. case LATEST:
  70. return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
  71. case BEGINNING:
  72. return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
  73. case OFFSET:
  74. Preconditions.checkNotNull(seekTo);
  75. return fixOffsets(offsetsInfo, seekTo);
  76. case TIMESTAMP:
  77. Preconditions.checkNotNull(seekTo);
  78. return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
  79. default:
  80. throw new IllegalStateException();
  81. }
  82. }
  83. private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
  84. offsets = new HashMap<>(offsets);
  85. offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  86. Map<TopicPartition, Long> result = new HashMap<>();
  87. offsets.forEach((tp, targetOffset) -> {
  88. long endOffset = offsetsInfo.getEndOffsets().get(tp);
  89. long beginningOffset = offsetsInfo.getBeginOffsets().get(tp);
  90. // fixing offsets with min - max bounds
  91. if (targetOffset > endOffset) {
  92. targetOffset = endOffset;
  93. } else if (targetOffset < beginningOffset) {
  94. targetOffset = beginningOffset;
  95. }
  96. result.put(tp, targetOffset);
  97. });
  98. return result;
  99. }
  100. private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
  101. Map<TopicPartition, Long> timestamps) {
  102. timestamps = new HashMap<>(timestamps);
  103. timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
  104. return consumer.offsetsForTimes(timestamps).entrySet().stream()
  105. .filter(e -> e.getValue() != null)
  106. .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
  107. }
  108. }