|
@@ -348,27 +348,23 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public InternalTopic fillOffsets (InternalTopic topic, KafkaCluster cluster) {
|
|
|
+ public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
|
|
|
var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
|
|
|
- return topic.toBuilder().offsets(getTopicPartitionOffset(cluster, topicPartitions)).build();
|
|
|
+ return getTopicPartitionOffset(cluster, topicPartitions);
|
|
|
}
|
|
|
|
|
|
private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
|
|
|
try (var consumer = createConsumer(c)) {
|
|
|
- var offset = consumer.beginningOffsets(topicPartitions).entrySet().stream()
|
|
|
- .map(e -> {
|
|
|
- var tempResult = new TopicPartitionDto();
|
|
|
- tempResult.setTopic(e.getKey().topic());
|
|
|
- tempResult.setPartition(e.getKey().partition());
|
|
|
- tempResult.setOffsetMin(e.getValue().intValue());
|
|
|
- return tempResult;
|
|
|
- }).map(o -> consumer.endOffsets(topicPartitions).entrySet()
|
|
|
- .stream().filter(max -> o.getTopic().equals(max.getKey().topic()))
|
|
|
- .map(max -> {
|
|
|
- o.setOffsetMax(max.getValue().intValue());
|
|
|
- return o;
|
|
|
- }).findFirst().orElse(o)).collect(Collectors.toList());
|
|
|
- return offset;
|
|
|
+ final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
|
|
|
+ final Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
|
|
|
+
|
|
|
+ return topicPartitions.stream()
|
|
|
+ .map( tp -> new TopicPartitionDto()
|
|
|
+ .topic(tp.topic())
|
|
|
+ .partition(tp.partition())
|
|
|
+ .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
|
|
|
+ .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
|
|
|
+ ).collect(Collectors.toList());
|
|
|
} catch (Exception e) {
|
|
|
return Collections.emptyList();
|
|
|
}
|