OffsetsInfo.java 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.base.Preconditions;
  3. import java.util.Collection;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import java.util.stream.Collectors;
  8. import lombok.Getter;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.kafka.clients.consumer.Consumer;
  11. import org.apache.kafka.common.TopicPartition;
  12. @Slf4j
  13. @Getter
  14. public class OffsetsInfo {
  15. private final Consumer<?, ?> consumer;
  16. private final Map<TopicPartition, Long> beginOffsets;
  17. private final Map<TopicPartition, Long> endOffsets;
  18. private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
  19. private final Set<TopicPartition> emptyPartitions = new HashSet<>();
  20. public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
  21. this(consumer,
  22. consumer.partitionsFor(topic).stream()
  23. .map(pi -> new TopicPartition(topic, pi.partition()))
  24. .collect(Collectors.toList())
  25. );
  26. }
  27. public OffsetsInfo(Consumer<?, ?> consumer,
  28. Collection<TopicPartition> targetPartitions) {
  29. this.consumer = consumer;
  30. this.beginOffsets = consumer.beginningOffsets(targetPartitions);
  31. this.endOffsets = consumer.endOffsets(targetPartitions);
  32. endOffsets.forEach((tp, endOffset) -> {
  33. var beginningOffset = beginOffsets.get(tp);
  34. if (endOffset > beginningOffset) {
  35. nonEmptyPartitions.add(tp);
  36. } else {
  37. emptyPartitions.add(tp);
  38. }
  39. });
  40. }
  41. public boolean assignedPartitionsFullyPolled() {
  42. for (var tp: consumer.assignment()) {
  43. Preconditions.checkArgument(endOffsets.containsKey(tp));
  44. if (endOffsets.get(tp) > consumer.position(tp)) {
  45. return false;
  46. }
  47. }
  48. return true;
  49. }
  50. }