AbstractEmitter.java

package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
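
// Base class for emitters that poll raw records from Kafka and push them
// to a reactive FluxSink as TopicMessageEventDTO events.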
public abstract class AbstractEmitter {

  private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);

  // In some situations it is hard to say whether a range of records (between two offsets)
  // has been fully polled. This happens when there are gaps in the offset sequence, which
  // is common for compacted topics or topics with transactional writes. In such cases,
  // if you want to poll all records between offsets X and Y, there is no guarantee that
  // you will ever see a record with offset Y. To work around this, we assume that after
  // N consecutive empty polls all target messages have been read.
  public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3;

  private final ConsumerRecordDeserializer recordDeserializer;
  private final ConsumingStats consumingStats = new ConsumingStats();
  private final PollingThrottler throttler;

  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
    this.recordDeserializer = recordDeserializer;
    this.throttler = throttler;
  }
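
  // Polls with the default one-second timeout.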
  protected ConsumerRecords<Bytes, Bytes> poll(
      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
    return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
  }
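
  // Polls the consumer, measures the elapsed time, reports the polled bytes
  // to the sink as a consuming event, and throttles the next poll if the
  // configured rate was exceeded.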
  protected ConsumerRecords<Bytes, Bytes> poll(
      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
    Instant start = Instant.now();
    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
    Instant finish = Instant.now();
    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
    throttler.throttleAfterPoll(polledBytes);
    return records;
  }
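
  // Deserializes a raw record and emits it to the sink as a MESSAGE event.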
  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
                             ConsumerRecord<Bytes, Bytes> msg) {
    final TopicMessageDTO topicMessage = recordDeserializer.deserialize(msg);
    sink.next(
        new TopicMessageEventDTO()
            .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
            .message(topicMessage)
    );
  }
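
  // Emits a PHASE event so that clients can track the emitter's progress.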
  protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
    sink.next(
        new TopicMessageEventDTO()
            .type(TopicMessageEventDTO.TypeEnum.PHASE)
            .phase(new TopicMessagePhaseDTO().name(name))
    );
  }
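
  // Publishes per-poll consuming statistics; returns the number of bytes polled.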
  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                              ConsumerRecords<Bytes, Bytes> records,
                              long elapsed) {
    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
  }
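
  // Sends the final statistics event and completes the reactive stream.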
  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
    sink.complete();
  }
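
  // Reads the filter-apply error counter from the subscriber context,
  // defaulting to 0 when no MessageFilterStats was registered.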
  protected Number getFilterApplyErrors(FluxSink<?> sink) {
    return sink.contextView()
        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
        .<Number>map(MessageFilterStats::getFilterApplyErrors)
        .orElse(0);
  }
}
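
For context, here is a minimal sketch of how a subclass's emit loop might combine the protected helpers above with NO_MORE_DATA_EMPTY_POLLS_COUNT. It is not part of this file: the method shape and the emptyPolls counter are assumptions, though poll, sendMessage, sendPhase, and sendFinishStatsAndCompleteSink are the real helpers defined above.

  // Hypothetical emit loop of a concrete emitter subclass (sketch only).
  public void emit(FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
    sendPhase(sink, "Polling");
    int emptyPolls = 0; // hypothetical counter of consecutive empty polls
    while (!sink.isCancelled() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
      ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
      // Reset on data; after N consecutive empty polls we assume the target
      // offset range has been fully read (see the comment on the constant).
      emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
      for (ConsumerRecord<Bytes, Bytes> record : records) {
        sendMessage(sink, record);
      }
    }
    sendFinishStatsAndCompleteSink(sink);
  }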