ConsumerGroupService.java 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
  3. import com.provectus.kafka.ui.model.InternalConsumerGroup;
  4. import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
  5. import com.provectus.kafka.ui.model.KafkaCluster;
  6. import com.provectus.kafka.ui.model.SortOrderDTO;
  7. import java.util.ArrayList;
  8. import java.util.Comparator;
  9. import java.util.HashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.Properties;
  13. import java.util.UUID;
  14. import java.util.function.ToIntFunction;
  15. import java.util.stream.Collectors;
  16. import javax.annotation.Nullable;
  17. import lombok.RequiredArgsConstructor;
  18. import lombok.Value;
  19. import org.apache.commons.lang3.StringUtils;
  20. import org.apache.kafka.clients.admin.ConsumerGroupDescription;
  21. import org.apache.kafka.clients.admin.OffsetSpec;
  22. import org.apache.kafka.clients.consumer.ConsumerConfig;
  23. import org.apache.kafka.clients.consumer.KafkaConsumer;
  24. import org.apache.kafka.common.TopicPartition;
  25. import org.apache.kafka.common.serialization.BytesDeserializer;
  26. import org.apache.kafka.common.utils.Bytes;
  27. import org.springframework.stereotype.Service;
  28. import reactor.core.publisher.Flux;
  29. import reactor.core.publisher.Mono;
  30. import reactor.util.function.Tuple2;
  31. import reactor.util.function.Tuples;
  32. @Service
  33. @RequiredArgsConstructor
  34. public class ConsumerGroupService {
  35. private final AdminClientService adminClientService;
  36. private Mono<List<InternalConsumerGroup>> getConsumerGroups(
  37. ReactiveAdminClient ac,
  38. List<ConsumerGroupDescription> descriptions) {
  39. return Flux.fromIterable(descriptions)
  40. // 1. getting committed offsets for all groups
  41. .flatMap(desc -> ac.listConsumerGroupOffsets(desc.groupId())
  42. .map(offsets -> Tuples.of(desc, offsets)))
  43. .collectMap(Tuple2::getT1, Tuple2::getT2)
  44. .flatMap((Map<ConsumerGroupDescription, Map<TopicPartition, Long>> groupOffsetsMap) -> {
  45. var tpsFromGroupOffsets = groupOffsetsMap.values().stream()
  46. .flatMap(v -> v.keySet().stream())
  47. .collect(Collectors.toSet());
  48. // 2. getting end offsets for partitions with committed offsets
  49. return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest(), false)
  50. .map(endOffsets ->
  51. descriptions.stream()
  52. .map(desc -> {
  53. var groupOffsets = groupOffsetsMap.get(desc);
  54. var endOffsetsForGroup = new HashMap<>(endOffsets);
  55. endOffsetsForGroup.keySet().retainAll(groupOffsets.keySet());
  56. // 3. gathering description & offsets
  57. return InternalConsumerGroup.create(desc, groupOffsets, endOffsetsForGroup);
  58. })
  59. .collect(Collectors.toList()));
  60. });
  61. }
  62. public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
  63. String topic) {
  64. return adminClientService.get(cluster)
  65. // 1. getting topic's end offsets
  66. .flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
  67. .flatMap(endOffsets -> {
  68. var tps = new ArrayList<>(endOffsets.keySet());
  69. // 2. getting all consumer groups
  70. return describeConsumerGroups(ac, null)
  71. .flatMap((List<ConsumerGroupDescription> groups) ->
  72. Flux.fromIterable(groups)
  73. // 3. for each group trying to find committed offsets for topic
  74. .flatMap(g ->
  75. ac.listConsumerGroupOffsets(g.groupId(), tps)
  76. // 4. keeping only groups that relates to topic
  77. .filter(offsets -> isConsumerGroupRelatesToTopic(topic, g, offsets))
  78. // 5. constructing results
  79. .map(offsets -> InternalTopicConsumerGroup.create(topic, g, offsets, endOffsets))
  80. ).collectList());
  81. }));
  82. }
  83. private boolean isConsumerGroupRelatesToTopic(String topic,
  84. ConsumerGroupDescription description,
  85. Map<TopicPartition, Long> committedGroupOffsetsForTopic) {
  86. boolean hasActiveMembersForTopic = description.members()
  87. .stream()
  88. .anyMatch(m -> m.assignment().topicPartitions().stream().anyMatch(tp -> tp.topic().equals(topic)));
  89. boolean hasCommittedOffsets = !committedGroupOffsetsForTopic.isEmpty();
  90. return hasActiveMembersForTopic || hasCommittedOffsets;
  91. }
  92. @Value
  93. public static class ConsumerGroupsPage {
  94. List<InternalConsumerGroup> consumerGroups;
  95. int totalPages;
  96. }
  97. public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
  98. KafkaCluster cluster,
  99. int page,
  100. int perPage,
  101. @Nullable String search,
  102. ConsumerGroupOrderingDTO orderBy,
  103. SortOrderDTO sortOrderDto
  104. ) {
  105. var comparator = sortOrderDto.equals(SortOrderDTO.ASC)
  106. ? getPaginationComparator(orderBy)
  107. : getPaginationComparator(orderBy).reversed();
  108. return adminClientService.get(cluster).flatMap(ac ->
  109. describeConsumerGroups(ac, search).flatMap(descriptions ->
  110. getConsumerGroups(
  111. ac,
  112. descriptions.stream()
  113. .sorted(comparator)
  114. .skip((long) (page - 1) * perPage)
  115. .limit(perPage)
  116. .collect(Collectors.toList())
  117. ).map(cgs -> new ConsumerGroupsPage(
  118. cgs,
  119. (descriptions.size() / perPage) + (descriptions.size() % perPage == 0 ? 0 : 1))))
  120. );
  121. }
  122. private Comparator<ConsumerGroupDescription> getPaginationComparator(ConsumerGroupOrderingDTO
  123. orderBy) {
  124. switch (orderBy) {
  125. case NAME:
  126. return Comparator.comparing(ConsumerGroupDescription::groupId);
  127. case STATE:
  128. ToIntFunction<ConsumerGroupDescription> statesPriorities = cg -> {
  129. switch (cg.state()) {
  130. case STABLE:
  131. return 0;
  132. case COMPLETING_REBALANCE:
  133. return 1;
  134. case PREPARING_REBALANCE:
  135. return 2;
  136. case EMPTY:
  137. return 3;
  138. case DEAD:
  139. return 4;
  140. case UNKNOWN:
  141. return 5;
  142. default:
  143. return 100;
  144. }
  145. };
  146. return Comparator.comparingInt(statesPriorities);
  147. case MEMBERS:
  148. return Comparator.comparingInt(cg -> cg.members().size());
  149. default:
  150. throw new IllegalStateException("Unsupported order by: " + orderBy);
  151. }
  152. }
  153. private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac,
  154. @Nullable String search) {
  155. return ac.listConsumerGroups()
  156. .map(groupIds -> groupIds
  157. .stream()
  158. .filter(groupId -> search == null || StringUtils.containsIgnoreCase(groupId, search))
  159. .collect(Collectors.toList()))
  160. .flatMap(ac::describeConsumerGroups)
  161. .map(cgs -> new ArrayList<>(cgs.values()));
  162. }
  163. public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster,
  164. String consumerGroupId) {
  165. return adminClientService.get(cluster)
  166. .flatMap(ac -> ac.describeConsumerGroups(List.of(consumerGroupId))
  167. .filter(m -> m.containsKey(consumerGroupId))
  168. .map(r -> r.get(consumerGroupId))
  169. .flatMap(descr ->
  170. getConsumerGroups(ac, List.of(descr))
  171. .filter(groups -> !groups.isEmpty())
  172. .map(groups -> groups.get(0))));
  173. }
  174. public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster,
  175. String groupId) {
  176. return adminClientService.get(cluster)
  177. .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
  178. }
  179. public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
  180. return createConsumer(cluster, Map.of());
  181. }
  182. public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
  183. Map<String, Object> properties) {
  184. Properties props = new Properties();
  185. props.putAll(cluster.getProperties());
  186. props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
  187. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
  188. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
  189. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
  190. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  191. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  192. props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
  193. props.putAll(properties);
  194. return new KafkaConsumer<>(props);
  195. }
  196. }