1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- package com.provectus.kafka.ui.emitter;
- import com.provectus.kafka.ui.model.TopicMessageEventDTO;
- import jakarta.annotation.Nullable;
- import java.time.Duration;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.common.utils.Bytes;
- import reactor.core.publisher.FluxSink;
- public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
- private final MessagesProcessing messagesProcessing;
- protected final PollingSettings pollingSettings;
- protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
- this.messagesProcessing = messagesProcessing;
- this.pollingSettings = pollingSettings;
- }
- protected PolledRecords poll(
- FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
- return poll(sink, consumer, pollingSettings.getPollTimeout());
- }
- protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
- var records = consumer.pollEnhanced(timeout);
- sendConsuming(sink, records);
- return records;
- }
- protected boolean isSendLimitReached() {
- return messagesProcessing.limitReached();
- }
- protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
- ConsumerRecord<Bytes, Bytes> msg) {
- messagesProcessing.sendMsg(sink, msg);
- }
- protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
- messagesProcessing.sendPhase(sink, name);
- }
- protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
- messagesProcessing.sentConsumingInfo(sink, records);
- }
- // cursor is null if target partitions were fully polled (no, need to do paging)
- protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
- messagesProcessing.sendFinishEvents(sink, cursor);
- sink.complete();
- }
- }
|