|
@@ -72,8 +72,9 @@ abstract class RangePollingEmitter extends AbstractEmitter {
|
|
|
FluxSink<TopicMessageEventDTO> sink,
|
|
|
TreeMap<TopicPartition, FromToOffset> range) {
|
|
|
log.trace("Polling range {}", range);
|
|
|
+ sendPhase(sink,
|
|
|
+ "Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
|
|
|
|
|
|
- sendPhase(sink, String.format("Polling partitions: %s", range.keySet()));
|
|
|
consumer.assign(range.keySet());
|
|
|
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
|
|
|
|