This commit is contained in:
iliax 2023-08-07 16:55:35 +04:00
parent f985f0e360
commit fda75b4a5e

View file

@ -18,6 +18,12 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
this.pollingSettings = pollingSettings;
}
private PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
@ -26,12 +32,6 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
return poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
}
private PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}
protected void buffer(ConsumerRecord<Bytes, Bytes> rec) {
messagesProcessing.buffer(rec);
}