KafkaConnectController.java 11 KB


  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KafkaConnectApi;
  3. import com.provectus.kafka.ui.model.ConnectDTO;
  4. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  5. import com.provectus.kafka.ui.model.ConnectorColumnsToSortDTO;
  6. import com.provectus.kafka.ui.model.ConnectorDTO;
  7. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  8. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  9. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  10. import com.provectus.kafka.ui.model.NewConnectorDTO;
  11. import com.provectus.kafka.ui.model.SortOrderDTO;
  12. import com.provectus.kafka.ui.model.TaskDTO;
  13. import com.provectus.kafka.ui.model.rbac.AccessContext;
  14. import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
  15. import com.provectus.kafka.ui.service.KafkaConnectService;
  16. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  17. import java.util.Comparator;
  18. import java.util.Map;
  19. import javax.validation.Valid;
  20. import lombok.RequiredArgsConstructor;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.springframework.http.ResponseEntity;
  23. import org.springframework.web.bind.annotation.RestController;
  24. import org.springframework.web.server.ServerWebExchange;
  25. import reactor.core.publisher.Flux;
  26. import reactor.core.publisher.Mono;
  27. @RestController
  28. @RequiredArgsConstructor
  29. @Slf4j
  30. public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  31. private final KafkaConnectService kafkaConnectService;
  32. private final AccessControlService accessControlService;
  33. @Override
  34. public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
  35. ServerWebExchange exchange) {
  36. Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
  37. .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
  38. return Mono.just(ResponseEntity.ok(availableConnects));
  39. }
  40. @Override
  41. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  42. ServerWebExchange exchange) {
  43. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  44. .cluster(clusterName)
  45. .connect(connectName)
  46. .connectActions(ConnectAction.VIEW)
  47. .build());
  48. return validateAccess.thenReturn(
  49. ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
  50. );
  51. }
  52. @Override
  53. public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
  54. @Valid Mono<NewConnectorDTO> connector,
  55. ServerWebExchange exchange) {
  56. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  57. .cluster(clusterName)
  58. .connect(connectName)
  59. .connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
  60. .build());
  61. return validateAccess.then(
  62. kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
  63. .map(ResponseEntity::ok)
  64. );
  65. }
  66. @Override
  67. public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
  68. String connectorName,
  69. ServerWebExchange exchange) {
  70. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  71. .cluster(clusterName)
  72. .connect(connectName)
  73. .connectActions(ConnectAction.VIEW)
  74. .connector(connectorName)
  75. .build());
  76. return validateAccess.then(
  77. kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
  78. .map(ResponseEntity::ok)
  79. );
  80. }
  81. @Override
  82. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  83. String connectorName,
  84. ServerWebExchange exchange) {
  85. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  86. .cluster(clusterName)
  87. .connect(connectName)
  88. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  89. .build());
  90. return validateAccess.then(
  91. kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
  92. .map(ResponseEntity::ok)
  93. );
  94. }
  95. @Override
  96. public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
  97. String clusterName,
  98. String search,
  99. ConnectorColumnsToSortDTO orderBy,
  100. SortOrderDTO sortOrder,
  101. ServerWebExchange exchange
  102. ) {
  103. var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
  104. ? getConnectorsComparator(orderBy)
  105. : getConnectorsComparator(orderBy).reversed();
  106. Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
  107. .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
  108. .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
  109. return Mono.just(ResponseEntity.ok(job.sort(comparator)));
  110. }
  111. @Override
  112. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  113. String connectName,
  114. String connectorName,
  115. ServerWebExchange exchange) {
  116. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  117. .cluster(clusterName)
  118. .connect(connectName)
  119. .connectActions(ConnectAction.VIEW)
  120. .build());
  121. return validateAccess.then(
  122. kafkaConnectService
  123. .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
  124. .map(ResponseEntity::ok)
  125. );
  126. }
  127. @Override
  128. public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
  129. String connectorName,
  130. Mono<Map<String, Object>> requestBody,
  131. ServerWebExchange exchange) {
  132. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  133. .cluster(clusterName)
  134. .connect(connectName)
  135. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  136. .build());
  137. return validateAccess.then(
  138. kafkaConnectService
  139. .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
  140. .map(ResponseEntity::ok));
  141. }
  142. @Override
  143. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  144. String connectorName,
  145. ConnectorActionDTO action,
  146. ServerWebExchange exchange) {
  147. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  148. .cluster(clusterName)
  149. .connect(connectName)
  150. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  151. .build());
  152. return validateAccess.then(
  153. kafkaConnectService
  154. .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
  155. .map(ResponseEntity::ok)
  156. );
  157. }
  158. @Override
  159. public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
  160. String connectName,
  161. String connectorName,
  162. ServerWebExchange exchange) {
  163. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  164. .cluster(clusterName)
  165. .connect(connectName)
  166. .connectActions(ConnectAction.VIEW)
  167. .build());
  168. return validateAccess.thenReturn(
  169. ResponseEntity
  170. .ok(kafkaConnectService
  171. .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
  172. );
  173. }
  174. @Override
  175. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  176. String connectorName, Integer taskId,
  177. ServerWebExchange exchange) {
  178. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  179. .cluster(clusterName)
  180. .connect(connectName)
  181. .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
  182. .build());
  183. return validateAccess.then(
  184. kafkaConnectService
  185. .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
  186. .map(ResponseEntity::ok)
  187. );
  188. }
  189. @Override
  190. public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
  191. String clusterName, String connectName, ServerWebExchange exchange) {
  192. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  193. .cluster(clusterName)
  194. .connect(connectName)
  195. .connectActions(ConnectAction.VIEW)
  196. .build());
  197. return validateAccess.then(
  198. Mono.just(
  199. ResponseEntity.ok(
  200. kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
  201. );
  202. }
  203. @Override
  204. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
  205. String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
  206. ServerWebExchange exchange) {
  207. return kafkaConnectService
  208. .validateConnectorPluginConfig(
  209. getCluster(clusterName), connectName, pluginName, requestBody)
  210. .map(ResponseEntity::ok);
  211. }
  212. private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumnsToSortDTO orderBy) {
  213. var defaultComparator = Comparator.comparing(FullConnectorInfoDTO::getName);
  214. if (orderBy == null) {
  215. return defaultComparator;
  216. }
  217. switch (orderBy) {
  218. case CONNECT:
  219. return Comparator.comparing(FullConnectorInfoDTO::getConnect);
  220. case TYPE:
  221. return Comparator.comparing(FullConnectorInfoDTO::getType);
  222. case STATUS:
  223. return Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
  224. case NAME:
  225. default:
  226. return defaultComparator;
  227. }
  228. }
  229. }