AbstractEmitter.java 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.common.utils.Bytes;
  5. import reactor.core.publisher.FluxSink;
  6. abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
  7. private final MessagesProcessing messagesProcessing;
  8. private final PollingSettings pollingSettings;
  9. protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
  10. this.messagesProcessing = messagesProcessing;
  11. this.pollingSettings = pollingSettings;
  12. }
  13. protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
  14. var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
  15. sendConsuming(sink, records);
  16. return records;
  17. }
  18. protected boolean sendLimitReached() {
  19. return messagesProcessing.limitReached();
  20. }
  21. protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
  22. messagesProcessing.send(sink, records);
  23. }
  24. protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
  25. messagesProcessing.sendPhase(sink, name);
  26. }
  27. protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
  28. messagesProcessing.sentConsumingInfo(sink, records);
  29. }
  30. protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
  31. messagesProcessing.sendFinishEvent(sink);
  32. sink.complete();
  33. }
  34. }