TopicsController.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.CREATE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.DELETE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.EDIT;
  5. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  6. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.VIEW;
  7. import static java.util.stream.Collectors.toList;
  8. import com.provectus.kafka.ui.api.TopicsApi;
  9. import com.provectus.kafka.ui.mapper.ClusterMapper;
  10. import com.provectus.kafka.ui.model.InternalTopic;
  11. import com.provectus.kafka.ui.model.InternalTopicConfig;
  12. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  13. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  14. import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
  15. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
  16. import com.provectus.kafka.ui.model.SortOrderDTO;
  17. import com.provectus.kafka.ui.model.TopicAnalysisDTO;
  18. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  19. import com.provectus.kafka.ui.model.TopicConfigDTO;
  20. import com.provectus.kafka.ui.model.TopicCreationDTO;
  21. import com.provectus.kafka.ui.model.TopicDTO;
  22. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  23. import com.provectus.kafka.ui.model.TopicUpdateDTO;
  24. import com.provectus.kafka.ui.model.TopicsResponseDTO;
  25. import com.provectus.kafka.ui.model.rbac.AccessContext;
  26. import com.provectus.kafka.ui.service.TopicsService;
  27. import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
  28. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  29. import java.util.Comparator;
  30. import java.util.List;
  31. import javax.validation.Valid;
  32. import lombok.RequiredArgsConstructor;
  33. import lombok.extern.slf4j.Slf4j;
  34. import org.apache.commons.lang3.StringUtils;
  35. import org.springframework.http.HttpStatus;
  36. import org.springframework.http.ResponseEntity;
  37. import org.springframework.web.bind.annotation.RestController;
  38. import org.springframework.web.server.ServerWebExchange;
  39. import reactor.core.publisher.Flux;
  40. import reactor.core.publisher.Mono;
  41. @RestController
  42. @RequiredArgsConstructor
  43. @Slf4j
  44. public class TopicsController extends AbstractController implements TopicsApi {
  /** Page size applied by getTopics when the requested perPage is null or not positive. */
  45. private static final Integer DEFAULT_PAGE_SIZE = 25;
  // The collaborators below are final fields; the class is annotated with Lombok's
  // @RequiredArgsConstructor, which generates the constructor that receives them.
  /** Topic operations: create, recreate, clone, delete, update, listing and detail lookups. */
  46. private final TopicsService topicsService;
  /** Starts, cancels and reads topic analyses. */
  47. private final TopicAnalysisService topicAnalysisService;
  /** Converts internal topic models into the DTOs returned by this controller. */
  48. private final ClusterMapper clusterMapper;
  /** Validates access contexts and checks per-topic accessibility when listing topics. */
  49. private final AccessControlService accessControlService;
  50. @Override
  51. public Mono<ResponseEntity<TopicDTO>> createTopic(
  52. String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
  53. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  54. .cluster(clusterName)
  55. .topicActions(CREATE)
  56. .build());
  57. return validateAccess.then(
  58. topicsService.createTopic(getCluster(clusterName), topicCreation)
  59. .map(clusterMapper::toTopic)
  60. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  61. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
  62. );
  63. }
  64. @Override
  65. public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
  66. String topicName, ServerWebExchange exchange) {
  67. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  68. .cluster(clusterName)
  69. .topic(topicName)
  70. .topicActions(VIEW, CREATE, DELETE)
  71. .build());
  72. return validateAccess.then(
  73. topicsService.recreateTopic(getCluster(clusterName), topicName)
  74. .map(clusterMapper::toTopic)
  75. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
  76. );
  77. }
  78. @Override
  79. public Mono<ResponseEntity<TopicDTO>> cloneTopic(
  80. String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
  81. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  82. .cluster(clusterName)
  83. .topic(topicName)
  84. .topicActions(VIEW, CREATE)
  85. .build());
  86. return validateAccess.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
  87. .map(clusterMapper::toTopic)
  88. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
  89. );
  90. }
  91. @Override
  92. public Mono<ResponseEntity<Void>> deleteTopic(
  93. String clusterName, String topicName, ServerWebExchange exchange) {
  94. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  95. .cluster(clusterName)
  96. .topic(topicName)
  97. .topicActions(DELETE)
  98. .build());
  99. return validateAccess.then(
  100. topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
  101. );
  102. }
  103. @Override
  104. public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
  105. String clusterName, String topicName, ServerWebExchange exchange) {
  106. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  107. .cluster(clusterName)
  108. .topic(topicName)
  109. .topicActions(VIEW)
  110. .build());
  111. return validateAccess.then(
  112. topicsService.getTopicConfigs(getCluster(clusterName), topicName)
  113. .map(lst -> lst.stream()
  114. .map(InternalTopicConfig::from)
  115. .map(clusterMapper::toTopicConfig)
  116. .collect(toList()))
  117. .map(Flux::fromIterable)
  118. .map(ResponseEntity::ok)
  119. );
  120. }
  121. @Override
  122. public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
  123. String clusterName, String topicName, ServerWebExchange exchange) {
  124. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  125. .cluster(clusterName)
  126. .topic(topicName)
  127. .topicActions(VIEW)
  128. .build());
  129. return validateAccess.then(
  130. topicsService.getTopicDetails(getCluster(clusterName), topicName)
  131. .map(clusterMapper::toTopicDetails)
  132. .map(ResponseEntity::ok)
  133. );
  134. }
  135. @Override
  136. public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
  137. @Valid Integer page,
  138. @Valid Integer perPage,
  139. @Valid Boolean showInternal,
  140. @Valid String search,
  141. @Valid TopicColumnsToSortDTO orderBy,
  142. @Valid SortOrderDTO sortOrder,
  143. ServerWebExchange exchange) {
  144. return topicsService.getTopicsForPagination(getCluster(clusterName))
  145. .flatMap(existingTopics -> {
  146. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  147. var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
  148. var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
  149. ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
  150. List<InternalTopic> filtered = existingTopics.stream()
  151. .filter(topic -> !topic.isInternal()
  152. || showInternal != null && showInternal)
  153. .filter(topic -> search == null || StringUtils.contains(topic.getName(), search))
  154. .sorted(comparator)
  155. .toList();
  156. var totalPages = (filtered.size() / pageSize)
  157. + (filtered.size() % pageSize == 0 ? 0 : 1);
  158. List<String> topicsPage = filtered.stream()
  159. .skip(topicsToSkip)
  160. .limit(pageSize)
  161. .map(InternalTopic::getName)
  162. .collect(toList());
  163. return topicsService.loadTopics(getCluster(clusterName), topicsPage)
  164. .flatMapMany(Flux::fromIterable)
  165. .filterWhen(dto -> accessControlService.isTopicAccessible(dto, clusterName))
  166. .collectList()
  167. .map(topicsToRender ->
  168. new TopicsResponseDTO()
  169. .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
  170. .pageCount(totalPages));
  171. })
  172. .map(ResponseEntity::ok);
  173. }
  174. @Override
  175. public Mono<ResponseEntity<TopicDTO>> updateTopic(
  176. String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
  177. ServerWebExchange exchange) {
  178. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  179. .cluster(clusterName)
  180. .topic(topicName)
  181. .topicActions(VIEW, EDIT)
  182. .build());
  183. return validateAccess.then(
  184. topicsService
  185. .updateTopic(getCluster(clusterName), topicName, topicUpdate)
  186. .map(clusterMapper::toTopic)
  187. .map(ResponseEntity::ok)
  188. );
  189. }
  190. @Override
  191. public Mono<ResponseEntity<PartitionsIncreaseResponseDTO>> increaseTopicPartitions(
  192. String clusterName, String topicName,
  193. Mono<PartitionsIncreaseDTO> partitionsIncrease,
  194. ServerWebExchange exchange) {
  195. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  196. .cluster(clusterName)
  197. .topic(topicName)
  198. .topicActions(VIEW, EDIT)
  199. .build());
  200. return validateAccess.then(
  201. partitionsIncrease.flatMap(partitions ->
  202. topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
  203. ).map(ResponseEntity::ok)
  204. );
  205. }
  206. @Override
  207. public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicationFactor(
  208. String clusterName, String topicName,
  209. Mono<ReplicationFactorChangeDTO> replicationFactorChange,
  210. ServerWebExchange exchange) {
  211. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  212. .cluster(clusterName)
  213. .topic(topicName)
  214. .topicActions(VIEW, EDIT)
  215. .build());
  216. return validateAccess.then(
  217. replicationFactorChange
  218. .flatMap(rfc ->
  219. topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
  220. .map(ResponseEntity::ok)
  221. );
  222. }
  223. @Override
  224. public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
  225. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  226. .cluster(clusterName)
  227. .topic(topicName)
  228. .topicActions(MESSAGES_READ)
  229. .build());
  230. return validateAccess.then(
  231. topicAnalysisService.analyze(getCluster(clusterName), topicName)
  232. .thenReturn(ResponseEntity.ok().build())
  233. );
  234. }
  235. @Override
  236. public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
  237. ServerWebExchange exchange) {
  238. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  239. .cluster(clusterName)
  240. .topic(topicName)
  241. .topicActions(MESSAGES_READ)
  242. .build());
  243. topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
  244. return validateAccess.thenReturn(ResponseEntity.ok().build());
  245. }
  246. @Override
  247. public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
  248. String topicName,
  249. ServerWebExchange exchange) {
  250. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  251. .cluster(clusterName)
  252. .topic(topicName)
  253. .topicActions(MESSAGES_READ)
  254. .build());
  255. return validateAccess.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
  256. .map(ResponseEntity::ok)
  257. .orElseGet(() -> ResponseEntity.notFound().build()));
  258. }
  259. private Comparator<InternalTopic> getComparatorForTopic(
  260. TopicColumnsToSortDTO orderBy) {
  261. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  262. if (orderBy == null) {
  263. return defaultComparator;
  264. }
  265. switch (orderBy) {
  266. case TOTAL_PARTITIONS:
  267. return Comparator.comparing(InternalTopic::getPartitionCount);
  268. case OUT_OF_SYNC_REPLICAS:
  269. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  270. case REPLICATION_FACTOR:
  271. return Comparator.comparing(InternalTopic::getReplicationFactor);
  272. case SIZE:
  273. return Comparator.comparing(InternalTopic::getSegmentSize);
  274. case NAME:
  275. default:
  276. return defaultComparator;
  277. }
  278. }
  279. }