BackwardEmitter.java 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import com.provectus.kafka.ui.model.TopicMessageDTO;
  4. import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
  5. import java.util.Comparator;
  6. import java.util.Map;
  7. import java.util.TreeMap;
  8. import java.util.function.Predicate;
  9. import java.util.function.Supplier;
  10. import java.util.stream.Collectors;
  11. import org.apache.kafka.common.TopicPartition;
  12. public class BackwardEmitter extends RangePollingEmitter {
  13. public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
  14. ConsumerPosition consumerPosition,
  15. int messagesPerPage,
  16. ConsumerRecordDeserializer deserializer,
  17. Predicate<TopicMessageDTO> filter,
  18. PollingSettings pollingSettings) {
  19. super(
  20. consumerSupplier,
  21. consumerPosition,
  22. messagesPerPage,
  23. new MessagesProcessing(
  24. deserializer,
  25. filter,
  26. false,
  27. messagesPerPage
  28. ),
  29. pollingSettings
  30. );
  31. }
  32. @Override
  33. protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
  34. SeekOperations seekOperations) {
  35. TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
  36. if (prevRange.isEmpty()) {
  37. readToOffsets.putAll(seekOperations.getOffsetsForSeek());
  38. } else {
  39. readToOffsets.putAll(
  40. prevRange.entrySet()
  41. .stream()
  42. .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
  43. );
  44. }
  45. int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
  46. TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
  47. readToOffsets.forEach((tp, toOffset) -> {
  48. long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
  49. if (toOffset > tpStartOffset) {
  50. result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
  51. }
  52. });
  53. return result;
  54. }
  55. }