ClusterService.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.ClusterNotFoundException;
  3. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  4. import com.provectus.kafka.ui.mapper.ClusterMapper;
  5. import com.provectus.kafka.ui.model.Broker;
  6. import com.provectus.kafka.ui.model.BrokerMetrics;
  7. import com.provectus.kafka.ui.model.Cluster;
  8. import com.provectus.kafka.ui.model.ClusterMetrics;
  9. import com.provectus.kafka.ui.model.ClusterStats;
  10. import com.provectus.kafka.ui.model.ConsumerGroup;
  11. import com.provectus.kafka.ui.model.ConsumerGroupDetails;
  12. import com.provectus.kafka.ui.model.ConsumerPosition;
  13. import com.provectus.kafka.ui.model.InternalTopic;
  14. import com.provectus.kafka.ui.model.KafkaCluster;
  15. import com.provectus.kafka.ui.model.Topic;
  16. import com.provectus.kafka.ui.model.TopicConfig;
  17. import com.provectus.kafka.ui.model.TopicCreation;
  18. import com.provectus.kafka.ui.model.TopicDetails;
  19. import com.provectus.kafka.ui.model.TopicMessage;
  20. import com.provectus.kafka.ui.model.TopicUpdate;
  21. import com.provectus.kafka.ui.model.TopicsResponse;
  22. import com.provectus.kafka.ui.util.ClusterUtil;
  23. import java.util.Collection;
  24. import java.util.Collections;
  25. import java.util.Comparator;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Optional;
  29. import java.util.Properties;
  30. import java.util.UUID;
  31. import java.util.function.Predicate;
  32. import java.util.stream.Collectors;
  33. import java.util.stream.Stream;
  34. import lombok.RequiredArgsConstructor;
  35. import lombok.SneakyThrows;
  36. import org.apache.kafka.clients.consumer.ConsumerConfig;
  37. import org.apache.kafka.clients.consumer.KafkaConsumer;
  38. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  39. import org.apache.kafka.common.TopicPartition;
  40. import org.apache.kafka.common.serialization.StringDeserializer;
  41. import org.springframework.stereotype.Service;
  42. import reactor.core.publisher.Flux;
  43. import reactor.core.publisher.Mono;
  44. @Service
  45. @RequiredArgsConstructor
  46. public class ClusterService {
  47. private static final Integer DEFAULT_PAGE_SIZE = 20;
  48. private final ClustersStorage clustersStorage;
  49. private final ClusterMapper clusterMapper;
  50. private final KafkaService kafkaService;
  51. private final ConsumingService consumingService;
  52. public List<Cluster> getClusters() {
  53. return clustersStorage.getKafkaClusters()
  54. .stream()
  55. .map(clusterMapper::toCluster)
  56. .collect(Collectors.toList());
  57. }
  58. public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
  59. return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
  60. .map(c -> c.getMetrics().getInternalBrokerMetrics())
  61. .map(m -> m.get(id))
  62. .map(clusterMapper::toBrokerMetrics));
  63. }
  64. public Mono<ClusterStats> getClusterStats(String name) {
  65. return Mono.justOrEmpty(
  66. clustersStorage.getClusterByName(name)
  67. .map(KafkaCluster::getMetrics)
  68. .map(clusterMapper::toClusterStats)
  69. );
  70. }
  71. public Mono<ClusterMetrics> getClusterMetrics(String name) {
  72. return Mono.justOrEmpty(
  73. clustersStorage.getClusterByName(name)
  74. .map(KafkaCluster::getMetrics)
  75. .map(clusterMapper::toClusterMetrics)
  76. );
  77. }
  78. public TopicsResponse getTopics(String name, Optional<Integer> page,
  79. Optional<Integer> nullablePerPage) {
  80. Predicate<Integer> positiveInt = i -> i > 0;
  81. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  82. var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
  83. var cluster = clustersStorage.getClusterByName(name)
  84. .orElseThrow(ClusterNotFoundException::new);
  85. var totalPages = (cluster.getTopics().size() / perPage)
  86. + (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
  87. return new TopicsResponse()
  88. .pageCount(totalPages)
  89. .topics(
  90. cluster.getTopics().values().stream()
  91. .sorted(Comparator.comparing(InternalTopic::getName))
  92. .skip(topicsToSkip)
  93. .limit(perPage)
  94. .map(clusterMapper::toTopic)
  95. .collect(Collectors.toList())
  96. );
  97. }
  98. public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
  99. return clustersStorage.getClusterByName(name)
  100. .flatMap(c ->
  101. Optional.ofNullable(
  102. c.getTopics().get(topicName)
  103. ).map(
  104. t -> t.toBuilder().partitions(
  105. kafkaService.getTopicPartitions(c, t)
  106. ).build()
  107. ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
  108. );
  109. }
  110. public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
  111. return clustersStorage.getClusterByName(name)
  112. .map(KafkaCluster::getTopics)
  113. .map(t -> t.get(topicName))
  114. .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig)
  115. .collect(Collectors.toList()));
  116. }
  117. public Mono<Topic> createTopic(String clusterName, Mono<TopicCreation> topicCreation) {
  118. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  119. kafkaService.createTopic(cluster, topicCreation)
  120. .doOnNext(t -> updateCluster(t, clusterName, cluster))
  121. .map(clusterMapper::toTopic)
  122. ).orElse(Mono.empty());
  123. }
  124. @SneakyThrows
  125. public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
  126. String consumerGroupId) {
  127. var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
  128. return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
  129. ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
  130. ).flatMap(groups ->
  131. groupMetadata(cluster, consumerGroupId)
  132. .flatMap(offsets -> {
  133. Map<TopicPartition, Long> endOffsets =
  134. topicPartitionsEndOffsets(cluster, offsets.keySet());
  135. return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
  136. .flatMap(c -> Stream.of(ClusterUtil
  137. .convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
  138. .collect(Collectors.toList()).stream()
  139. .flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
  140. })
  141. )
  142. .map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
  143. }
  144. public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
  145. String consumerGroupId) {
  146. return
  147. kafkaService.getOrCreateAdminClient(cluster)
  148. .map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId)
  149. .partitionsToOffsetAndMetadata())
  150. .flatMap(ClusterUtil::toMono);
  151. }
  152. public Map<TopicPartition, Long> topicPartitionsEndOffsets(
  153. KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
  154. Properties properties = new Properties();
  155. properties.putAll(cluster.getProperties());
  156. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
  157. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  158. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  159. properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  160. try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
  161. return consumer.endOffsets(topicPartitions);
  162. }
  163. }
  164. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
  165. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  166. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  167. .flatMap(kafkaService::getConsumerGroups);
  168. }
  169. public Flux<Broker> getBrokers(String clusterName) {
  170. return kafkaService
  171. .getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
  172. .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
  173. .map(n -> n.stream().map(node -> {
  174. Broker broker = new Broker();
  175. broker.setId(node.id());
  176. broker.setHost(node.host());
  177. return broker;
  178. }).collect(Collectors.toList())))
  179. .flatMapMany(Flux::fromIterable);
  180. }
  181. @SneakyThrows
  182. public Mono<Topic> updateTopic(String clusterName, String topicName,
  183. Mono<TopicUpdate> topicUpdate) {
  184. return clustersStorage.getClusterByName(clusterName).map(cl ->
  185. topicUpdate
  186. .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
  187. .doOnNext(t -> updateCluster(t, clusterName, cl))
  188. .map(clusterMapper::toTopic)
  189. ).orElse(Mono.empty());
  190. }
  191. public Mono<Void> deleteTopic(String clusterName, String topicName) {
  192. var cluster = clustersStorage.getClusterByName(clusterName)
  193. .orElseThrow(ClusterNotFoundException::new);
  194. var topic = getTopicDetails(clusterName, topicName)
  195. .orElseThrow(TopicNotFoundException::new);
  196. return kafkaService.deleteTopic(cluster, topic.getName())
  197. .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
  198. }
  199. private KafkaCluster updateCluster(InternalTopic topic, String clusterName,
  200. KafkaCluster cluster) {
  201. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
  202. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  203. return updatedCluster;
  204. }
  205. private KafkaCluster updateCluster(String topicToDelete, String clusterName,
  206. KafkaCluster cluster) {
  207. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
  208. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  209. return updatedCluster;
  210. }
  211. public Flux<TopicMessage> getMessages(String clusterName, String topicName,
  212. ConsumerPosition consumerPosition, String query,
  213. Integer limit) {
  214. return clustersStorage.getClusterByName(clusterName)
  215. .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
  216. .orElse(Flux.empty());
  217. }
  218. public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
  219. List<Integer> partitions) {
  220. var cluster = clustersStorage.getClusterByName(clusterName)
  221. .orElseThrow(ClusterNotFoundException::new);
  222. if (!cluster.getTopics().containsKey(topicName)) {
  223. throw new TopicNotFoundException();
  224. }
  225. return consumingService.offsetsForDeletion(cluster, topicName, partitions)
  226. .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
  227. }
  228. }