InternalTopicConsumerGroup.java 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package com.provectus.kafka.ui.model;
  2. import java.util.Map;
  3. import java.util.Optional;
  4. import javax.annotation.Nullable;
  5. import lombok.Builder;
  6. import lombok.Value;
  7. import org.apache.kafka.clients.admin.ConsumerGroupDescription;
  8. import org.apache.kafka.common.ConsumerGroupState;
  9. import org.apache.kafka.common.Node;
  10. import org.apache.kafka.common.TopicPartition;
  11. @Value
  12. @Builder
  13. public class InternalTopicConsumerGroup {
  14. String groupId;
  15. int members;
  16. @Nullable
  17. Long consumerLag; //null means no committed offsets found for this group
  18. boolean isSimple;
  19. String partitionAssignor;
  20. ConsumerGroupState state;
  21. @Nullable
  22. Node coordinator;
  23. public static InternalTopicConsumerGroup create(
  24. String topic,
  25. ConsumerGroupDescription g,
  26. Map<TopicPartition, Long> committedOffsets,
  27. Map<TopicPartition, Long> endOffsets) {
  28. return InternalTopicConsumerGroup.builder()
  29. .groupId(g.groupId())
  30. .members(
  31. (int) g.members().stream()
  32. // counting only members with target topic assignment
  33. .filter(m -> m.assignment().topicPartitions().stream().anyMatch(p -> p.topic().equals(topic)))
  34. .count()
  35. )
  36. .consumerLag(calculateConsumerLag(committedOffsets, endOffsets))
  37. .isSimple(g.isSimpleConsumerGroup())
  38. .partitionAssignor(g.partitionAssignor())
  39. .state(g.state())
  40. .coordinator(g.coordinator())
  41. .build();
  42. }
  43. @Nullable
  44. private static Long calculateConsumerLag(Map<TopicPartition, Long> committedOffsets,
  45. Map<TopicPartition, Long> endOffsets) {
  46. if (committedOffsets.isEmpty()) {
  47. return null;
  48. }
  49. return committedOffsets.entrySet().stream()
  50. .mapToLong(e ->
  51. Optional.ofNullable(endOffsets.get(e.getKey()))
  52. .map(o -> o - e.getValue())
  53. .orElse(0L)
  54. ).sum();
  55. }
  56. }