ClusterService.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package com.provectus.kafka.ui.service;
  2. import static com.provectus.kafka.ui.util.Constants.DELETE_TOPIC_ENABLE;
  3. import com.provectus.kafka.ui.exception.ClusterNotFoundException;
  4. import com.provectus.kafka.ui.exception.IllegalEntityStateException;
  5. import com.provectus.kafka.ui.exception.NotFoundException;
  6. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  7. import com.provectus.kafka.ui.exception.ValidationException;
  8. import com.provectus.kafka.ui.mapper.ClusterMapper;
  9. import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
  10. import com.provectus.kafka.ui.model.Broker;
  11. import com.provectus.kafka.ui.model.BrokerConfig;
  12. import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
  13. import com.provectus.kafka.ui.model.BrokerMetrics;
  14. import com.provectus.kafka.ui.model.BrokersLogdirs;
  15. import com.provectus.kafka.ui.model.Cluster;
  16. import com.provectus.kafka.ui.model.ClusterMetrics;
  17. import com.provectus.kafka.ui.model.ClusterStats;
  18. import com.provectus.kafka.ui.model.ConsumerGroup;
  19. import com.provectus.kafka.ui.model.ConsumerGroupDetails;
  20. import com.provectus.kafka.ui.model.ConsumerPosition;
  21. import com.provectus.kafka.ui.model.CreateTopicMessage;
  22. import com.provectus.kafka.ui.model.ExtendedAdminClient;
  23. import com.provectus.kafka.ui.model.Feature;
  24. import com.provectus.kafka.ui.model.InternalTopic;
  25. import com.provectus.kafka.ui.model.KafkaCluster;
  26. import com.provectus.kafka.ui.model.PartitionsIncrease;
  27. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  28. import com.provectus.kafka.ui.model.ReplicationFactorChange;
  29. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
  30. import com.provectus.kafka.ui.model.Topic;
  31. import com.provectus.kafka.ui.model.TopicColumnsToSort;
  32. import com.provectus.kafka.ui.model.TopicConfig;
  33. import com.provectus.kafka.ui.model.TopicCreation;
  34. import com.provectus.kafka.ui.model.TopicDetails;
  35. import com.provectus.kafka.ui.model.TopicMessage;
  36. import com.provectus.kafka.ui.model.TopicMessageEvent;
  37. import com.provectus.kafka.ui.model.TopicMessageSchema;
  38. import com.provectus.kafka.ui.model.TopicUpdate;
  39. import com.provectus.kafka.ui.model.TopicsResponse;
  40. import com.provectus.kafka.ui.serde.DeserializationService;
  41. import com.provectus.kafka.ui.util.ClusterUtil;
  42. import java.util.Collections;
  43. import java.util.Comparator;
  44. import java.util.List;
  45. import java.util.Optional;
  46. import java.util.function.Predicate;
  47. import java.util.stream.Collectors;
  48. import lombok.RequiredArgsConstructor;
  49. import lombok.SneakyThrows;
  50. import lombok.extern.log4j.Log4j2;
  51. import org.apache.commons.lang3.StringUtils;
  52. import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
  53. import org.apache.kafka.common.errors.GroupIdNotFoundException;
  54. import org.apache.kafka.common.errors.GroupNotEmptyException;
  55. import org.jetbrains.annotations.NotNull;
  56. import org.springframework.stereotype.Service;
  57. import reactor.core.publisher.Flux;
  58. import reactor.core.publisher.Mono;
  59. @Service
  60. @RequiredArgsConstructor
  61. @Log4j2
  62. public class ClusterService {
  63. private static final Integer DEFAULT_PAGE_SIZE = 25;
  64. private final ClustersStorage clustersStorage;
  65. private final ClusterMapper clusterMapper;
  66. private final KafkaService kafkaService;
  67. private final AdminClientService adminClientService;
  68. private final BrokerService brokerService;
  69. private final ConsumingService consumingService;
  70. private final DeserializationService deserializationService;
  71. private final DescribeLogDirsMapper describeLogDirsMapper;
  72. public List<Cluster> getClusters() {
  73. return clustersStorage.getKafkaClusters()
  74. .stream()
  75. .map(clusterMapper::toCluster)
  76. .collect(Collectors.toList());
  77. }
  78. public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
  79. return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
  80. .map(c -> c.getMetrics().getInternalBrokerMetrics())
  81. .map(m -> m.get(id))
  82. .map(clusterMapper::toBrokerMetrics));
  83. }
  84. public Mono<ClusterStats> getClusterStats(String name) {
  85. return Mono.justOrEmpty(
  86. clustersStorage.getClusterByName(name)
  87. .map(KafkaCluster::getMetrics)
  88. .map(clusterMapper::toClusterStats)
  89. );
  90. }
  91. public Mono<ClusterMetrics> getClusterMetrics(String name) {
  92. return Mono.justOrEmpty(
  93. clustersStorage.getClusterByName(name)
  94. .map(KafkaCluster::getMetrics)
  95. .map(clusterMapper::toClusterMetrics)
  96. );
  97. }
  98. public TopicsResponse getTopics(String name, Optional<Integer> page,
  99. Optional<Integer> nullablePerPage,
  100. Optional<Boolean> showInternal,
  101. Optional<String> search,
  102. Optional<TopicColumnsToSort> sortBy) {
  103. Predicate<Integer> positiveInt = i -> i > 0;
  104. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  105. var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
  106. var cluster = clustersStorage.getClusterByName(name)
  107. .orElseThrow(ClusterNotFoundException::new);
  108. List<InternalTopic> topics = cluster.getTopics().values().stream()
  109. .filter(topic -> !topic.isInternal()
  110. || showInternal
  111. .map(i -> topic.isInternal() == i)
  112. .orElse(true))
  113. .filter(topic ->
  114. search
  115. .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
  116. .orElse(true))
  117. .sorted(getComparatorForTopic(sortBy))
  118. .collect(Collectors.toList());
  119. var totalPages = (topics.size() / perPage)
  120. + (topics.size() % perPage == 0 ? 0 : 1);
  121. return new TopicsResponse()
  122. .pageCount(totalPages)
  123. .topics(
  124. topics.stream()
  125. .skip(topicsToSkip)
  126. .limit(perPage)
  127. .map(t ->
  128. clusterMapper.toTopic(
  129. t.toBuilder().partitions(
  130. kafkaService.getTopicPartitions(cluster, t)
  131. ).build()
  132. )
  133. )
  134. .collect(Collectors.toList())
  135. );
  136. }
  137. private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
  138. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  139. if (sortBy.isEmpty()) {
  140. return defaultComparator;
  141. }
  142. switch (sortBy.get()) {
  143. case TOTAL_PARTITIONS:
  144. return Comparator.comparing(InternalTopic::getPartitionCount);
  145. case OUT_OF_SYNC_REPLICAS:
  146. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  147. case REPLICATION_FACTOR:
  148. return Comparator.comparing(InternalTopic::getReplicationFactor);
  149. case NAME:
  150. default:
  151. return defaultComparator;
  152. }
  153. }
  154. public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
  155. return clustersStorage.getClusterByName(name)
  156. .flatMap(c ->
  157. Optional.ofNullable(c.getTopics()).map(l -> l.get(topicName)).map(
  158. t -> t.toBuilder().partitions(
  159. kafkaService.getTopicPartitions(c, t)
  160. ).build()
  161. ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
  162. );
  163. }
  164. public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
  165. return clustersStorage.getClusterByName(name)
  166. .map(KafkaCluster::getTopics)
  167. .map(t -> t.get(topicName))
  168. .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig)
  169. .collect(Collectors.toList()));
  170. }
  171. public Mono<Topic> createTopic(String clusterName, Mono<TopicCreation> topicCreation) {
  172. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  173. kafkaService.createTopic(cluster, topicCreation)
  174. .doOnNext(t -> updateCluster(t, clusterName, cluster))
  175. .map(clusterMapper::toTopic)
  176. ).orElse(Mono.empty());
  177. }
  178. @SneakyThrows
  179. public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
  180. String consumerGroupId) {
  181. var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
  182. return kafkaService.getConsumerGroups(
  183. cluster,
  184. Optional.empty(),
  185. Collections.singletonList(consumerGroupId)
  186. ).filter(groups -> !groups.isEmpty()).map(groups -> groups.get(0)).map(
  187. ClusterUtil::convertToConsumerGroupDetails
  188. );
  189. }
  190. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
  191. return getConsumerGroups(clusterName, Optional.empty());
  192. }
  193. public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName, Optional<String> topic) {
  194. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  195. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  196. .flatMap(c -> kafkaService.getConsumerGroups(c, topic, Collections.emptyList()))
  197. .map(c ->
  198. c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())
  199. );
  200. }
  201. public Flux<Broker> getBrokers(String clusterName) {
  202. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  203. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  204. .flatMapMany(brokerService::getBrokers);
  205. }
  206. public Flux<BrokerConfig> getBrokerConfig(String clusterName, Integer brokerId) {
  207. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  208. .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
  209. .flatMapMany(c -> brokerService.getBrokersConfig(c, brokerId))
  210. .map(clusterMapper::toBrokerConfig);
  211. }
  212. @SneakyThrows
  213. public Mono<Topic> updateTopic(String clusterName, String topicName,
  214. Mono<TopicUpdate> topicUpdate) {
  215. return clustersStorage.getClusterByName(clusterName).map(cl ->
  216. topicUpdate
  217. .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
  218. .doOnNext(t -> updateCluster(t, clusterName, cl))
  219. .map(clusterMapper::toTopic)
  220. ).orElse(Mono.empty());
  221. }
  222. public Mono<Void> deleteTopic(String clusterName, String topicName) {
  223. var cluster = clustersStorage.getClusterByName(clusterName)
  224. .orElseThrow(ClusterNotFoundException::new);
  225. var topic = getTopicDetails(clusterName, topicName)
  226. .orElseThrow(TopicNotFoundException::new);
  227. if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) {
  228. return kafkaService.deleteTopic(cluster, topic.getName())
  229. .doOnSuccess(t -> updateCluster(topicName, clusterName, cluster));
  230. } else {
  231. return Mono.error(new ValidationException("Topic deletion restricted"));
  232. }
  233. }
  234. private KafkaCluster updateCluster(InternalTopic topic, String clusterName,
  235. KafkaCluster cluster) {
  236. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
  237. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  238. return updatedCluster;
  239. }
  240. private KafkaCluster updateCluster(String topicToDelete, String clusterName,
  241. KafkaCluster cluster) {
  242. final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topicToDelete);
  243. clustersStorage.setKafkaCluster(clusterName, updatedCluster);
  244. return updatedCluster;
  245. }
  246. public Mono<Cluster> updateCluster(String clusterName) {
  247. return clustersStorage.getClusterByName(clusterName)
  248. .map(cluster -> kafkaService.getUpdatedCluster(cluster)
  249. .doOnNext(updatedCluster -> clustersStorage
  250. .setKafkaCluster(updatedCluster.getName(), updatedCluster))
  251. .map(clusterMapper::toCluster))
  252. .orElse(Mono.error(new ClusterNotFoundException()));
  253. }
  254. public Flux<TopicMessageEvent> getMessages(String clusterName, String topicName,
  255. ConsumerPosition consumerPosition, String query,
  256. Integer limit) {
  257. return clustersStorage.getClusterByName(clusterName)
  258. .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
  259. .orElse(Flux.empty());
  260. }
  261. public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
  262. List<Integer> partitions) {
  263. var cluster = clustersStorage.getClusterByName(clusterName)
  264. .orElseThrow(ClusterNotFoundException::new);
  265. if (!cluster.getTopics().containsKey(topicName)) {
  266. throw new TopicNotFoundException();
  267. }
  268. return consumingService.offsetsForDeletion(cluster, topicName, partitions)
  269. .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
  270. }
  271. public Mono<PartitionsIncreaseResponse> increaseTopicPartitions(
  272. String clusterName,
  273. String topicName,
  274. PartitionsIncrease partitionsIncrease) {
  275. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  276. kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease)
  277. .doOnNext(t -> updateCluster(t, cluster.getName(), cluster))
  278. .map(t -> new PartitionsIncreaseResponse()
  279. .topicName(t.getName())
  280. .totalPartitionsCount(t.getPartitionCount())))
  281. .orElse(Mono.error(new ClusterNotFoundException(
  282. String.format("No cluster for name '%s'", clusterName)
  283. )));
  284. }
  285. public Mono<Void> deleteConsumerGroupById(String clusterName,
  286. String groupId) {
  287. return clustersStorage.getClusterByName(clusterName)
  288. .map(cluster -> adminClientService.getOrCreateAdminClient(cluster)
  289. .map(ExtendedAdminClient::getAdminClient)
  290. .flatMap(adminClient ->
  291. ClusterUtil.toMono(
  292. adminClient.deleteConsumerGroups(List.of(groupId)).all()
  293. )
  294. )
  295. .onErrorResume(this::reThrowCustomException)
  296. )
  297. .orElse(Mono.empty());
  298. }
  299. public TopicMessageSchema getTopicSchema(String clusterName, String topicName) {
  300. var cluster = clustersStorage.getClusterByName(clusterName)
  301. .orElseThrow(ClusterNotFoundException::new);
  302. if (!cluster.getTopics().containsKey(topicName)) {
  303. throw new TopicNotFoundException();
  304. }
  305. return deserializationService
  306. .getRecordDeserializerForCluster(cluster)
  307. .getTopicSchema(topicName);
  308. }
  309. public Mono<Void> sendMessage(String clusterName, String topicName, CreateTopicMessage msg) {
  310. var cluster = clustersStorage.getClusterByName(clusterName)
  311. .orElseThrow(ClusterNotFoundException::new);
  312. if (!cluster.getTopics().containsKey(topicName)) {
  313. throw new TopicNotFoundException();
  314. }
  315. if (msg.getKey() == null && msg.getContent() == null) {
  316. throw new ValidationException("Invalid message: both key and value can't be null");
  317. }
  318. if (msg.getPartition() != null
  319. && msg.getPartition() > cluster.getTopics().get(topicName).getPartitionCount() - 1) {
  320. throw new ValidationException("Invalid partition");
  321. }
  322. return kafkaService.sendMessage(cluster, topicName, msg).then();
  323. }
  324. @NotNull
  325. private Mono<Void> reThrowCustomException(Throwable e) {
  326. if (e instanceof GroupIdNotFoundException) {
  327. return Mono.error(new NotFoundException("The group id does not exist"));
  328. } else if (e instanceof GroupNotEmptyException) {
  329. return Mono.error(new IllegalEntityStateException("The group is not empty"));
  330. } else {
  331. return Mono.error(e);
  332. }
  333. }
  334. public Mono<ReplicationFactorChangeResponse> changeReplicationFactor(
  335. String clusterName,
  336. String topicName,
  337. ReplicationFactorChange replicationFactorChange) {
  338. return clustersStorage.getClusterByName(clusterName).map(cluster ->
  339. kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange)
  340. .doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster))
  341. .map(t -> new ReplicationFactorChangeResponse()
  342. .topicName(t.getName())
  343. .totalReplicationFactor(t.getReplicationFactor())))
  344. .orElse(Mono.error(new ClusterNotFoundException(
  345. String.format("No cluster for name '%s'", clusterName))));
  346. }
  347. public Flux<BrokersLogdirs> getAllBrokersLogdirs(String clusterName, List<Integer> brokers) {
  348. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  349. .flatMap(c -> kafkaService.getClusterLogDirs(c, brokers))
  350. .map(describeLogDirsMapper::toBrokerLogDirsList)
  351. .flatMapMany(Flux::fromIterable);
  352. }
  353. public Mono<Void> updateBrokerLogDir(
  354. String clusterName, Integer id, BrokerLogdirUpdate brokerLogDir) {
  355. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  356. .flatMap(c -> kafkaService.updateBrokerLogDir(c, id, brokerLogDir));
  357. }
  358. public Mono<Void> updateBrokerConfigByName(String clusterName,
  359. Integer id,
  360. String name,
  361. String value) {
  362. return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
  363. .flatMap(c -> kafkaService.updateBrokerConfigByName(c, id, name, value));
  364. }
  365. }