ClusterService.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.NotFoundException;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.ConsumerPosition;
  5. import com.provectus.kafka.ui.model.InternalTopic;
  6. import com.provectus.kafka.ui.model.KafkaCluster;
  7. import com.provectus.kafka.ui.util.ClusterUtil;
  8. import com.provectus.kafka.ui.model.*;
  9. import lombok.RequiredArgsConstructor;
  10. import lombok.SneakyThrows;
  11. import org.apache.kafka.clients.consumer.ConsumerConfig;
  12. import org.apache.kafka.clients.consumer.KafkaConsumer;
  13. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  14. import org.apache.kafka.common.TopicPartition;
  15. import org.apache.kafka.common.serialization.StringDeserializer;
  16. import org.springframework.stereotype.Service;
  17. import reactor.core.publisher.Flux;
  18. import reactor.core.publisher.Mono;
  19. import java.util.*;
  20. import java.util.function.Predicate;
  21. import java.util.stream.Collectors;
  22. import java.util.stream.Stream;
  23. @Service
  24. @RequiredArgsConstructor
  25. public class ClusterService {
  26. private static final Integer DEFAULT_PAGE_SIZE = 20;
  27. private final ClustersStorage clustersStorage;
  28. private final ClusterMapper clusterMapper;
  29. private final KafkaService kafkaService;
  30. private final ConsumingService consumingService;
  31. public List<Cluster> getClusters() {
  32. return clustersStorage.getKafkaClusters()
  33. .stream()
  34. .map(clusterMapper::toCluster)
  35. .collect(Collectors.toList());
  36. }
  37. public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
  38. return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
  39. .map(c -> c.getMetrics().getInternalBrokerMetrics())
  40. .map(m -> m.get(id))
  41. .map(clusterMapper::toBrokerMetrics));
  42. }
  43. public Mono<ClusterStats> getClusterStats(String name) {
  44. return Mono.justOrEmpty(
  45. clustersStorage.getClusterByName(name)
  46. .map(KafkaCluster::getMetrics)
  47. .map(clusterMapper::toClusterStats)
  48. );
  49. }
  50. public Mono<ClusterMetrics> getClusterMetrics(String name) {
  51. return Mono.justOrEmpty(
  52. clustersStorage.getClusterByName(name)
  53. .map(KafkaCluster::getMetrics)
  54. .map(clusterMapper::toClusterMetrics)
  55. );
  56. }
  57. public TopicsResponse getTopics(String name, Optional<Integer> page, Optional<Integer> nullablePerPage) {
  58. Predicate<Integer> positiveInt = i -> i > 0;
  59. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  60. var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
  61. var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster"));
  62. var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
  63. return new TopicsResponse()
  64. .pageCount(totalPages)
  65. .topics(
  66. cluster.getTopics().values().stream()
  67. .sorted(Comparator.comparing(InternalTopic::getName))
  68. .skip(topicsToSkip)
  69. .limit(perPage)
  70. .map(clusterMapper::toTopic)
  71. .collect(Collectors.toList())
  72. );
  73. }
  74. public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
  75. return clustersStorage.getClusterByName(name)
  76. .flatMap(c ->
  77. Optional.ofNullable(
  78. c.getTopics().get(topicName)
  79. ).map(
  80. t -> t.toBuilder().partitions(
  81. kafkaService.getTopicPartitions(c, t)
  82. ).build()
  83. ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
  84. );
  85. }
  86. public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
  87. return clustersStorage.getClusterByName(name)
  88. .map(KafkaCluster::getTopics)
  89. .map(t -> t.get(topicName))
  90. .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
  91. }
  92. public Mono<Topic> createTopic(String clusterName, Mono<TopicFormData> topicFormData) {
  93. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  94. kafkaService.createTopic(cluster, topicFormData)
  95. .doOnNext(t -> updateCluster(t, clusterName, cluster))
  96. .map(clusterMapper::toTopic)
  97. ).orElse(Mono.empty());
  98. }
  99. @SneakyThrows
  100. public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName, String consumerGroupId) {
  101. var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
  102. return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
  103. ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
  104. ).flatMap(groups ->
  105. groupMetadata(cluster, consumerGroupId)
  106. .flatMap(offsets -> {
  107. Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
  108. return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
  109. .flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
  110. .collect(Collectors.toList()).stream().flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
  111. })
  112. )
  113. .map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
  114. }
  115. public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster, String consumerGroupId) {
  116. return
  117. kafkaService.getOrCreateAdminClient(cluster)
  118. .map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata())
  119. .flatMap(ClusterUtil::toMono);
  120. }
  121. public Map<TopicPartition, Long> topicPartitionsEndOffsets(KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
  122. Properties properties = new Properties();
  123. properties.putAll(cluster.getProperties());
  124. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
  125. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  126. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  127. properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  128. try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
  129. return consumer.endOffsets(topicPartitions);
  130. }
  131. }
  132. @SneakyThrows
  133. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
  134. return clustersStorage.getClusterByName(clusterName)
  135. .map(kafkaService::getConsumerGroups)
  136. .orElse(Mono.empty());
  137. }
  138. public Flux<Broker> getBrokers(String clusterName) {
  139. return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
  140. .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
  141. .map(n -> n.stream().map(node -> {
  142. Broker broker = new Broker();
  143. broker.setId(node.id());
  144. broker.setHost(node.host());
  145. return broker;
  146. }).collect(Collectors.toList())))
  147. .flatMapMany(Flux::fromIterable);
  148. }
  149. @SneakyThrows
  150. public Mono<Topic> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
  151. return clustersStorage.getClusterByName(clusterName).map(cl ->
  152. topicFormData
  153. .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
  154. .doOnNext(t -> updateCluster(t, clusterName, cl))
  155. .map(clusterMapper::toTopic)
  156. ).orElse(Mono.empty());
  157. }
  158. public Mono<Void> deleteTopic(String clusterName, String topicName) {
  159. var cluster = clustersStorage.getClusterByName(clusterName)
  160. .orElseThrow(() -> new NotFoundException("No such cluster"));
  161. getTopicDetails(clusterName, topicName)
  162. .orElseThrow(() -> new NotFoundException("No such topic"));
  163. return kafkaService.deleteTopic(cluster, topicName)
  164. .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
  165. }
  166. private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) {
  167. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
  168. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  169. return updatedCluster;
  170. }
  171. private KafkaCluster updateCluster(String topicToDelete, String clusterName, KafkaCluster cluster) {
  172. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
  173. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  174. return updatedCluster;
  175. }
  176. public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) {
  177. return clustersStorage.getClusterByName(clusterName)
  178. .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
  179. .orElse(Flux.empty());
  180. }
  181. public Mono<Void> deleteTopicMessages(String clusterName, String topicName, List<Integer> partitions) {
  182. var cluster = clustersStorage.getClusterByName(clusterName)
  183. .orElseThrow(() -> new NotFoundException("No such cluster"));
  184. if (!cluster.getTopics().containsKey(topicName)) {
  185. throw new NotFoundException("No such topic");
  186. }
  187. return consumingService.loadOffsets(cluster, topicName, partitions)
  188. .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
  189. }
  190. }