package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; abstract class AbstractEmitter implements java.util.function.Consumer> { private final MessagesProcessing messagesProcessing; private final PollingSettings pollingSettings; protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) { this.messagesProcessing = messagesProcessing; this.pollingSettings = pollingSettings; } protected PolledRecords poll(FluxSink sink, EnhancedConsumer consumer) { var records = consumer.pollEnhanced(pollingSettings.getPollTimeout()); sendConsuming(sink, records); return records; } protected boolean sendLimitReached() { return messagesProcessing.limitReached(); } protected void send(FluxSink sink, Iterable> records) { messagesProcessing.send(sink, records); } protected void sendPhase(FluxSink sink, String name) { messagesProcessing.sendPhase(sink, name); } protected void sendConsuming(FluxSink sink, PolledRecords records) { messagesProcessing.sentConsumingInfo(sink, records); } protected void sendFinishStatsAndCompleteSink(FluxSink sink) { messagesProcessing.sendFinishEvent(sink); sink.complete(); } }