MessagesProcessing.java

package com.provectus.kafka.ui.emitter;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
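
/**
 * Stateful helper that pushes polled Kafka records to a reactive client stream:
 * it deserializes each record, applies the user-supplied filter, counts how many
 * messages were sent (honoring an optional limit), and emits auxiliary phase,
 * statistics and finish events. Typical call order, as assumed from the method
 * semantics: sendPhase -> send (once per poll) -> sentConsumingInfo -> sendFinishEvent.
 */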
@Slf4j
@RequiredArgsConstructor
class MessagesProcessing {

  private final ConsumingStats consumingStats = new ConsumingStats();
  private long sentMessages = 0;

  private final ConsumerRecordDeserializer deserializer;
  private final Predicate<TopicMessageDTO> filter;
  private final boolean ascendingSortBeforeSend;
  private final @Nullable Integer limit;
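
  // A null limit means "unlimited"; otherwise we stop emitting after 'limit' messages.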
  boolean limitReached() {
    return limit != null && sentMessages >= limit;
  }
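
  // Deserializes each polled record, applies the user-supplied filter and pushes matching
  // messages to the sink, stopping early once the limit is reached or the client cancelled.
  // A failing filter only increments an error counter instead of failing the whole stream.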
  void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
    sortForSending(polled, ascendingSortBeforeSend)
        .forEach(rec -> {
          if (!limitReached() && !sink.isCancelled()) {
            TopicMessageDTO topicMessage = deserializer.deserialize(rec);
            try {
              if (filter.test(topicMessage)) {
                sink.next(
                    new TopicMessageEventDTO()
                        .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
                        .message(topicMessage)
                );
                sentMessages++;
              }
            } catch (Exception e) {
              consumingStats.incFilterApplyError();
              log.trace("Error applying filter for message {}", topicMessage, e);
            }
          }
        });
  }
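
  // Forwards consuming statistics for the latest poll to the client, unless the stream was cancelled.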
  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
    if (!sink.isCancelled()) {
      consumingStats.sendConsumingEvt(sink, polledRecords);
    }
  }
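
  // Signals that consumption has finished (delegated to ConsumingStats).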
  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
    if (!sink.isCancelled()) {
      consumingStats.sendFinishEvent(sink);
    }
  }
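
  // Emits a PHASE event with the given name, so the client can display progress.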
  void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
    if (!sink.isCancelled()) {
      sink.next(
          new TopicMessageEventDTO()
              .type(TopicMessageEventDTO.TypeEnum.PHASE)
              .phase(new TopicMessagePhaseDTO().name(name))
      );
    }
  }

  /*
   * Sorts records by timestamp, while guaranteeing that records within the same
   * partition stay ordered by offset.
   */
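  // Example (ascending): partition 0 holds [offset 0 @ ts=5, offset 1 @ ts=9] and
  // partition 1 holds [offset 0 @ ts=7]; the merge emits ts=5, 7, 9 while partition 0's
  // offsets stay in order.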
  @VisibleForTesting
  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
                                                               boolean asc) {
    Comparator<ConsumerRecord<Bytes, Bytes>> offsetComparator = asc
        ? Comparator.comparingLong(ConsumerRecord::offset)
        : Comparator.<ConsumerRecord<Bytes, Bytes>>comparingLong(ConsumerRecord::offset).reversed();

    // partition -> records sorted by offset
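    // (TreeMap keeps the partition keys in ascending numeric order)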
    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
        .collect(
            groupingBy(
                ConsumerRecord::partition,
                TreeMap::new,
                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));

    Comparator<ConsumerRecord<Bytes, Bytes>> tsComparator = asc
        ? Comparator.comparingLong(ConsumerRecord::timestamp)
        : Comparator.<ConsumerRecord<Bytes, Bytes>>comparingLong(ConsumerRecord::timestamp).reversed();

    // merge-sort the per-partition lists using the timestamp comparator
    return Iterables.mergeSorted(perPartition.values(), tsComparator);
  }
}