|
@@ -21,7 +21,7 @@ public class InternalConsumerGroup {
|
|
|
private final Collection<InternalMember> members;
|
|
|
private final Map<TopicPartition, Long> offsets;
|
|
|
private final Map<TopicPartition, Long> endOffsets;
|
|
|
- private final Long messagesBehind;
|
|
|
+ private final Long consumerLag;
|
|
|
private final Integer topicNum;
|
|
|
private final String partitionAssignor;
|
|
|
private final ConsumerGroupState state;
|
|
@@ -50,17 +50,17 @@ public class InternalConsumerGroup {
|
|
|
builder.members(internalMembers);
|
|
|
builder.offsets(groupOffsets);
|
|
|
builder.endOffsets(topicEndOffsets);
|
|
|
- builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
|
|
|
+ builder.consumerLag(calculateConsumerLag(groupOffsets, topicEndOffsets));
|
|
|
builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
|
|
|
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
|
|
|
return builder.build();
|
|
|
}
|
|
|
|
|
|
- private static Long calculateMessagesBehind(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> endOffsets) {
|
|
|
- Long messagesBehind = null;
|
|
|
- // messagesBehind should be undefined if no committed offsets found for topic
|
|
|
+ private static Long calculateConsumerLag(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> endOffsets) {
|
|
|
+ Long consumerLag = null;
|
|
|
+ // consumerLag should be undefined if no committed offsets found for topic
|
|
|
if (!offsets.isEmpty()) {
|
|
|
- messagesBehind = offsets.entrySet().stream()
|
|
|
+ consumerLag = offsets.entrySet().stream()
|
|
|
.mapToLong(e ->
|
|
|
Optional.ofNullable(endOffsets)
|
|
|
.map(o -> o.get(e.getKey()))
|
|
@@ -69,7 +69,7 @@ public class InternalConsumerGroup {
|
|
|
).sum();
|
|
|
}
|
|
|
|
|
|
- return messagesBehind;
|
|
|
+ return consumerLag;
|
|
|
}
|
|
|
|
|
|
private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
|