parent
cdcba2d2e9
commit
de5a8652a1
2 changed files with 25 additions and 1 deletions
|
@ -14,12 +14,14 @@ import com.provectus.kafka.ui.util.OffsetsSeekBackward;
|
|||
import com.provectus.kafka.ui.util.OffsetsSeekForward;
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -136,6 +138,15 @@ public class ConsumingService {
|
|||
|
||||
private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
|
||||
|
||||
private static final Comparator<ConsumerRecord<?, ?>> PARTITION_COMPARING =
|
||||
Comparator.comparing(
|
||||
ConsumerRecord::partition,
|
||||
Comparator.nullsFirst(Comparator.naturalOrder())
|
||||
);
|
||||
private static final Comparator<ConsumerRecord<?, ?>> REVERED_COMPARING =
|
||||
PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed();
|
||||
|
||||
|
||||
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
|
||||
private final OffsetsSeek offsetsSeek;
|
||||
|
||||
|
@ -146,7 +157,16 @@ public class ConsumingService {
|
|||
while (!sink.isCancelled() && !waitingOffsets.endReached()) {
|
||||
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
|
||||
log.info("{} records polled", records.count());
|
||||
for (ConsumerRecord<Bytes, Bytes> record : records) {
|
||||
|
||||
final Iterable<ConsumerRecord<Bytes, Bytes>> iterable;
|
||||
if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) {
|
||||
iterable = records;
|
||||
} else {
|
||||
iterable = StreamSupport.stream(records.spliterator(), false)
|
||||
.sorted(REVERED_COMPARING).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
for (ConsumerRecord<Bytes, Bytes> record : iterable) {
|
||||
if (!sink.isCancelled() && !waitingOffsets.endReached()) {
|
||||
sink.next(record);
|
||||
waitingOffsets.markPolled(record);
|
||||
|
|
|
@ -23,6 +23,10 @@ public abstract class OffsetsSeek {
|
|||
this.consumerPosition = consumerPosition;
|
||||
}
|
||||
|
||||
public ConsumerPosition getConsumerPosition() {
|
||||
return consumerPosition;
|
||||
}
|
||||
|
||||
public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
|
||||
SeekType seekType = consumerPosition.getSeekType();
|
||||
log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
|
||||
|
|
Loading…
Add table
Reference in a new issue