ConsumerGroupMapper.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package com.provectus.kafka.ui.mapper;
  2. import com.provectus.kafka.ui.model.BrokerDTO;
  3. import com.provectus.kafka.ui.model.ConsumerGroupDTO;
  4. import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
  5. import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
  6. import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
  7. import com.provectus.kafka.ui.model.InternalConsumerGroup;
  8. import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
  9. import java.util.ArrayList;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.Optional;
  13. import org.apache.kafka.common.Node;
  14. import org.apache.kafka.common.TopicPartition;
  15. public class ConsumerGroupMapper {
  16. private ConsumerGroupMapper() {
  17. }
  18. public static ConsumerGroupDTO toDto(InternalConsumerGroup c) {
  19. return convertToConsumerGroup(c, new ConsumerGroupDTO());
  20. }
  21. public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) {
  22. ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO();
  23. consumerGroup.setTopics(1); //for ui backward-compatibility, need to rm usage from ui
  24. consumerGroup.setGroupId(c.getGroupId());
  25. consumerGroup.setMembers(c.getMembers());
  26. consumerGroup.setMessagesBehind(c.getMessagesBehind());
  27. consumerGroup.setSimple(c.isSimple());
  28. consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
  29. consumerGroup.setState(mapConsumerGroupState(c.getState()));
  30. Optional.ofNullable(c.getCoordinator())
  31. .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
  32. return consumerGroup;
  33. }
  34. public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) {
  35. ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO());
  36. Map<TopicPartition, ConsumerGroupTopicPartitionDTO> partitionMap = new HashMap<>();
  37. for (Map.Entry<TopicPartition, Long> entry : g.getOffsets().entrySet()) {
  38. ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO();
  39. partition.setTopic(entry.getKey().topic());
  40. partition.setPartition(entry.getKey().partition());
  41. partition.setCurrentOffset(entry.getValue());
  42. final Optional<Long> endOffset = Optional.ofNullable(g.getEndOffsets())
  43. .map(o -> o.get(entry.getKey()));
  44. final Long behind = endOffset.map(o -> o - entry.getValue())
  45. .orElse(0L);
  46. partition.setEndOffset(endOffset.orElse(0L));
  47. partition.setMessagesBehind(behind);
  48. partitionMap.put(entry.getKey(), partition);
  49. }
  50. for (InternalConsumerGroup.InternalMember member : g.getMembers()) {
  51. for (TopicPartition topicPartition : member.getAssignment()) {
  52. final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent(
  53. topicPartition,
  54. tp -> new ConsumerGroupTopicPartitionDTO()
  55. .topic(tp.topic())
  56. .partition(tp.partition())
  57. );
  58. partition.setHost(member.getHost());
  59. partition.setConsumerId(member.getConsumerId());
  60. partitionMap.put(topicPartition, partition);
  61. }
  62. }
  63. details.setPartitions(new ArrayList<>(partitionMap.values()));
  64. return details;
  65. }
  66. private static <T extends ConsumerGroupDTO> T convertToConsumerGroup(
  67. InternalConsumerGroup c, T consumerGroup) {
  68. consumerGroup.setGroupId(c.getGroupId());
  69. consumerGroup.setMembers(c.getMembers().size());
  70. consumerGroup.setMessagesBehind(c.getMessagesBehind());
  71. consumerGroup.setTopics(c.getTopicNum());
  72. consumerGroup.setSimple(c.isSimple());
  73. Optional.ofNullable(c.getState())
  74. .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
  75. Optional.ofNullable(c.getCoordinator())
  76. .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
  77. consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
  78. return consumerGroup;
  79. }
  80. private static BrokerDTO mapCoordinator(Node node) {
  81. return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
  82. }
  83. private static ConsumerGroupStateDTO mapConsumerGroupState(
  84. org.apache.kafka.common.ConsumerGroupState state) {
  85. switch (state) {
  86. case DEAD:
  87. return ConsumerGroupStateDTO.DEAD;
  88. case EMPTY:
  89. return ConsumerGroupStateDTO.EMPTY;
  90. case STABLE:
  91. return ConsumerGroupStateDTO.STABLE;
  92. case PREPARING_REBALANCE:
  93. return ConsumerGroupStateDTO.PREPARING_REBALANCE;
  94. case COMPLETING_REBALANCE:
  95. return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
  96. default:
  97. return ConsumerGroupStateDTO.UNKNOWN;
  98. }
  99. }
  100. }