TopicsController.java 14 KB


  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.CREATE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.DELETE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.EDIT;
  5. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  6. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.VIEW;
  7. import static java.util.stream.Collectors.toList;
  8. import com.provectus.kafka.ui.api.TopicsApi;
  9. import com.provectus.kafka.ui.mapper.ClusterMapper;
  10. import com.provectus.kafka.ui.model.InternalTopic;
  11. import com.provectus.kafka.ui.model.InternalTopicConfig;
  12. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  13. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  14. import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
  15. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
  16. import com.provectus.kafka.ui.model.SortOrderDTO;
  17. import com.provectus.kafka.ui.model.TopicAnalysisDTO;
  18. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  19. import com.provectus.kafka.ui.model.TopicConfigDTO;
  20. import com.provectus.kafka.ui.model.TopicCreationDTO;
  21. import com.provectus.kafka.ui.model.TopicDTO;
  22. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  23. import com.provectus.kafka.ui.model.TopicUpdateDTO;
  24. import com.provectus.kafka.ui.model.TopicsResponseDTO;
  25. import com.provectus.kafka.ui.model.rbac.AccessContext;
  26. import com.provectus.kafka.ui.service.TopicsService;
  27. import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
  28. import com.provectus.kafka.ui.service.audit.AuditService;
  29. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  30. import java.util.Comparator;
  31. import java.util.List;
  32. import java.util.Map;
  33. import javax.validation.Valid;
  34. import lombok.RequiredArgsConstructor;
  35. import lombok.extern.slf4j.Slf4j;
  36. import org.apache.commons.lang3.StringUtils;
  37. import org.springframework.http.HttpStatus;
  38. import org.springframework.http.ResponseEntity;
  39. import org.springframework.web.bind.annotation.RestController;
  40. import org.springframework.web.server.ServerWebExchange;
  41. import reactor.core.publisher.Flux;
  42. import reactor.core.publisher.Mono;
  43. @RestController
  44. @RequiredArgsConstructor
  45. @Slf4j
  46. public class TopicsController extends AbstractController implements TopicsApi {
  47. private static final Integer DEFAULT_PAGE_SIZE = 25;
  48. private final TopicsService topicsService;
  49. private final TopicAnalysisService topicAnalysisService;
  50. private final ClusterMapper clusterMapper;
  51. private final AccessControlService accessControlService;
  52. private final AuditService auditService;
  53. @Override
  54. public Mono<ResponseEntity<TopicDTO>> createTopic(
  55. String clusterName, @Valid Mono<TopicCreationDTO> topicCreationMono, ServerWebExchange exchange) {
  56. return topicCreationMono.flatMap(topicCreation -> {
  57. var context = AccessContext.builder()
  58. .cluster(clusterName)
  59. .topicActions(CREATE)
  60. .operationName("createTopic")
  61. .operationParams(topicCreation)
  62. .build();
  63. return accessControlService.validateAccess(context)
  64. .then(topicsService.createTopic(getCluster(clusterName), topicCreation))
  65. .map(clusterMapper::toTopic)
  66. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  67. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
  68. .doOnEach(sig -> auditService.audit(context, sig));
  69. });
  70. }
  71. @Override
  72. public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
  73. String topicName, ServerWebExchange exchange) {
  74. var context = AccessContext.builder()
  75. .cluster(clusterName)
  76. .topic(topicName)
  77. .topicActions(VIEW, CREATE, DELETE)
  78. .operationName("recreateTopic")
  79. .build();
  80. return accessControlService.validateAccess(context).then(
  81. topicsService.recreateTopic(getCluster(clusterName), topicName)
  82. .map(clusterMapper::toTopic)
  83. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
  84. ).doOnEach(sig -> auditService.audit(context, sig));
  85. }
  86. @Override
  87. public Mono<ResponseEntity<TopicDTO>> cloneTopic(
  88. String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
  89. var context = AccessContext.builder()
  90. .cluster(clusterName)
  91. .topic(topicName)
  92. .topicActions(VIEW, CREATE)
  93. .operationName("cloneTopic")
  94. .operationParams(Map.of("newTopicName", newTopicName))
  95. .build();
  96. return accessControlService.validateAccess(context)
  97. .then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
  98. .map(clusterMapper::toTopic)
  99. .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
  100. ).doOnEach(sig -> auditService.audit(context, sig));
  101. }
  102. @Override
  103. public Mono<ResponseEntity<Void>> deleteTopic(
  104. String clusterName, String topicName, ServerWebExchange exchange) {
  105. var context = AccessContext.builder()
  106. .cluster(clusterName)
  107. .topic(topicName)
  108. .topicActions(DELETE)
  109. .operationName("deleteTopic")
  110. .build();
  111. return accessControlService.validateAccess(context).then(
  112. topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
  113. ).doOnEach(sig -> auditService.audit(context, sig));
  114. }
  115. @Override
  116. public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
  117. String clusterName, String topicName, ServerWebExchange exchange) {
  118. var context = AccessContext.builder()
  119. .cluster(clusterName)
  120. .topic(topicName)
  121. .topicActions(VIEW)
  122. .operationName("getTopicConfigs")
  123. .build();
  124. return accessControlService.validateAccess(context).then(
  125. topicsService.getTopicConfigs(getCluster(clusterName), topicName)
  126. .map(lst -> lst.stream()
  127. .map(InternalTopicConfig::from)
  128. .map(clusterMapper::toTopicConfig)
  129. .collect(toList()))
  130. .map(Flux::fromIterable)
  131. .map(ResponseEntity::ok)
  132. ).doOnEach(sig -> auditService.audit(context, sig));
  133. }
  134. @Override
  135. public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
  136. String clusterName, String topicName, ServerWebExchange exchange) {
  137. var context = AccessContext.builder()
  138. .cluster(clusterName)
  139. .topic(topicName)
  140. .topicActions(VIEW)
  141. .operationName("getTopicDetails")
  142. .build();
  143. return accessControlService.validateAccess(context).then(
  144. topicsService.getTopicDetails(getCluster(clusterName), topicName)
  145. .map(clusterMapper::toTopicDetails)
  146. .map(ResponseEntity::ok)
  147. ).doOnEach(sig -> auditService.audit(context, sig));
  148. }
  149. @Override
  150. public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
  151. @Valid Integer page,
  152. @Valid Integer perPage,
  153. @Valid Boolean showInternal,
  154. @Valid String search,
  155. @Valid TopicColumnsToSortDTO orderBy,
  156. @Valid SortOrderDTO sortOrder,
  157. ServerWebExchange exchange) {
  158. AccessContext context = AccessContext.builder()
  159. .cluster(clusterName)
  160. .operationName("getTopics")
  161. .build();
  162. return topicsService.getTopicsForPagination(getCluster(clusterName))
  163. .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
  164. .flatMap(topics -> {
  165. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  166. var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
  167. var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
  168. ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
  169. List<InternalTopic> filtered = topics.stream()
  170. .filter(topic -> !topic.isInternal()
  171. || showInternal != null && showInternal)
  172. .filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search))
  173. .sorted(comparator)
  174. .toList();
  175. var totalPages = (filtered.size() / pageSize)
  176. + (filtered.size() % pageSize == 0 ? 0 : 1);
  177. List<String> topicsPage = filtered.stream()
  178. .skip(topicsToSkip)
  179. .limit(pageSize)
  180. .map(InternalTopic::getName)
  181. .collect(toList());
  182. return topicsService.loadTopics(getCluster(clusterName), topicsPage)
  183. .map(topicsToRender ->
  184. new TopicsResponseDTO()
  185. .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
  186. .pageCount(totalPages));
  187. })
  188. .map(ResponseEntity::ok)
  189. .doOnEach(sig -> auditService.audit(context, sig));
  190. }
  191. @Override
  192. public Mono<ResponseEntity<TopicDTO>> updateTopic(
  193. String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
  194. ServerWebExchange exchange) {
  195. var context = AccessContext.builder()
  196. .cluster(clusterName)
  197. .topic(topicName)
  198. .topicActions(VIEW, EDIT)
  199. .operationName("updateTopic")
  200. .build();
  201. return accessControlService.validateAccess(context).then(
  202. topicsService
  203. .updateTopic(getCluster(clusterName), topicName, topicUpdate)
  204. .map(clusterMapper::toTopic)
  205. .map(ResponseEntity::ok)
  206. ).doOnEach(sig -> auditService.audit(context, sig));
  207. }
  208. @Override
  209. public Mono<ResponseEntity<PartitionsIncreaseResponseDTO>> increaseTopicPartitions(
  210. String clusterName, String topicName,
  211. Mono<PartitionsIncreaseDTO> partitionsIncrease,
  212. ServerWebExchange exchange) {
  213. var context = AccessContext.builder()
  214. .cluster(clusterName)
  215. .topic(topicName)
  216. .topicActions(VIEW, EDIT)
  217. .build();
  218. return accessControlService.validateAccess(context).then(
  219. partitionsIncrease.flatMap(partitions ->
  220. topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
  221. ).map(ResponseEntity::ok)
  222. ).doOnEach(sig -> auditService.audit(context, sig));
  223. }
  224. @Override
  225. public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicationFactor(
  226. String clusterName, String topicName,
  227. Mono<ReplicationFactorChangeDTO> replicationFactorChange,
  228. ServerWebExchange exchange) {
  229. var context = AccessContext.builder()
  230. .cluster(clusterName)
  231. .topic(topicName)
  232. .topicActions(VIEW, EDIT)
  233. .operationName("changeReplicationFactor")
  234. .build();
  235. return accessControlService.validateAccess(context).then(
  236. replicationFactorChange
  237. .flatMap(rfc ->
  238. topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
  239. .map(ResponseEntity::ok)
  240. ).doOnEach(sig -> auditService.audit(context, sig));
  241. }
  242. @Override
  243. public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
  244. var context = AccessContext.builder()
  245. .cluster(clusterName)
  246. .topic(topicName)
  247. .topicActions(MESSAGES_READ)
  248. .operationName("analyzeTopic")
  249. .build();
  250. return accessControlService.validateAccess(context).then(
  251. topicAnalysisService.analyze(getCluster(clusterName), topicName)
  252. .doOnEach(sig -> auditService.audit(context, sig))
  253. .thenReturn(ResponseEntity.ok().build())
  254. );
  255. }
  256. @Override
  257. public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
  258. ServerWebExchange exchange) {
  259. var context = AccessContext.builder()
  260. .cluster(clusterName)
  261. .topic(topicName)
  262. .topicActions(MESSAGES_READ)
  263. .operationName("cancelTopicAnalysis")
  264. .build();
  265. return accessControlService.validateAccess(context)
  266. .then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
  267. .doOnEach(sig -> auditService.audit(context, sig))
  268. .thenReturn(ResponseEntity.ok().build());
  269. }
  270. @Override
  271. public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
  272. String topicName,
  273. ServerWebExchange exchange) {
  274. var context = AccessContext.builder()
  275. .cluster(clusterName)
  276. .topic(topicName)
  277. .topicActions(MESSAGES_READ)
  278. .operationName("getTopicAnalysis")
  279. .build();
  280. return accessControlService.validateAccess(context)
  281. .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
  282. .map(ResponseEntity::ok)
  283. .orElseGet(() -> ResponseEntity.notFound().build()))
  284. .doOnEach(sig -> auditService.audit(context, sig));
  285. }
  286. private Comparator<InternalTopic> getComparatorForTopic(
  287. TopicColumnsToSortDTO orderBy) {
  288. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  289. if (orderBy == null) {
  290. return defaultComparator;
  291. }
  292. switch (orderBy) {
  293. case TOTAL_PARTITIONS:
  294. return Comparator.comparing(InternalTopic::getPartitionCount);
  295. case OUT_OF_SYNC_REPLICAS:
  296. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  297. case REPLICATION_FACTOR:
  298. return Comparator.comparing(InternalTopic::getReplicationFactor);
  299. case SIZE:
  300. return Comparator.comparing(InternalTopic::getSegmentSize);
  301. case NAME:
  302. default:
  303. return defaultComparator;
  304. }
  305. }
  306. }