ClusterService.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.ClusterNotFoundException;
  3. import com.provectus.kafka.ui.exception.IllegalEntityStateException;
  4. import com.provectus.kafka.ui.exception.NotFoundException;
  5. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  6. import com.provectus.kafka.ui.exception.ValidationException;
  7. import com.provectus.kafka.ui.mapper.ClusterMapper;
  8. import com.provectus.kafka.ui.model.Broker;
  9. import com.provectus.kafka.ui.model.BrokerMetrics;
  10. import com.provectus.kafka.ui.model.Cluster;
  11. import com.provectus.kafka.ui.model.ClusterMetrics;
  12. import com.provectus.kafka.ui.model.ClusterStats;
  13. import com.provectus.kafka.ui.model.ConsumerGroup;
  14. import com.provectus.kafka.ui.model.ConsumerGroupDetails;
  15. import com.provectus.kafka.ui.model.ConsumerPosition;
  16. import com.provectus.kafka.ui.model.CreateTopicMessage;
  17. import com.provectus.kafka.ui.model.ExtendedAdminClient;
  18. import com.provectus.kafka.ui.model.InternalTopic;
  19. import com.provectus.kafka.ui.model.KafkaCluster;
  20. import com.provectus.kafka.ui.model.PartitionsIncrease;
  21. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  22. import com.provectus.kafka.ui.model.ReplicationFactorChange;
  23. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
  24. import com.provectus.kafka.ui.model.Topic;
  25. import com.provectus.kafka.ui.model.TopicColumnsToSort;
  26. import com.provectus.kafka.ui.model.TopicConfig;
  27. import com.provectus.kafka.ui.model.TopicCreation;
  28. import com.provectus.kafka.ui.model.TopicDetails;
  29. import com.provectus.kafka.ui.model.TopicMessage;
  30. import com.provectus.kafka.ui.model.TopicMessageSchema;
  31. import com.provectus.kafka.ui.model.TopicUpdate;
  32. import com.provectus.kafka.ui.model.TopicsResponse;
  33. import com.provectus.kafka.ui.serde.DeserializationService;
  34. import com.provectus.kafka.ui.util.ClusterUtil;
  35. import java.util.Collections;
  36. import java.util.Comparator;
  37. import java.util.List;
  38. import java.util.Optional;
  39. import java.util.function.Predicate;
  40. import java.util.stream.Collectors;
  41. import lombok.RequiredArgsConstructor;
  42. import lombok.SneakyThrows;
  43. import lombok.extern.log4j.Log4j2;
  44. import org.apache.commons.lang3.StringUtils;
  45. import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
  46. import org.apache.kafka.common.errors.GroupIdNotFoundException;
  47. import org.apache.kafka.common.errors.GroupNotEmptyException;
  48. import org.jetbrains.annotations.NotNull;
  49. import org.springframework.stereotype.Service;
  50. import reactor.core.publisher.Flux;
  51. import reactor.core.publisher.Mono;
  52. @Service
  53. @RequiredArgsConstructor
  54. @Log4j2
  55. public class ClusterService {
  56. private static final Integer DEFAULT_PAGE_SIZE = 25;
  57. private final ClustersStorage clustersStorage;
  58. private final ClusterMapper clusterMapper;
  59. private final KafkaService kafkaService;
  60. private final ConsumingService consumingService;
  61. private final DeserializationService deserializationService;
  62. public List<Cluster> getClusters() {
  63. return clustersStorage.getKafkaClusters()
  64. .stream()
  65. .map(clusterMapper::toCluster)
  66. .collect(Collectors.toList());
  67. }
  68. public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
  69. return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
  70. .map(c -> c.getMetrics().getInternalBrokerMetrics())
  71. .map(m -> m.get(id))
  72. .map(clusterMapper::toBrokerMetrics));
  73. }
  74. public Mono<ClusterStats> getClusterStats(String name) {
  75. return Mono.justOrEmpty(
  76. clustersStorage.getClusterByName(name)
  77. .map(KafkaCluster::getMetrics)
  78. .map(clusterMapper::toClusterStats)
  79. );
  80. }
  81. public Mono<ClusterMetrics> getClusterMetrics(String name) {
  82. return Mono.justOrEmpty(
  83. clustersStorage.getClusterByName(name)
  84. .map(KafkaCluster::getMetrics)
  85. .map(clusterMapper::toClusterMetrics)
  86. );
  87. }
  88. public TopicsResponse getTopics(String name, Optional<Integer> page,
  89. Optional<Integer> nullablePerPage,
  90. Optional<Boolean> showInternal,
  91. Optional<String> search,
  92. Optional<TopicColumnsToSort> sortBy) {
  93. Predicate<Integer> positiveInt = i -> i > 0;
  94. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  95. var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
  96. var cluster = clustersStorage.getClusterByName(name)
  97. .orElseThrow(ClusterNotFoundException::new);
  98. List<InternalTopic> topics = cluster.getTopics().values().stream()
  99. .filter(topic -> !topic.isInternal()
  100. || showInternal
  101. .map(i -> topic.isInternal() == i)
  102. .orElse(true))
  103. .filter(topic ->
  104. search
  105. .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
  106. .orElse(true))
  107. .sorted(getComparatorForTopic(sortBy))
  108. .collect(Collectors.toList());
  109. var totalPages = (topics.size() / perPage)
  110. + (topics.size() % perPage == 0 ? 0 : 1);
  111. return new TopicsResponse()
  112. .pageCount(totalPages)
  113. .topics(
  114. topics.stream()
  115. .skip(topicsToSkip)
  116. .limit(perPage)
  117. .map(t ->
  118. clusterMapper.toTopic(
  119. t.toBuilder().partitions(
  120. kafkaService.getTopicPartitions(cluster, t)
  121. ).build()
  122. )
  123. )
  124. .collect(Collectors.toList())
  125. );
  126. }
  127. private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
  128. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  129. if (sortBy.isEmpty()) {
  130. return defaultComparator;
  131. }
  132. switch (sortBy.get()) {
  133. case TOTAL_PARTITIONS:
  134. return Comparator.comparing(InternalTopic::getPartitionCount);
  135. case OUT_OF_SYNC_REPLICAS:
  136. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  137. case REPLICATION_FACTOR:
  138. return Comparator.comparing(InternalTopic::getReplicationFactor);
  139. case NAME:
  140. default:
  141. return defaultComparator;
  142. }
  143. }
  144. public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
  145. return clustersStorage.getClusterByName(name)
  146. .flatMap(c ->
  147. Optional.ofNullable(
  148. c.getTopics().get(topicName)
  149. ).map(
  150. t -> t.toBuilder().partitions(
  151. kafkaService.getTopicPartitions(c, t)
  152. ).build()
  153. ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
  154. );
  155. }
  156. public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
  157. return clustersStorage.getClusterByName(name)
  158. .map(KafkaCluster::getTopics)
  159. .map(t -> t.get(topicName))
  160. .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig)
  161. .collect(Collectors.toList()));
  162. }
  163. public Mono<Topic> createTopic(String clusterName, Mono<TopicCreation> topicCreation) {
  164. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  165. kafkaService.createTopic(cluster, topicCreation)
  166. .doOnNext(t -> updateCluster(t, clusterName, cluster))
  167. .map(clusterMapper::toTopic)
  168. ).orElse(Mono.empty());
  169. }
  170. @SneakyThrows
  171. public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
  172. String consumerGroupId) {
  173. var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
  174. return kafkaService.getConsumerGroups(
  175. cluster,
  176. Optional.empty(),
  177. Collections.singletonList(consumerGroupId)
  178. ).filter(groups -> !groups.isEmpty()).map(groups -> groups.get(0)).map(
  179. ClusterUtil::convertToConsumerGroupDetails
  180. );
  181. }
  182. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
  183. return getConsumerGroups(clusterName, Optional.empty());
  184. }
  185. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName, Optional<String> topic) {
  186. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  187. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  188. .flatMap(c -> kafkaService.getConsumerGroups(c, topic, Collections.emptyList()))
  189. .map(c ->
  190. c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())
  191. );
  192. }
  193. public Flux<Broker> getBrokers(String clusterName) {
  194. return kafkaService
  195. .getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
  196. .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
  197. .map(n -> n.stream().map(node -> {
  198. Broker broker = new Broker();
  199. broker.setId(node.id());
  200. broker.setHost(node.host());
  201. return broker;
  202. }).collect(Collectors.toList())))
  203. .flatMapMany(Flux::fromIterable);
  204. }
  205. @SneakyThrows
  206. public Mono<Topic> updateTopic(String clusterName, String topicName,
  207. Mono<TopicUpdate> topicUpdate) {
  208. return clustersStorage.getClusterByName(clusterName).map(cl ->
  209. topicUpdate
  210. .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
  211. .doOnNext(t -> updateCluster(t, clusterName, cl))
  212. .map(clusterMapper::toTopic)
  213. ).orElse(Mono.empty());
  214. }
  215. public Mono<Void> deleteTopic(String clusterName, String topicName) {
  216. var cluster = clustersStorage.getClusterByName(clusterName)
  217. .orElseThrow(ClusterNotFoundException::new);
  218. var topic = getTopicDetails(clusterName, topicName)
  219. .orElseThrow(TopicNotFoundException::new);
  220. return kafkaService.deleteTopic(cluster, topic.getName())
  221. .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
  222. }
  223. private KafkaCluster updateCluster(InternalTopic topic, String clusterName,
  224. KafkaCluster cluster) {
  225. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
  226. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  227. return updatedCluster;
  228. }
  229. private KafkaCluster updateCluster(String topicToDelete, String clusterName,
  230. KafkaCluster cluster) {
  231. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
  232. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  233. return updatedCluster;
  234. }
  235. public Mono<Cluster> updateCluster(String clusterName) {
  236. return clustersStorage.getClusterByName(clusterName)
  237. .map(cluster -> kafkaService.getUpdatedCluster(cluster)
  238. .doOnNext(updatedCluster -> clustersStorage
  239. .setKafkaCluster(updatedCluster.getName(), updatedCluster))
  240. .map(clusterMapper::toCluster))
  241. .orElse(Mono.error(new ClusterNotFoundException()));
  242. }
  243. public Flux<TopicMessage> getMessages(String clusterName, String topicName,
  244. ConsumerPosition consumerPosition, String query,
  245. Integer limit) {
  246. return clustersStorage.getClusterByName(clusterName)
  247. .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
  248. .orElse(Flux.empty());
  249. }
  250. public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
  251. List<Integer> partitions) {
  252. var cluster = clustersStorage.getClusterByName(clusterName)
  253. .orElseThrow(ClusterNotFoundException::new);
  254. if (!cluster.getTopics().containsKey(topicName)) {
  255. throw new TopicNotFoundException();
  256. }
  257. return consumingService.offsetsForDeletion(cluster, topicName, partitions)
  258. .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
  259. }
  260. public Mono<PartitionsIncreaseResponse> increaseTopicPartitions(
  261. String clusterName,
  262. String topicName,
  263. PartitionsIncrease partitionsIncrease) {
  264. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  265. kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease)
  266. .doOnNext(t -> updateCluster(t, cluster.getName(), cluster))
  267. .map(t -> new PartitionsIncreaseResponse()
  268. .topicName(t.getName())
  269. .totalPartitionsCount(t.getPartitionCount())))
  270. .orElse(Mono.error(new ClusterNotFoundException(
  271. String.format("No cluster for name '%s'", clusterName)
  272. )));
  273. }
  274. public Mono<Void> deleteConsumerGroupById(String clusterName,
  275. String groupId) {
  276. return clustersStorage.getClusterByName(clusterName)
  277. .map(cluster -> kafkaService.getOrCreateAdminClient(cluster)
  278. .map(ExtendedAdminClient::getAdminClient)
  279. .map(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)))
  280. .map(DeleteConsumerGroupsResult::all)
  281. .flatMap(ClusterUtil::toMono)
  282. .onErrorResume(this::reThrowCustomException)
  283. )
  284. .orElse(Mono.empty());
  285. }
  286. public TopicMessageSchema getTopicSchema(String clusterName, String topicName) {
  287. var cluster = clustersStorage.getClusterByName(clusterName)
  288. .orElseThrow(ClusterNotFoundException::new);
  289. if (!cluster.getTopics().containsKey(topicName)) {
  290. throw new TopicNotFoundException();
  291. }
  292. return deserializationService
  293. .getRecordDeserializerForCluster(cluster)
  294. .getTopicSchema(topicName);
  295. }
  296. public Mono<Void> sendMessage(String clusterName, String topicName, CreateTopicMessage msg) {
  297. var cluster = clustersStorage.getClusterByName(clusterName)
  298. .orElseThrow(ClusterNotFoundException::new);
  299. if (!cluster.getTopics().containsKey(topicName)) {
  300. throw new TopicNotFoundException();
  301. }
  302. if (msg.getKey() == null && msg.getContent() == null) {
  303. throw new ValidationException("Invalid message: both key and value can't be null");
  304. }
  305. if (msg.getPartition() != null
  306. && msg.getPartition() > cluster.getTopics().get(topicName).getPartitionCount() - 1) {
  307. throw new ValidationException("Invalid partition");
  308. }
  309. return kafkaService.sendMessage(cluster, topicName, msg).then();
  310. }
  311. @NotNull
  312. private Mono<Void> reThrowCustomException(Throwable e) {
  313. if (e instanceof GroupIdNotFoundException) {
  314. return Mono.error(new NotFoundException("The group id does not exist"));
  315. } else if (e instanceof GroupNotEmptyException) {
  316. return Mono.error(new IllegalEntityStateException("The group is not empty"));
  317. } else {
  318. return Mono.error(e);
  319. }
  320. }
  321. public Mono<ReplicationFactorChangeResponse> changeReplicationFactor(
  322. String clusterName,
  323. String topicName,
  324. ReplicationFactorChange replicationFactorChange) {
  325. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  326. kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange)
  327. .doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster))
  328. .map(t -> new ReplicationFactorChangeResponse()
  329. .topicName(t.getName())
  330. .totalReplicationFactor(t.getReplicationFactor())))
  331. .orElse(Mono.error(new ClusterNotFoundException(
  332. String.format("No cluster for name '%s'", clusterName))));
  333. }
  334. }