ClusterService.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.ClusterNotFoundException;
  3. import com.provectus.kafka.ui.exception.IllegalEntityStateException;
  4. import com.provectus.kafka.ui.exception.NotFoundException;
  5. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  6. import com.provectus.kafka.ui.mapper.ClusterMapper;
  7. import com.provectus.kafka.ui.model.Broker;
  8. import com.provectus.kafka.ui.model.BrokerMetrics;
  9. import com.provectus.kafka.ui.model.Cluster;
  10. import com.provectus.kafka.ui.model.ClusterMetrics;
  11. import com.provectus.kafka.ui.model.ClusterStats;
  12. import com.provectus.kafka.ui.model.ConsumerGroup;
  13. import com.provectus.kafka.ui.model.ConsumerGroupDetails;
  14. import com.provectus.kafka.ui.model.ConsumerPosition;
  15. import com.provectus.kafka.ui.model.CreateTopicMessage;
  16. import com.provectus.kafka.ui.model.ExtendedAdminClient;
  17. import com.provectus.kafka.ui.model.InternalTopic;
  18. import com.provectus.kafka.ui.model.KafkaCluster;
  19. import com.provectus.kafka.ui.model.PartitionsIncrease;
  20. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  21. import com.provectus.kafka.ui.model.ReplicationFactorChange;
  22. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
  23. import com.provectus.kafka.ui.model.Topic;
  24. import com.provectus.kafka.ui.model.TopicColumnsToSort;
  25. import com.provectus.kafka.ui.model.TopicConfig;
  26. import com.provectus.kafka.ui.model.TopicConsumerGroups;
  27. import com.provectus.kafka.ui.model.TopicCreation;
  28. import com.provectus.kafka.ui.model.TopicDetails;
  29. import com.provectus.kafka.ui.model.TopicMessage;
  30. import com.provectus.kafka.ui.model.TopicMessageSchema;
  31. import com.provectus.kafka.ui.model.TopicUpdate;
  32. import com.provectus.kafka.ui.model.TopicsResponse;
  33. import com.provectus.kafka.ui.serde.DeserializationService;
  34. import com.provectus.kafka.ui.util.ClusterUtil;
  35. import java.util.Collections;
  36. import java.util.Comparator;
  37. import java.util.List;
  38. import java.util.Map;
  39. import java.util.Optional;
  40. import java.util.function.Predicate;
  41. import java.util.stream.Collectors;
  42. import java.util.stream.Stream;
  43. import lombok.RequiredArgsConstructor;
  44. import lombok.SneakyThrows;
  45. import lombok.extern.log4j.Log4j2;
  46. import org.apache.commons.lang3.StringUtils;
  47. import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
  48. import org.apache.kafka.common.TopicPartition;
  49. import org.apache.kafka.common.errors.GroupIdNotFoundException;
  50. import org.apache.kafka.common.errors.GroupNotEmptyException;
  51. import org.jetbrains.annotations.NotNull;
  52. import org.springframework.stereotype.Service;
  53. import reactor.core.publisher.Flux;
  54. import reactor.core.publisher.Mono;
  55. import reactor.util.function.Tuples;
  56. @Service
  57. @RequiredArgsConstructor
  58. @Log4j2
  59. public class ClusterService {
  /** Page size applied by {@code getTopics} when the caller gives none or a non-positive one. */
  private static final Integer DEFAULT_PAGE_SIZE = 25;

  /** Holds the known clusters; queried by name and updated after topic/cluster changes. */
  private final ClustersStorage clustersStorage;
  /** Converts internal model objects into the API model objects returned by this service. */
  private final ClusterMapper clusterMapper;
  /** Performs the Kafka-side operations (topics, consumer groups, admin client access). */
  private final KafkaService kafkaService;
  /** Used for loading messages and for computing the offsets needed to delete messages. */
  private final ConsumingService consumingService;
  /** Supplies the per-cluster record deserializer used to obtain a topic's message schema. */
  private final DeserializationService deserializationService;
  66. public List<Cluster> getClusters() {
  67. return clustersStorage.getKafkaClusters()
  68. .stream()
  69. .map(clusterMapper::toCluster)
  70. .collect(Collectors.toList());
  71. }
  72. public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
  73. return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
  74. .map(c -> c.getMetrics().getInternalBrokerMetrics())
  75. .map(m -> m.get(id))
  76. .map(clusterMapper::toBrokerMetrics));
  77. }
  78. public Mono<ClusterStats> getClusterStats(String name) {
  79. return Mono.justOrEmpty(
  80. clustersStorage.getClusterByName(name)
  81. .map(KafkaCluster::getMetrics)
  82. .map(clusterMapper::toClusterStats)
  83. );
  84. }
  85. public Mono<ClusterMetrics> getClusterMetrics(String name) {
  86. return Mono.justOrEmpty(
  87. clustersStorage.getClusterByName(name)
  88. .map(KafkaCluster::getMetrics)
  89. .map(clusterMapper::toClusterMetrics)
  90. );
  91. }
  92. public TopicsResponse getTopics(String name, Optional<Integer> page,
  93. Optional<Integer> nullablePerPage,
  94. Optional<Boolean> showInternal,
  95. Optional<String> search,
  96. Optional<TopicColumnsToSort> sortBy) {
  97. Predicate<Integer> positiveInt = i -> i > 0;
  98. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  99. var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
  100. var cluster = clustersStorage.getClusterByName(name)
  101. .orElseThrow(ClusterNotFoundException::new);
  102. List<Topic> topics = cluster.getTopics().values().stream()
  103. .filter(topic -> !topic.isInternal()
  104. || showInternal
  105. .map(i -> topic.isInternal() == i)
  106. .orElse(true))
  107. .filter(topic ->
  108. search
  109. .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
  110. .orElse(true))
  111. .sorted(getComparatorForTopic(sortBy))
  112. .map(clusterMapper::toTopic)
  113. .collect(Collectors.toList());
  114. var totalPages = (topics.size() / perPage)
  115. + (topics.size() % perPage == 0 ? 0 : 1);
  116. return new TopicsResponse()
  117. .pageCount(totalPages)
  118. .topics(
  119. topics.stream()
  120. .skip(topicsToSkip)
  121. .limit(perPage)
  122. .collect(Collectors.toList())
  123. );
  124. }
  125. private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
  126. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  127. if (sortBy.isEmpty()) {
  128. return defaultComparator;
  129. }
  130. switch (sortBy.get()) {
  131. case TOTAL_PARTITIONS:
  132. return Comparator.comparing(InternalTopic::getPartitionCount);
  133. case OUT_OF_SYNC_REPLICAS:
  134. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  135. case NAME:
  136. default:
  137. return defaultComparator;
  138. }
  139. }
  140. public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
  141. return clustersStorage.getClusterByName(name)
  142. .flatMap(c ->
  143. Optional.ofNullable(
  144. c.getTopics().get(topicName)
  145. ).map(
  146. t -> t.toBuilder().partitions(
  147. kafkaService.getTopicPartitions(c, t)
  148. ).build()
  149. ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
  150. );
  151. }
  152. public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
  153. return clustersStorage.getClusterByName(name)
  154. .map(KafkaCluster::getTopics)
  155. .map(t -> t.get(topicName))
  156. .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig)
  157. .collect(Collectors.toList()));
  158. }
  159. public Mono<Topic> createTopic(String clusterName, Mono<TopicCreation> topicCreation) {
  160. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  161. kafkaService.createTopic(cluster, topicCreation)
  162. .doOnNext(t -> updateCluster(t, clusterName, cluster))
  163. .map(clusterMapper::toTopic)
  164. ).orElse(Mono.empty());
  165. }
  166. @SneakyThrows
  167. public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
  168. String consumerGroupId) {
  169. var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
  170. return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
  171. ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
  172. ).flatMap(groups ->
  173. kafkaService.groupMetadata(cluster, consumerGroupId)
  174. .flatMap(offsets -> {
  175. Map<TopicPartition, Long> endOffsets =
  176. kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet());
  177. return ClusterUtil.toMono(groups).map(s ->
  178. Tuples.of(
  179. s.get(consumerGroupId),
  180. s.get(consumerGroupId).members().stream()
  181. .flatMap(c ->
  182. Stream.of(
  183. ClusterUtil.convertToConsumerTopicPartitionDetails(
  184. c, offsets, endOffsets, consumerGroupId
  185. )
  186. )
  187. )
  188. .collect(Collectors.toList()).stream()
  189. .flatMap(t ->
  190. t.stream().flatMap(Stream::of)
  191. ).collect(Collectors.toList())
  192. )
  193. );
  194. }).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2()))
  195. );
  196. }
  197. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
  198. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  199. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  200. .flatMap(kafkaService::getConsumerGroups);
  201. }
  202. public Mono<TopicConsumerGroups> getTopicConsumerGroupDetail(
  203. String clusterName, String topicName) {
  204. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  205. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  206. .flatMap(c -> kafkaService.getTopicConsumerGroups(c, topicName));
  207. }
  208. public Flux<Broker> getBrokers(String clusterName) {
  209. return kafkaService
  210. .getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
  211. .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
  212. .map(n -> n.stream().map(node -> {
  213. Broker broker = new Broker();
  214. broker.setId(node.id());
  215. broker.setHost(node.host());
  216. return broker;
  217. }).collect(Collectors.toList())))
  218. .flatMapMany(Flux::fromIterable);
  219. }
  220. @SneakyThrows
  221. public Mono<Topic> updateTopic(String clusterName, String topicName,
  222. Mono<TopicUpdate> topicUpdate) {
  223. return clustersStorage.getClusterByName(clusterName).map(cl ->
  224. topicUpdate
  225. .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
  226. .doOnNext(t -> updateCluster(t, clusterName, cl))
  227. .map(clusterMapper::toTopic)
  228. ).orElse(Mono.empty());
  229. }
  230. public Mono<Void> deleteTopic(String clusterName, String topicName) {
  231. var cluster = clustersStorage.getClusterByName(clusterName)
  232. .orElseThrow(ClusterNotFoundException::new);
  233. var topic = getTopicDetails(clusterName, topicName)
  234. .orElseThrow(TopicNotFoundException::new);
  235. return kafkaService.deleteTopic(cluster, topic.getName())
  236. .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
  237. }
  238. private KafkaCluster updateCluster(InternalTopic topic, String clusterName,
  239. KafkaCluster cluster) {
  240. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
  241. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  242. return updatedCluster;
  243. }
  244. private KafkaCluster updateCluster(String topicToDelete, String clusterName,
  245. KafkaCluster cluster) {
  246. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
  247. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  248. return updatedCluster;
  249. }
  250. public Mono<Cluster> updateCluster(String clusterName) {
  251. return clustersStorage.getClusterByName(clusterName)
  252. .map(cluster -> kafkaService.getUpdatedCluster(cluster)
  253. .doOnNext(updatedCluster -> clustersStorage
  254. .setKafkaCluster(updatedCluster.getName(), updatedCluster))
  255. .map(clusterMapper::toCluster))
  256. .orElse(Mono.error(new ClusterNotFoundException()));
  257. }
  258. public Flux<TopicMessage> getMessages(String clusterName, String topicName,
  259. ConsumerPosition consumerPosition, String query,
  260. Integer limit) {
  261. return clustersStorage.getClusterByName(clusterName)
  262. .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
  263. .orElse(Flux.empty());
  264. }
  265. public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
  266. List<Integer> partitions) {
  267. var cluster = clustersStorage.getClusterByName(clusterName)
  268. .orElseThrow(ClusterNotFoundException::new);
  269. if (!cluster.getTopics().containsKey(topicName)) {
  270. throw new TopicNotFoundException();
  271. }
  272. return consumingService.offsetsForDeletion(cluster, topicName, partitions)
  273. .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
  274. }
  275. public Mono<PartitionsIncreaseResponse> increaseTopicPartitions(
  276. String clusterName,
  277. String topicName,
  278. PartitionsIncrease partitionsIncrease) {
  279. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  280. kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease)
  281. .doOnNext(t -> updateCluster(t, cluster.getName(), cluster))
  282. .map(t -> new PartitionsIncreaseResponse()
  283. .topicName(t.getName())
  284. .totalPartitionsCount(t.getPartitionCount())))
  285. .orElse(Mono.error(new ClusterNotFoundException(
  286. String.format("No cluster for name '%s'", clusterName)
  287. )));
  288. }
  289. public Mono<Void> deleteConsumerGroupById(String clusterName,
  290. String groupId) {
  291. return clustersStorage.getClusterByName(clusterName)
  292. .map(cluster -> kafkaService.getOrCreateAdminClient(cluster)
  293. .map(ExtendedAdminClient::getAdminClient)
  294. .map(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)))
  295. .map(DeleteConsumerGroupsResult::all)
  296. .flatMap(ClusterUtil::toMono)
  297. .onErrorResume(this::reThrowCustomException)
  298. )
  299. .orElse(Mono.empty());
  300. }
  301. public TopicMessageSchema getTopicSchema(String clusterName, String topicName) {
  302. var cluster = clustersStorage.getClusterByName(clusterName)
  303. .orElseThrow(ClusterNotFoundException::new);
  304. if (!cluster.getTopics().containsKey(topicName)) {
  305. throw new TopicNotFoundException();
  306. }
  307. return deserializationService
  308. .getRecordDeserializerForCluster(cluster)
  309. .getTopicSchema(topicName);
  310. }
  311. public Mono<Void> sendMessage(String clusterName, String topicName, CreateTopicMessage msg) {
  312. var cluster = clustersStorage.getClusterByName(clusterName)
  313. .orElseThrow(ClusterNotFoundException::new);
  314. if (!cluster.getTopics().containsKey(topicName)) {
  315. throw new TopicNotFoundException();
  316. }
  317. return kafkaService.sendMessage(cluster, topicName, msg).then();
  318. }
  319. @NotNull
  320. private Mono<Void> reThrowCustomException(Throwable e) {
  321. if (e instanceof GroupIdNotFoundException) {
  322. return Mono.error(new NotFoundException("The group id does not exist"));
  323. } else if (e instanceof GroupNotEmptyException) {
  324. return Mono.error(new IllegalEntityStateException("The group is not empty"));
  325. } else {
  326. return Mono.error(e);
  327. }
  328. }
  329. public Mono<ReplicationFactorChangeResponse> changeReplicationFactor(
  330. String clusterName,
  331. String topicName,
  332. ReplicationFactorChange replicationFactorChange) {
  333. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  334. kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange)
  335. .doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster))
  336. .map(t -> new ReplicationFactorChangeResponse()
  337. .topicName(t.getName())
  338. .totalReplicationFactor(t.getReplicationFactor())))
  339. .orElse(Mono.error(new ClusterNotFoundException(
  340. String.format("No cluster for name '%s'", clusterName))));
  341. }
  342. }