From bf23e9c193b3a52b05954734f8564bdb8497ccad Mon Sep 17 00:00:00 2001
From: Anton Petrov
Date: Tue, 11 Aug 2020 17:23:37 +0300
Subject: [PATCH 1/2] Fix NPE if no offsets were found for specified timestamp

---
 .../ui/cluster/service/ConsumingService.java | 67 ++++++++++++-------
 1 file changed, 43 insertions(+), 24 deletions(-)

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
index 84b87b72ce..e6491698ce 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
@@ -8,12 +8,14 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
@@ -101,8 +103,9 @@ public class ConsumingService {
 
     public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
       try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-        assignPartitions(consumer);
-        seekOffsets(consumer);
+//        assignPartitions(consumer);
+//        seekOffsets(consumer);
+        assignAndSeek(consumer);
         int pollsCount = 0;
         while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
           ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
@@ -131,39 +134,55 @@ public class ConsumingService {
           .collect(Collectors.toList());
     }
 
-    private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
-      List<TopicPartition> partitions = getRequestedPartitions();
-
-      consumer.assign(partitions);
-    }
-
-    private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
+    private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
       SeekType seekType = consumerPosition.getSeekType();
       switch (seekType) {
         case OFFSET:
-          consumerPosition.getSeekTo().forEach((partition, offset) -> {
-            TopicPartition topicPartition = new TopicPartition(topic, partition);
-            consumer.seek(topicPartition, offset);
-          });
+          assignAndSeekForOffset(consumer);
           break;
         case TIMESTAMP:
-          Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
-              .collect(Collectors.toMap(
-                  partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
-                  Map.Entry::getValue
-              ));
-          consumer.offsetsForTimes(timestampsToSearch)
-              .forEach((topicPartition, offsetAndTimestamp) ->
-                  consumer.seek(topicPartition, offsetAndTimestamp.offset())
-              );
+          assignAndSeekForTimestamp(consumer);
           break;
         case BEGINNING:
-          List<TopicPartition> partitions = getRequestedPartitions();
-          consumer.seekToBeginning(partitions);
+          assignAndSeekFromBeginning(consumer);
           break;
         default:
          throw new IllegalArgumentException("Unknown seekType: " + seekType);
       }
     }
+
+    private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) {
+      List<TopicPartition> partitions = getRequestedPartitions();
+      consumer.assign(partitions);
+      consumerPosition.getSeekTo().forEach((partition, offset) -> {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        consumer.seek(topicPartition, offset);
+      });
+    }
+
+    private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) {
+      Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
+          .collect(Collectors.toMap(
+              partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
+              Map.Entry::getValue
+          ));
+      Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
+          .entrySet().stream()
+          .filter(e -> e.getValue() != null)
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+
+      if (offsetsForTimestamps.isEmpty()) {
+        throw new IllegalArgumentException("No offsets were found for requested timestamps");
+      }
+
+      consumer.assign(offsetsForTimestamps.keySet());
+      offsetsForTimestamps.forEach(consumer::seek);
+    }
+
+    private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) {
+      List<TopicPartition> partitions = getRequestedPartitions();
+      consumer.assign(partitions);
+      consumer.seekToBeginning(partitions);
+    }
   }
 }
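
For context on the fix above: KafkaConsumer#offsetsForTimes maps a partition to null when it has no message with a timestamp at or after the requested one, so the previous code dereferenced null when seeking. Below is a minimal, self-contained sketch of the same null-filtering pattern, separate from the patch itself; the bootstrap server, topic name, partition number, and class name are illustrative assumptions, not values from this repository.

// Standalone sketch of seeking by timestamp with null-safe handling of offsetsForTimes().
// Assumptions (not from the patch): broker at localhost:9092, topic "events", partition 0.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class SeekForTimestampSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());

    long oneHourAgo = System.currentTimeMillis() - 3_600_000L;

    try (KafkaConsumer<Bytes, Bytes> consumer = new KafkaConsumer<>(props)) {
      // Ask the broker for the earliest offset whose timestamp is >= the requested one.
      Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
      timestampsToSearch.put(new TopicPartition("events", 0), oneHourAgo);

      Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(timestampsToSearch);

      // offsetsForTimes() maps a partition to null when no such message exists;
      // calling offset() on that null value is the NPE the patch guards against.
      Map<TopicPartition, Long> offsets = found.entrySet().stream()
          .filter(e -> e.getValue() != null)
          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));

      if (offsets.isEmpty()) {
        throw new IllegalArgumentException("No offsets were found for requested timestamps");
      }

      // Assign only the partitions that resolved to an offset, then seek each of them.
      consumer.assign(offsets.keySet());
      offsets.forEach(consumer::seek);
    }
  }
}

Throwing when the filtered map is empty, rather than silently assigning nothing, mirrors the patch: it surfaces the condition to the caller instead of producing an empty result with no explanation.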