InternalConsumerGroup.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package com.provectus.kafka.ui.model;
  2. import java.util.Collection;
  3. import java.util.Map;
  4. import java.util.Optional;
  5. import java.util.Set;
  6. import java.util.stream.Collectors;
  7. import java.util.stream.Stream;
  8. import lombok.Builder;
  9. import lombok.Data;
  10. import org.apache.kafka.clients.admin.ConsumerGroupDescription;
  11. import org.apache.kafka.common.ConsumerGroupState;
  12. import org.apache.kafka.common.Node;
  13. import org.apache.kafka.common.TopicPartition;
  14. @Data
  15. @Builder(toBuilder = true)
  16. public class InternalConsumerGroup {
  17. private final String groupId;
  18. private final boolean simple;
  19. private final Collection<InternalMember> members;
  20. private final Map<TopicPartition, Long> offsets;
  21. private final Map<TopicPartition, Long> endOffsets;
  22. private final Long consumerLag;
  23. private final Integer topicNum;
  24. private final String partitionAssignor;
  25. private final ConsumerGroupState state;
  26. private final Node coordinator;
  27. @Data
  28. @Builder(toBuilder = true)
  29. public static class InternalMember {
  30. private final String consumerId;
  31. private final String groupInstanceId;
  32. private final String clientId;
  33. private final String host;
  34. private final Set<TopicPartition> assignment;
  35. }
  36. public static InternalConsumerGroup create(
  37. ConsumerGroupDescription description,
  38. Map<TopicPartition, Long> groupOffsets,
  39. Map<TopicPartition, Long> topicEndOffsets) {
  40. var builder = InternalConsumerGroup.builder();
  41. builder.groupId(description.groupId());
  42. builder.simple(description.isSimpleConsumerGroup());
  43. builder.state(description.state());
  44. builder.partitionAssignor(description.partitionAssignor());
  45. Collection<InternalMember> internalMembers = initInternalMembers(description);
  46. builder.members(internalMembers);
  47. builder.offsets(groupOffsets);
  48. builder.endOffsets(topicEndOffsets);
  49. builder.consumerLag(calculateConsumerLag(groupOffsets, topicEndOffsets));
  50. builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
  51. Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
  52. return builder.build();
  53. }
  54. private static Long calculateConsumerLag(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> endOffsets) {
  55. Long consumerLag = null;
  56. // consumerLag should be undefined if no committed offsets found for topic
  57. if (!offsets.isEmpty()) {
  58. consumerLag = offsets.entrySet().stream()
  59. .mapToLong(e ->
  60. Optional.ofNullable(endOffsets)
  61. .map(o -> o.get(e.getKey()))
  62. .map(o -> o - e.getValue())
  63. .orElse(0L)
  64. ).sum();
  65. }
  66. return consumerLag;
  67. }
  68. private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
  69. return (int) Stream.concat(
  70. offsets.keySet().stream().map(TopicPartition::topic),
  71. members.stream()
  72. .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
  73. ).distinct().count();
  74. }
  75. private static Collection<InternalMember> initInternalMembers(ConsumerGroupDescription description) {
  76. return description.members().stream()
  77. .map(m ->
  78. InternalConsumerGroup.InternalMember.builder()
  79. .assignment(m.assignment().topicPartitions())
  80. .clientId(m.clientId())
  81. .groupInstanceId(m.groupInstanceId().orElse(""))
  82. .consumerId(m.consumerId())
  83. .clientId(m.clientId())
  84. .host(m.host())
  85. .build()
  86. ).collect(Collectors.toList());
  87. }
  88. }