|
@@ -380,11 +380,12 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
private Map<String, BigDecimal> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
|
|
|
- var offset = ClusterUtil.toSingleMap(createConsumer(c).beginningOffsets(topicPartitions).entrySet().stream()
|
|
|
+ try (var consumer = createConsumer(c)) {
|
|
|
+ var offset = ClusterUtil.toSingleMap(consumer.beginningOffsets(topicPartitions).entrySet().stream()
|
|
|
.map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue()))));
|
|
|
- offset.putAll(ClusterUtil.toSingleMap(createConsumer(c).endOffsets(topicPartitions).entrySet().stream()
|
|
|
+ offset.putAll(ClusterUtil.toSingleMap(consumer.endOffsets(topicPartitions).entrySet().stream()
|
|
|
.map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue())))));
|
|
|
return offset;
|
|
|
-
|
|
|
+ }
|
|
|
}
|
|
|
}
|