ConsumerGroupMapper.java 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package com.provectus.kafka.ui.mapper;
  2. import com.provectus.kafka.ui.model.BrokerDTO;
  3. import com.provectus.kafka.ui.model.ConsumerGroupDTO;
  4. import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
  5. import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
  6. import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
  7. import com.provectus.kafka.ui.model.InternalConsumerGroup;
  8. import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
  9. import java.util.ArrayList;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.Optional;
  13. import java.util.stream.Collectors;
  14. import java.util.stream.Stream;
  15. import org.apache.kafka.common.Node;
  16. import org.apache.kafka.common.TopicPartition;
  17. public class ConsumerGroupMapper {
  18. private ConsumerGroupMapper() {
  19. }
  20. public static ConsumerGroupDTO toDto(InternalConsumerGroup c) {
  21. return convertToConsumerGroup(c, new ConsumerGroupDTO());
  22. }
  23. public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) {
  24. ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO();
  25. consumerGroup.setTopics(1); //for ui backward-compatibility, need to rm usage from ui
  26. consumerGroup.setGroupId(c.getGroupId());
  27. consumerGroup.setMembers(c.getMembers());
  28. consumerGroup.setMessagesBehind(c.getMessagesBehind());
  29. consumerGroup.setSimple(c.isSimple());
  30. consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
  31. consumerGroup.setState(mapConsumerGroupState(c.getState()));
  32. Optional.ofNullable(c.getCoordinator())
  33. .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
  34. return consumerGroup;
  35. }
  36. public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) {
  37. ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO());
  38. Map<TopicPartition, ConsumerGroupTopicPartitionDTO> partitionMap = new HashMap<>();
  39. for (Map.Entry<TopicPartition, Long> entry : g.getOffsets().entrySet()) {
  40. ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO();
  41. partition.setTopic(entry.getKey().topic());
  42. partition.setPartition(entry.getKey().partition());
  43. partition.setCurrentOffset(entry.getValue());
  44. final Optional<Long> endOffset = Optional.ofNullable(g.getEndOffsets())
  45. .map(o -> o.get(entry.getKey()));
  46. final Long behind = endOffset.map(o -> o - entry.getValue())
  47. .orElse(0L);
  48. partition.setEndOffset(endOffset.orElse(0L));
  49. partition.setMessagesBehind(behind);
  50. partitionMap.put(entry.getKey(), partition);
  51. }
  52. for (InternalConsumerGroup.InternalMember member : g.getMembers()) {
  53. for (TopicPartition topicPartition : member.getAssignment()) {
  54. final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent(
  55. topicPartition,
  56. tp -> new ConsumerGroupTopicPartitionDTO()
  57. .topic(tp.topic())
  58. .partition(tp.partition())
  59. );
  60. partition.setHost(member.getHost());
  61. partition.setConsumerId(member.getConsumerId());
  62. partitionMap.put(topicPartition, partition);
  63. }
  64. }
  65. details.setPartitions(new ArrayList<>(partitionMap.values()));
  66. return details;
  67. }
  68. private static <T extends ConsumerGroupDTO> T convertToConsumerGroup(
  69. InternalConsumerGroup c, T consumerGroup) {
  70. consumerGroup.setGroupId(c.getGroupId());
  71. consumerGroup.setMembers(c.getMembers().size());
  72. int numTopics = Stream.concat(
  73. c.getOffsets().keySet().stream().map(TopicPartition::topic),
  74. c.getMembers().stream()
  75. .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
  76. ).collect(Collectors.toSet()).size();
  77. Long messagesBehind = null;
  78. // messagesBehind should be undefined if no committed offsets found for topic
  79. if (!c.getOffsets().isEmpty()) {
  80. messagesBehind = c.getOffsets().entrySet().stream()
  81. .mapToLong(e ->
  82. Optional.ofNullable(c.getEndOffsets())
  83. .map(o -> o.get(e.getKey()))
  84. .map(o -> o - e.getValue())
  85. .orElse(0L)
  86. ).sum();
  87. }
  88. consumerGroup.setMessagesBehind(messagesBehind);
  89. consumerGroup.setTopics(numTopics);
  90. consumerGroup.setSimple(c.isSimple());
  91. Optional.ofNullable(c.getState())
  92. .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
  93. Optional.ofNullable(c.getCoordinator())
  94. .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
  95. consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
  96. return consumerGroup;
  97. }
  98. private static BrokerDTO mapCoordinator(Node node) {
  99. return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
  100. }
  101. private static ConsumerGroupStateDTO mapConsumerGroupState(
  102. org.apache.kafka.common.ConsumerGroupState state) {
  103. switch (state) {
  104. case DEAD:
  105. return ConsumerGroupStateDTO.DEAD;
  106. case EMPTY:
  107. return ConsumerGroupStateDTO.EMPTY;
  108. case STABLE:
  109. return ConsumerGroupStateDTO.STABLE;
  110. case PREPARING_REBALANCE:
  111. return ConsumerGroupStateDTO.PREPARING_REBALANCE;
  112. case COMPLETING_REBALANCE:
  113. return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
  114. default:
  115. return ConsumerGroupStateDTO.UNKNOWN;
  116. }
  117. }
  118. }