TopicsController.java 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package com.provectus.kafka.ui.controller;
  2. import static java.util.stream.Collectors.toList;
  3. import com.provectus.kafka.ui.api.TopicsApi;
  4. import com.provectus.kafka.ui.mapper.ClusterMapper;
  5. import com.provectus.kafka.ui.model.InternalTopic;
  6. import com.provectus.kafka.ui.model.InternalTopicConfig;
  7. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  8. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  9. import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
  10. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
  11. import com.provectus.kafka.ui.model.SortOrderDTO;
  12. import com.provectus.kafka.ui.model.TopicAnalysisDTO;
  13. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  14. import com.provectus.kafka.ui.model.TopicConfigDTO;
  15. import com.provectus.kafka.ui.model.TopicCreationDTO;
  16. import com.provectus.kafka.ui.model.TopicDTO;
  17. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  18. import com.provectus.kafka.ui.model.TopicUpdateDTO;
  19. import com.provectus.kafka.ui.model.TopicsResponseDTO;
  20. import com.provectus.kafka.ui.service.TopicsService;
  21. import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
  22. import java.util.Comparator;
  23. import java.util.List;
  24. import javax.validation.Valid;
  25. import lombok.RequiredArgsConstructor;
  26. import lombok.extern.slf4j.Slf4j;
  27. import org.apache.commons.lang3.StringUtils;
  28. import org.springframework.http.HttpStatus;
  29. import org.springframework.http.ResponseEntity;
  30. import org.springframework.web.bind.annotation.RestController;
  31. import org.springframework.web.server.ServerWebExchange;
  32. import reactor.core.publisher.Flux;
  33. import reactor.core.publisher.Mono;
  34. @RestController
  35. @RequiredArgsConstructor
  36. @Slf4j
  37. public class TopicsController extends AbstractController implements TopicsApi {
  38. private static final Integer DEFAULT_PAGE_SIZE = 25;
  39. private final TopicsService topicsService;
  40. private final TopicAnalysisService topicAnalysisService;
  41. private final ClusterMapper clusterMapper;
  42. @Override
  43. public Mono<ResponseEntity<TopicDTO>> createTopic(
  44. String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
  45. return topicsService.createTopic(getCluster(clusterName), topicCreation)
  46. .map(clusterMapper::toTopic)
  47. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  48. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  49. }
  50. @Override
  51. public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
  52. String topicName, ServerWebExchange serverWebExchange) {
  53. return topicsService.recreateTopic(getCluster(clusterName), topicName)
  54. .map(clusterMapper::toTopic)
  55. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
  56. }
  57. @Override
  58. public Mono<ResponseEntity<TopicDTO>> cloneTopic(
  59. String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
  60. return topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
  61. .map(clusterMapper::toTopic)
  62. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
  63. }
  64. @Override
  65. public Mono<ResponseEntity<Void>> deleteTopic(
  66. String clusterName, String topicName, ServerWebExchange exchange) {
  67. return topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok);
  68. }
  69. @Override
  70. public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
  71. String clusterName, String topicName, ServerWebExchange exchange) {
  72. return topicsService.getTopicConfigs(getCluster(clusterName), topicName)
  73. .map(lst -> lst.stream()
  74. .map(InternalTopicConfig::from)
  75. .map(clusterMapper::toTopicConfig)
  76. .collect(toList()))
  77. .map(Flux::fromIterable)
  78. .map(ResponseEntity::ok);
  79. }
  80. @Override
  81. public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
  82. String clusterName, String topicName, ServerWebExchange exchange) {
  83. return topicsService.getTopicDetails(getCluster(clusterName), topicName)
  84. .map(clusterMapper::toTopicDetails)
  85. .map(ResponseEntity::ok);
  86. }
  87. public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName, @Valid Integer page,
  88. @Valid Integer perPage,
  89. @Valid Boolean showInternal,
  90. @Valid String search,
  91. @Valid TopicColumnsToSortDTO orderBy,
  92. @Valid SortOrderDTO sortOrder,
  93. ServerWebExchange exchange) {
  94. return topicsService.getTopicsForPagination(getCluster(clusterName))
  95. .flatMap(existingTopics -> {
  96. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  97. var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
  98. var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
  99. ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
  100. List<InternalTopic> filtered = existingTopics.stream()
  101. .filter(topic -> !topic.isInternal()
  102. || showInternal != null && showInternal)
  103. .filter(topic -> search == null || StringUtils.contains(topic.getName(), search))
  104. .sorted(comparator)
  105. .collect(toList());
  106. var totalPages = (filtered.size() / pageSize)
  107. + (filtered.size() % pageSize == 0 ? 0 : 1);
  108. List<String> topicsPage = filtered.stream()
  109. .skip(topicsToSkip)
  110. .limit(pageSize)
  111. .map(InternalTopic::getName)
  112. .collect(toList());
  113. return topicsService.loadTopics(getCluster(clusterName), topicsPage)
  114. .map(topicsToRender ->
  115. new TopicsResponseDTO()
  116. .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
  117. .pageCount(totalPages));
  118. }).map(ResponseEntity::ok);
  119. }
  120. private Comparator<InternalTopic> getComparatorForTopic(
  121. TopicColumnsToSortDTO orderBy) {
  122. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  123. if (orderBy == null) {
  124. return defaultComparator;
  125. }
  126. switch (orderBy) {
  127. case TOTAL_PARTITIONS:
  128. return Comparator.comparing(InternalTopic::getPartitionCount);
  129. case OUT_OF_SYNC_REPLICAS:
  130. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  131. case REPLICATION_FACTOR:
  132. return Comparator.comparing(InternalTopic::getReplicationFactor);
  133. case SIZE:
  134. return Comparator.comparing(InternalTopic::getSegmentSize);
  135. case NAME:
  136. default:
  137. return defaultComparator;
  138. }
  139. }
  140. @Override
  141. public Mono<ResponseEntity<TopicDTO>> updateTopic(
  142. String clusterId, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
  143. ServerWebExchange exchange) {
  144. return topicsService
  145. .updateTopic(getCluster(clusterId), topicName, topicUpdate)
  146. .map(clusterMapper::toTopic)
  147. .map(ResponseEntity::ok);
  148. }
  149. @Override
  150. public Mono<ResponseEntity<PartitionsIncreaseResponseDTO>> increaseTopicPartitions(
  151. String clusterName, String topicName,
  152. Mono<PartitionsIncreaseDTO> partitionsIncrease,
  153. ServerWebExchange exchange) {
  154. return partitionsIncrease.flatMap(partitions ->
  155. topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
  156. ).map(ResponseEntity::ok);
  157. }
  158. @Override
  159. public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicationFactor(
  160. String clusterName, String topicName,
  161. Mono<ReplicationFactorChangeDTO> replicationFactorChange,
  162. ServerWebExchange exchange) {
  163. return replicationFactorChange
  164. .flatMap(rfc ->
  165. topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
  166. .map(ResponseEntity::ok);
  167. }
  168. @Override
  169. public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
  170. return topicAnalysisService.analyze(getCluster(clusterName), topicName)
  171. .thenReturn(ResponseEntity.ok().build());
  172. }
  173. @Override
  174. public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
  175. ServerWebExchange exchange) {
  176. topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
  177. return Mono.just(ResponseEntity.ok().build());
  178. }
  179. @Override
  180. public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
  181. String topicName,
  182. ServerWebExchange exchange) {
  183. return Mono.just(
  184. topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
  185. .map(ResponseEntity::ok)
  186. .orElseGet(() -> ResponseEntity.notFound().build())
  187. );
  188. }
  189. }