|
@@ -352,8 +352,9 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<Map<String, BigDecimal>> getOffsets (AdminClient ac, KafkaCluster c) {
|
|
|
- return getTopicPartitionList(ac)
|
|
|
+ public Mono<Map<String, BigDecimal>> getOffsets (KafkaCluster c) {
|
|
|
+ return getOrCreateAdminClient(c).flatMap(ac ->
|
|
|
+ getTopicPartitionList(ac.getAdminClient())
|
|
|
.map(s -> {
|
|
|
var tps = s.stream()
|
|
|
.map(tp ->
|
|
@@ -366,7 +367,7 @@ public class KafkaService {
|
|
|
.stream().flatMap(List::stream).collect(Collectors.toList())
|
|
|
.stream().flatMap(List::stream).collect(Collectors.toList());
|
|
|
return getTopicPartitionOffset(c, tps);
|
|
|
- });
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
private Mono<List<Map<String, List<Integer>>>> getTopicPartitionList(AdminClient ac) {
|
|
@@ -386,6 +387,8 @@ public class KafkaService {
|
|
|
offset.putAll(ClusterUtil.toSingleMap(consumer.endOffsets(topicPartitions).entrySet().stream()
|
|
|
.map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue())))));
|
|
|
return offset;
|
|
|
+ } catch (Exception e) {
|
|
|
+ return Collections.emptyMap();
|
|
|
}
|
|
|
}
|
|
|
}
|