|
@@ -72,13 +72,10 @@ public class ClusterService {
|
|
|
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
|
|
|
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
|
.map(s -> {
|
|
|
- ConsumerGroupDetails result = new ConsumerGroupDetails();
|
|
|
+ var consumerDetails = s.get(consumerGroupId).members().stream().map(s1 -> ClusterUtil.partlyConvertToConsumerDetail(s1, consumerGroupId, cluster)).collect(Collectors.toList());
|
|
|
+ var result = new ConsumerGroupDetails();
|
|
|
+ result.setConsumers(consumerDetails);
|
|
|
result.setConsumerGroupId(consumerGroupId);
|
|
|
- result.setConsumers(new ArrayList<>());
|
|
|
- s.get(consumerGroupId).members().forEach(s1 -> {
|
|
|
- ConsumerDetail partlyResult = ClusterUtil.partlyConvertToConsumerDetail(s1, consumerGroupId, cluster);
|
|
|
- result.getConsumers().add(partlyResult);
|
|
|
- });
|
|
|
return result;
|
|
|
})
|
|
|
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata())
|
|
@@ -102,10 +99,13 @@ public class ClusterService {
|
|
|
private void fillOffsetParams(ConsumerDetail consumerDetail, Map<TopicPartition, OffsetAndMetadata> topicMetadata) {
|
|
|
List<Long> currentOffsets = new ArrayList<>();
|
|
|
List<Long> behindMessagesList = new ArrayList<>();
|
|
|
+ var endOffsets = consumerDetail.getEndOffset();
|
|
|
+ var topics = consumerDetail.getTopic();
|
|
|
+ var partitions = consumerDetail.getPartition();
|
|
|
for (int i = 0; i < consumerDetail.getTopic().size(); i++) {
|
|
|
- Long currentOffset = topicMetadata.get(new TopicPartition(consumerDetail.getTopic().get(i), consumerDetail.getPartition().get(i))).offset();
|
|
|
+ Long currentOffset = topicMetadata.get(new TopicPartition(topics.get(i), partitions.get(i))).offset();
|
|
|
currentOffsets.add(currentOffset);
|
|
|
- behindMessagesList.add(consumerDetail.getEndOffset().get(i) - currentOffset);
|
|
|
+ behindMessagesList.add(endOffsets.get(i) - currentOffset);
|
|
|
}
|
|
|
consumerDetail.setCurrentOffset(currentOffsets);
|
|
|
consumerDetail.setMessagesBehind(behindMessagesList);
|