OffsetsSeek.java 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package com.provectus.kafka.ui.util;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import com.provectus.kafka.ui.model.SeekType;
  4. import com.provectus.kafka.ui.service.ConsumingService;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.stream.Collectors;
  9. import lombok.extern.log4j.Log4j2;
  10. import org.apache.kafka.clients.consumer.Consumer;
  11. import org.apache.kafka.clients.consumer.ConsumerRecord;
  12. import org.apache.kafka.common.TopicPartition;
  13. import org.apache.kafka.common.utils.Bytes;
  14. @Log4j2
  15. public abstract class OffsetsSeek {
  16. protected final String topic;
  17. protected final ConsumerPosition consumerPosition;
  18. public OffsetsSeek(String topic, ConsumerPosition consumerPosition) {
  19. this.topic = topic;
  20. this.consumerPosition = consumerPosition;
  21. }
  22. public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
  23. SeekType seekType = consumerPosition.getSeekType();
  24. log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
  25. switch (seekType) {
  26. case OFFSET:
  27. assignAndSeekForOffset(consumer);
  28. break;
  29. case TIMESTAMP:
  30. assignAndSeekForTimestamp(consumer);
  31. break;
  32. case BEGINNING:
  33. assignAndSeekFromBeginning(consumer);
  34. break;
  35. default:
  36. throw new IllegalArgumentException("Unknown seekType: " + seekType);
  37. }
  38. log.info("Assignment: {}", consumer.assignment());
  39. return new WaitingOffsets(topic, consumer);
  40. }
  41. protected List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
  42. Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
  43. return consumer.partitionsFor(topic).stream()
  44. .filter(
  45. p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition()))
  46. .map(p -> new TopicPartition(p.topic(), p.partition()))
  47. .collect(Collectors.toList());
  48. }
  49. protected abstract void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer);
  50. protected abstract void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer);
  51. protected abstract void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer);
  52. public static class WaitingOffsets {
  53. final Map<Integer, Long> offsets = new HashMap<>(); // partition number -> offset
  54. public WaitingOffsets(String topic, Consumer<?, ?> consumer) {
  55. var partitions = consumer.assignment().stream()
  56. .map(TopicPartition::partition)
  57. .collect(Collectors.toList());
  58. ConsumingService.significantOffsets(consumer, topic, partitions)
  59. .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1));
  60. }
  61. public void markPolled(ConsumerRecord<?, ?> rec) {
  62. Long waiting = offsets.get(rec.partition());
  63. if (waiting != null && waiting <= rec.offset()) {
  64. offsets.remove(rec.partition());
  65. }
  66. }
  67. public boolean endReached() {
  68. return offsets.isEmpty();
  69. }
  70. }
  71. }