AbstractEmitter.java

package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {

  private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
  // In some situations it is hard to say whether a range of records (between two offsets) was fully polled.
  // This happens when there are gaps in record sequences, which is common for compacted topics or
  // topics with transactional writes. In such cases, if you want to poll all records between offsets X and Y,
  // there is no guarantee that you will ever see a record with offset Y.
  // To work around this, we assume that after N consecutive empty polls all target messages were read.
  public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3;
  private final ConsumerRecordDeserializer recordDeserializer;
  private final ConsumingStats consumingStats = new ConsumingStats();

  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer) {
    this.recordDeserializer = recordDeserializer;
  }

  protected ConsumerRecords<Bytes, Bytes> poll(
      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
    return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
  }

  // Polls the consumer and reports the poll's record batch and elapsed time to the sink
  // as a consuming-stats event before returning the records.
  protected ConsumerRecords<Bytes, Bytes> poll(
      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
    Instant start = Instant.now();
    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
    Instant finish = Instant.now();
    sendConsuming(sink, records, Duration.between(start, finish).toMillis());
    return records;
  }

  // Deserializes a raw record and emits it downstream as a MESSAGE event.
  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
                             ConsumerRecord<Bytes, Bytes> msg) {
    final TopicMessageDTO topicMessage = recordDeserializer.deserialize(msg);
    sink.next(
        new TopicMessageEventDTO()
            .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
            .message(topicMessage)
    );
  }

  protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
    sink.next(
        new TopicMessageEventDTO()
            .type(TopicMessageEventDTO.TypeEnum.PHASE)
            .phase(new TopicMessagePhaseDTO().name(name))
    );
  }

  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                               ConsumerRecords<Bytes, Bytes> records,
                               long elapsed) {
    consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
  }

  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
    sink.complete();
  }

  // Reads the filter-apply error count from the subscriber's Reactor context, defaulting to 0
  // when no MessageFilterStats was attached to the subscription.
  protected Number getFilterApplyErrors(FluxSink<?> sink) {
    return sink.contextView()
        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
        .<Number>map(MessageFilterStats::getFilterApplyErrors)
        .orElse(0);
  }
}
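
A minimal sketch of how a concrete emitter might apply the empty-polls heuristic described above: keep polling until NO_MORE_DATA_EMPTY_POLLS_COUNT consecutive polls return no records, forwarding every record to the sink, then emit the finish event. The ExampleEmitter class and its pollUntilExhausted method are illustrative assumptions (not part of this file); it assumes the same package and imports as AbstractEmitter above.

class ExampleEmitter extends AbstractEmitter {

  ExampleEmitter(ConsumerRecordDeserializer deserializer) {
    super(deserializer);
  }

  // Hypothetical consume loop: stop after NO_MORE_DATA_EMPTY_POLLS_COUNT
  // consecutive empty polls, then send finish stats and complete the sink.
  void pollUntilExhausted(FluxSink<TopicMessageEventDTO> sink,
                          Consumer<Bytes, Bytes> consumer) {
    int emptyPolls = 0;
    while (emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
      ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
      emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0; // reset on any non-empty poll
      for (ConsumerRecord<Bytes, Bytes> record : records) {
        sendMessage(sink, record);
      }
    }
    sendFinishStatsAndCompleteSink(sink);
  }
}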