// AbstractEmitter.java
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  3. import java.time.Duration;
  4. import java.time.Instant;
  5. import org.apache.kafka.clients.consumer.Consumer;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.clients.consumer.ConsumerRecords;
  8. import org.apache.kafka.common.utils.Bytes;
  9. import reactor.core.publisher.FluxSink;
  10. public abstract class AbstractEmitter {
  11. private final MessagesProcessing messagesProcessing;
  12. private final PollingThrottler throttler;
  13. protected final PollingSettings pollingSettings;
  14. protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
  15. this.messagesProcessing = messagesProcessing;
  16. this.pollingSettings = pollingSettings;
  17. this.throttler = pollingSettings.getPollingThrottler();
  18. }
  19. protected ConsumerRecords<Bytes, Bytes> poll(
  20. FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
  21. return poll(sink, consumer, pollingSettings.getPollTimeout());
  22. }
  23. protected ConsumerRecords<Bytes, Bytes> poll(
  24. FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
  25. Instant start = Instant.now();
  26. ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
  27. Instant finish = Instant.now();
  28. int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
  29. throttler.throttleAfterPoll(polledBytes);
  30. return records;
  31. }
  32. protected boolean sendLimitReached() {
  33. return messagesProcessing.limitReached();
  34. }
  35. protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
  36. ConsumerRecord<Bytes, Bytes> msg) {
  37. messagesProcessing.sendMsg(sink, msg);
  38. }
  39. protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
  40. messagesProcessing.sendPhase(sink, name);
  41. }
  42. protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
  43. ConsumerRecords<Bytes, Bytes> records,
  44. long elapsed) {
  45. return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
  46. }
  47. protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
  48. messagesProcessing.sendFinishEvent(sink);
  49. sink.complete();
  50. }
  51. }