OffsetsInfo.java 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.base.Preconditions;
  3. import com.google.common.collect.Sets;
  4. import java.util.Collection;
  5. import java.util.HashSet;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import java.util.stream.Collectors;
  9. import lombok.Getter;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.apache.commons.lang3.mutable.MutableLong;
  12. import org.apache.kafka.clients.consumer.Consumer;
  13. import org.apache.kafka.common.TopicPartition;
  14. @Slf4j
  15. @Getter
  16. class OffsetsInfo {
  17. private final Consumer<?, ?> consumer;
  18. private final Map<TopicPartition, Long> beginOffsets;
  19. private final Map<TopicPartition, Long> endOffsets;
  20. private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
  21. private final Set<TopicPartition> emptyPartitions = new HashSet<>();
  22. OffsetsInfo(Consumer<?, ?> consumer, String topic) {
  23. this(consumer,
  24. consumer.partitionsFor(topic).stream()
  25. .map(pi -> new TopicPartition(topic, pi.partition()))
  26. .toList()
  27. );
  28. }
  29. OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
  30. this.consumer = consumer;
  31. this.beginOffsets = consumer.beginningOffsets(targetPartitions);
  32. this.endOffsets = consumer.endOffsets(targetPartitions);
  33. endOffsets.forEach((tp, endOffset) -> {
  34. var beginningOffset = beginOffsets.get(tp);
  35. if (endOffset > beginningOffset) {
  36. nonEmptyPartitions.add(tp);
  37. } else {
  38. emptyPartitions.add(tp);
  39. }
  40. });
  41. }
  42. boolean assignedPartitionsFullyPolled() {
  43. for (var tp : consumer.assignment()) {
  44. Preconditions.checkArgument(endOffsets.containsKey(tp));
  45. if (endOffsets.get(tp) > consumer.position(tp)) {
  46. return false;
  47. }
  48. }
  49. return true;
  50. }
  51. long summaryOffsetsRange() {
  52. MutableLong cnt = new MutableLong();
  53. nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
  54. return cnt.getValue();
  55. }
  56. Set<TopicPartition> allTargetPartitions() {
  57. return Sets.union(nonEmptyPartitions, emptyPartitions);
  58. }
  59. }