Fix NPE when no offsets are found for the specified timestamp

This commit is contained in:
Anton Petrov 2020-08-11 17:23:37 +03:00
parent dd5389cb1c
commit bf23e9c193

View file

@ -8,12 +8,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
@ -101,8 +103,9 @@ public class ConsumingService {
public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
assignPartitions(consumer);
seekOffsets(consumer);
// assignPartitions(consumer);
// seekOffsets(consumer);
assignAndSeek(consumer);
int pollsCount = 0;
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
@ -131,39 +134,55 @@ public class ConsumingService {
.collect(Collectors.toList());
}
/**
 * Assigns to the consumer the partitions returned by {@code getRequestedPartitions()}.
 *
 * @param consumer the consumer that receives the manual partition assignment
 */
private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
  consumer.assign(getRequestedPartitions());
}
private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
SeekType seekType = consumerPosition.getSeekType();
switch (seekType) {
case OFFSET:
consumerPosition.getSeekTo().forEach((partition, offset) -> {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, offset);
});
assignAndSeekForOffset(consumer);
break;
case TIMESTAMP:
Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
.collect(Collectors.toMap(
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
Map.Entry::getValue
));
consumer.offsetsForTimes(timestampsToSearch)
.forEach((topicPartition, offsetAndTimestamp) ->
consumer.seek(topicPartition, offsetAndTimestamp.offset())
);
assignAndSeekForTimestamp(consumer);
break;
case BEGINNING:
List<TopicPartition> partitions = getRequestedPartitions();
consumer.seekToBeginning(partitions);
assignAndSeekFromBeginning(consumer);
break;
default:
throw new IllegalArgumentException("Unknown seekType: " + seekType);
}
}
/**
 * Assigns the requested partitions to the consumer and then positions it using the
 * partition-to-position pairs held in {@code consumerPosition.getSeekTo()}.
 *
 * <p>The assignment comes from {@code getRequestedPartitions()}; every entry of the seek map is
 * then applied with {@code consumer.seek}, the map value being passed as the target offset.
 *
 * @param consumer the consumer to assign and position
 */
private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) {
  // The assignment has to be in place before any seek is issued.
  consumer.assign(getRequestedPartitions());
  consumerPosition.getSeekTo().forEach(
      (partitionId, targetOffset) ->
          consumer.seek(new TopicPartition(topic, partitionId), targetOffset)
  );
}
/**
 * Resolves the timestamps held in {@code consumerPosition.getSeekTo()} to offsets, then assigns
 * and positions the consumer on the partitions for which an offset was resolved.
 *
 * <p>Entries for which {@code consumer.offsetsForTimes} yields a {@code null} value are left
 * out, so only partitions with a resolved offset are assigned and sought.
 *
 * @param consumer the consumer to assign and position
 * @throws IllegalArgumentException if no partition has a resolved offset
 */
private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) {
  // Re-key the requested positions by TopicPartition, the form the lookup expects.
  Map<TopicPartition, Long> timestampsByPartition =
      consumerPosition.getSeekTo().entrySet().stream()
          .collect(Collectors.toMap(
              requested -> new TopicPartition(topic, requested.getKey()),
              Map.Entry::getValue
          ));

  // A null lookup value would fail on .offset(), so such entries are dropped first.
  Map<TopicPartition, Long> resolvedOffsets =
      consumer.offsetsForTimes(timestampsByPartition).entrySet().stream()
          .filter(found -> found.getValue() != null)
          .collect(Collectors.toMap(Map.Entry::getKey, found -> found.getValue().offset()));

  if (resolvedOffsets.isEmpty()) {
    throw new IllegalArgumentException("No offsets were found for requested timestamps");
  }

  consumer.assign(resolvedOffsets.keySet());
  resolvedOffsets.forEach((topicPartition, offset) -> consumer.seek(topicPartition, offset));
}
/**
 * Assigns the partitions returned by {@code getRequestedPartitions()} to the consumer and
 * rewinds each of them via {@code consumer.seekToBeginning}.
 *
 * @param consumer the consumer to assign and rewind
 */
private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) {
  // Fetch the list once so the assignment and the rewind use the same partitions.
  final List<TopicPartition> requested = getRequestedPartitions();

  consumer.assign(requested);
  consumer.seekToBeginning(requested);
}
}
}