OffsetsInfo.java 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.base.Preconditions;
  3. import java.util.Collection;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import lombok.Getter;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.commons.lang3.mutable.MutableLong;
  10. import org.apache.kafka.clients.consumer.Consumer;
  11. import org.apache.kafka.common.TopicPartition;
  12. @Slf4j
  13. @Getter
  14. class OffsetsInfo {
  15. private final Consumer<?, ?> consumer;
  16. private final Map<TopicPartition, Long> beginOffsets;
  17. private final Map<TopicPartition, Long> endOffsets;
  18. private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
  19. private final Set<TopicPartition> emptyPartitions = new HashSet<>();
  20. OffsetsInfo(Consumer<?, ?> consumer, String topic) {
  21. this(consumer,
  22. consumer.partitionsFor(topic).stream()
  23. .map(pi -> new TopicPartition(topic, pi.partition()))
  24. .toList()
  25. );
  26. }
  27. OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
  28. this.consumer = consumer;
  29. this.beginOffsets = consumer.beginningOffsets(targetPartitions);
  30. this.endOffsets = consumer.endOffsets(targetPartitions);
  31. endOffsets.forEach((tp, endOffset) -> {
  32. var beginningOffset = beginOffsets.get(tp);
  33. if (endOffset > beginningOffset) {
  34. nonEmptyPartitions.add(tp);
  35. } else {
  36. emptyPartitions.add(tp);
  37. }
  38. });
  39. }
  40. boolean assignedPartitionsFullyPolled() {
  41. for (var tp : consumer.assignment()) {
  42. Preconditions.checkArgument(endOffsets.containsKey(tp));
  43. if (endOffsets.get(tp) > consumer.position(tp)) {
  44. return false;
  45. }
  46. }
  47. return true;
  48. }
  49. long summaryOffsetsRange() {
  50. MutableLong cnt = new MutableLong();
  51. nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
  52. return cnt.getValue();
  53. }
  54. }