package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import jakarta.annotation.Nullable; import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; public abstract class AbstractEmitter implements java.util.function.Consumer> { private final MessagesProcessing messagesProcessing; protected final PollingSettings pollingSettings; protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) { this.messagesProcessing = messagesProcessing; this.pollingSettings = pollingSettings; } protected PolledRecords poll( FluxSink sink, EnhancedConsumer consumer) { return poll(sink, consumer, pollingSettings.getPollTimeout()); } protected PolledRecords poll(FluxSink sink, EnhancedConsumer consumer, Duration timeout) { var records = consumer.pollEnhanced(timeout); sendConsuming(sink, records); return records; } protected boolean isSendLimitReached() { return messagesProcessing.limitReached(); } protected void sendMessage(FluxSink sink, ConsumerRecord msg) { messagesProcessing.sendMsg(sink, msg); } protected void sendPhase(FluxSink sink, String name) { messagesProcessing.sendPhase(sink, name); } protected void sendConsuming(FluxSink sink, PolledRecords records) { messagesProcessing.sentConsumingInfo(sink, records); } // cursor is null if target partitions were fully polled (no, need to do paging) protected void sendFinishStatsAndCompleteSink(FluxSink sink, @Nullable Cursor.Tracking cursor) { messagesProcessing.sendFinishEvents(sink, cursor); sink.complete(); } }