AbstractEmitter.java 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  3. import jakarta.annotation.Nullable;
  4. import java.time.Duration;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.apache.kafka.common.utils.Bytes;
  7. import reactor.core.publisher.FluxSink;
  8. public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
  9. private final MessagesProcessing messagesProcessing;
  10. protected final PollingSettings pollingSettings;
  11. protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
  12. this.messagesProcessing = messagesProcessing;
  13. this.pollingSettings = pollingSettings;
  14. }
  15. protected PolledRecords poll(
  16. FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
  17. return poll(sink, consumer, pollingSettings.getPollTimeout());
  18. }
  19. protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
  20. var records = consumer.pollEnhanced(timeout);
  21. sendConsuming(sink, records);
  22. return records;
  23. }
  24. protected boolean isSendLimitReached() {
  25. return messagesProcessing.limitReached();
  26. }
  27. protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
  28. ConsumerRecord<Bytes, Bytes> msg) {
  29. messagesProcessing.sendMsg(sink, msg);
  30. }
  31. protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
  32. messagesProcessing.sendPhase(sink, name);
  33. }
  34. protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
  35. messagesProcessing.sentConsumingInfo(sink, records);
  36. }
  37. // cursor is null if target partitions were fully polled (no, need to do paging)
  38. protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
  39. messagesProcessing.sendFinishEvents(sink, cursor);
  40. sink.complete();
  41. }
  42. }