OffsetsInfo.java 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.base.Preconditions;
  3. import java.util.Collection;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import lombok.Getter;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.kafka.clients.consumer.Consumer;
  10. import org.apache.kafka.common.TopicPartition;
  11. @Slf4j
  12. @Getter
  13. public class OffsetsInfo {
  14. private final Consumer<?, ?> consumer;
  15. private final Map<TopicPartition, Long> beginOffsets;
  16. private final Map<TopicPartition, Long> endOffsets;
  17. private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
  18. private final Set<TopicPartition> emptyPartitions = new HashSet<>();
  19. public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
  20. this(consumer,
  21. consumer.partitionsFor(topic).stream()
  22. .map(pi -> new TopicPartition(topic, pi.partition()))
  23. .toList()
  24. );
  25. }
  26. public OffsetsInfo(Consumer<?, ?> consumer,
  27. Collection<TopicPartition> targetPartitions) {
  28. this.consumer = consumer;
  29. this.beginOffsets = consumer.beginningOffsets(targetPartitions);
  30. this.endOffsets = consumer.endOffsets(targetPartitions);
  31. endOffsets.forEach((tp, endOffset) -> {
  32. var beginningOffset = beginOffsets.get(tp);
  33. if (endOffset > beginningOffset) {
  34. nonEmptyPartitions.add(tp);
  35. } else {
  36. emptyPartitions.add(tp);
  37. }
  38. });
  39. }
  40. public boolean assignedPartitionsFullyPolled() {
  41. for (var tp: consumer.assignment()) {
  42. Preconditions.checkArgument(endOffsets.containsKey(tp));
  43. if (endOffsets.get(tp) > consumer.position(tp)) {
  44. return false;
  45. }
  46. }
  47. return true;
  48. }
  49. }