Fix NPE if no offsets were found for specified timestamp (#92)

This commit is contained in:
German Osin 2020-08-12 11:47:29 +03:00 committed by GitHub
commit 5ff3eaf743
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -8,12 +8,14 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -101,8 +103,9 @@ public class ConsumingService {
public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) { public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) { try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
assignPartitions(consumer); // assignPartitions(consumer);
seekOffsets(consumer); // seekOffsets(consumer);
assignAndSeek(consumer);
int pollsCount = 0; int pollsCount = 0;
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS); ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
@ -131,39 +134,55 @@ public class ConsumingService {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) { private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
List<TopicPartition> partitions = getRequestedPartitions();
consumer.assign(partitions);
}
private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
SeekType seekType = consumerPosition.getSeekType(); SeekType seekType = consumerPosition.getSeekType();
switch (seekType) { switch (seekType) {
case OFFSET: case OFFSET:
consumerPosition.getSeekTo().forEach((partition, offset) -> { assignAndSeekForOffset(consumer);
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, offset);
});
break; break;
case TIMESTAMP: case TIMESTAMP:
Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream() assignAndSeekForTimestamp(consumer);
.collect(Collectors.toMap(
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
Map.Entry::getValue
));
consumer.offsetsForTimes(timestampsToSearch)
.forEach((topicPartition, offsetAndTimestamp) ->
consumer.seek(topicPartition, offsetAndTimestamp.offset())
);
break; break;
case BEGINNING: case BEGINNING:
List<TopicPartition> partitions = getRequestedPartitions(); assignAndSeekFromBeginning(consumer);
consumer.seekToBeginning(partitions);
break; break;
default: default:
throw new IllegalArgumentException("Unknown seekType: " + seekType); throw new IllegalArgumentException("Unknown seekType: " + seekType);
} }
} }
private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) {
  // Assign every requested partition up front, then position the consumer at the
  // explicit offset the caller supplied for each partition in the seek request.
  consumer.assign(getRequestedPartitions());
  consumerPosition.getSeekTo().forEach((partitionNumber, targetOffset) ->
      consumer.seek(new TopicPartition(topic, partitionNumber), targetOffset)
  );
}
private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) {
  // Resolve each requested (partition -> timestamp) pair to a concrete offset.
  // Kafka maps a partition to a null OffsetAndTimestamp when no record exists at
  // or after the requested timestamp, so null entries are dropped here instead of
  // being dereferenced (this is the NPE guard).
  Map<TopicPartition, Long> foundOffsets = consumer.offsetsForTimes(
          consumerPosition.getSeekTo().entrySet().stream()
              .collect(Collectors.toMap(
                  requested -> new TopicPartition(topic, requested.getKey()),
                  Map.Entry::getValue
              )))
      .entrySet().stream()
      .filter(resolved -> resolved.getValue() != null)
      .collect(Collectors.toMap(Map.Entry::getKey, resolved -> resolved.getValue().offset()));
  // If every partition came back empty there is nothing meaningful to read.
  if (foundOffsets.isEmpty()) {
    throw new IllegalArgumentException("No offsets were found for requested timestamps");
  }
  consumer.assign(foundOffsets.keySet());
  foundOffsets.forEach(consumer::seek);
}
private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) {
  // Subscribe to all requested partitions and rewind each one to its earliest
  // available offset before polling begins.
  List<TopicPartition> requestedPartitions = getRequestedPartitions();
  consumer.assign(requestedPartitions);
  consumer.seekToBeginning(requestedPartitions);
}
} }
} }