|
@@ -20,6 +20,7 @@ public class InternalConsumerGroup {
|
|
private final Collection<InternalMember> members;
|
|
private final Collection<InternalMember> members;
|
|
private final Map<TopicPartition, Long> offsets;
|
|
private final Map<TopicPartition, Long> offsets;
|
|
private final Map<TopicPartition, Long> endOffsets;
|
|
private final Map<TopicPartition, Long> endOffsets;
|
|
|
|
+ private final Long messagesBehind;
|
|
private final String partitionAssignor;
|
|
private final String partitionAssignor;
|
|
private final ConsumerGroupState state;
|
|
private final ConsumerGroupState state;
|
|
private final Node coordinator;
|
|
private final Node coordinator;
|
|
@@ -58,7 +59,25 @@ public class InternalConsumerGroup {
|
|
);
|
|
);
|
|
builder.offsets(groupOffsets);
|
|
builder.offsets(groupOffsets);
|
|
builder.endOffsets(topicEndOffsets);
|
|
builder.endOffsets(topicEndOffsets);
|
|
|
|
+ builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
|
|
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
|
|
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
|
|
return builder.build();
|
|
return builder.build();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static Long calculateMessagesBehind(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> endOffsets) {
|
|
|
|
+ Long messagesBehind = null;
|
|
|
|
+ // messagesBehind should be undefined if no committed offsets found for topic
|
|
|
|
+ if (!offsets.isEmpty()) {
|
|
|
|
+ messagesBehind = offsets.entrySet().stream()
|
|
|
|
+ .mapToLong(e ->
|
|
|
|
+ Optional.ofNullable(endOffsets)
|
|
|
|
+ .map(o -> o.get(e.getKey()))
|
|
|
|
+ .map(o -> o - e.getValue())
|
|
|
|
+ .orElse(0L)
|
|
|
|
+ ).sum();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return messagesBehind;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|