KafkaConnectController.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART;
  3. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
  4. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
  5. import com.provectus.kafka.ui.api.KafkaConnectApi;
  6. import com.provectus.kafka.ui.model.ConnectDTO;
  7. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  8. import com.provectus.kafka.ui.model.ConnectorColumnsToSortDTO;
  9. import com.provectus.kafka.ui.model.ConnectorDTO;
  10. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  11. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  12. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  13. import com.provectus.kafka.ui.model.NewConnectorDTO;
  14. import com.provectus.kafka.ui.model.SortOrderDTO;
  15. import com.provectus.kafka.ui.model.TaskDTO;
  16. import com.provectus.kafka.ui.model.rbac.AccessContext;
  17. import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
  18. import com.provectus.kafka.ui.service.KafkaConnectService;
  19. import java.util.Comparator;
  20. import java.util.Map;
  21. import java.util.Set;
  22. import javax.validation.Valid;
  23. import lombok.RequiredArgsConstructor;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.springframework.http.ResponseEntity;
  26. import org.springframework.web.bind.annotation.RestController;
  27. import org.springframework.web.server.ServerWebExchange;
  28. import reactor.core.publisher.Flux;
  29. import reactor.core.publisher.Mono;
  30. @RestController
  31. @RequiredArgsConstructor
  32. @Slf4j
  33. public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  34. private static final Set<ConnectorActionDTO> RESTART_ACTIONS
  35. = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
  36. private static final String CONNECTOR_NAME = "connectorName";
  37. private final KafkaConnectService kafkaConnectService;
  38. @Override
  39. public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
  40. ServerWebExchange exchange) {
  41. Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
  42. .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
  43. return Mono.just(ResponseEntity.ok(availableConnects));
  44. }
  45. @Override
  46. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  47. ServerWebExchange exchange) {
  48. var context = AccessContext.builder()
  49. .cluster(clusterName)
  50. .connect(connectName)
  51. .connectActions(ConnectAction.VIEW)
  52. .operationName("getConnectors")
  53. .build();
  54. return validateAccess(context)
  55. .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
  56. .doOnEach(sig -> audit(context, sig));
  57. }
  58. @Override
  59. public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
  60. @Valid Mono<NewConnectorDTO> connector,
  61. ServerWebExchange exchange) {
  62. var context = AccessContext.builder()
  63. .cluster(clusterName)
  64. .connect(connectName)
  65. .connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
  66. .operationName("createConnector")
  67. .build();
  68. return validateAccess(context).then(
  69. kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
  70. .map(ResponseEntity::ok)
  71. ).doOnEach(sig -> audit(context, sig));
  72. }
  73. @Override
  74. public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
  75. String connectorName,
  76. ServerWebExchange exchange) {
  77. var context = AccessContext.builder()
  78. .cluster(clusterName)
  79. .connect(connectName)
  80. .connectActions(ConnectAction.VIEW)
  81. .connector(connectorName)
  82. .operationName("getConnector")
  83. .build();
  84. return validateAccess(context).then(
  85. kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
  86. .map(ResponseEntity::ok)
  87. ).doOnEach(sig -> audit(context, sig));
  88. }
  89. @Override
  90. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  91. String connectorName,
  92. ServerWebExchange exchange) {
  93. var context = AccessContext.builder()
  94. .cluster(clusterName)
  95. .connect(connectName)
  96. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  97. .operationName("deleteConnector")
  98. .operationParams(Map.of(CONNECTOR_NAME, connectName))
  99. .build();
  100. return validateAccess(context).then(
  101. kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
  102. .map(ResponseEntity::ok)
  103. ).doOnEach(sig -> audit(context, sig));
  104. }
  105. @Override
  106. public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
  107. String clusterName,
  108. String search,
  109. ConnectorColumnsToSortDTO orderBy,
  110. SortOrderDTO sortOrder,
  111. ServerWebExchange exchange
  112. ) {
  113. var context = AccessContext.builder()
  114. .cluster(clusterName)
  115. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  116. .operationName("getAllConnectors")
  117. .build();
  118. var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
  119. ? getConnectorsComparator(orderBy)
  120. : getConnectorsComparator(orderBy).reversed();
  121. Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
  122. .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
  123. .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
  124. .sort(comparator);
  125. return Mono.just(ResponseEntity.ok(job))
  126. .doOnEach(sig -> audit(context, sig));
  127. }
  128. @Override
  129. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  130. String connectName,
  131. String connectorName,
  132. ServerWebExchange exchange) {
  133. var context = AccessContext.builder()
  134. .cluster(clusterName)
  135. .connect(connectName)
  136. .connectActions(ConnectAction.VIEW)
  137. .operationName("getConnectorConfig")
  138. .build();
  139. return validateAccess(context).then(
  140. kafkaConnectService
  141. .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
  142. .map(ResponseEntity::ok)
  143. ).doOnEach(sig -> audit(context, sig));
  144. }
  145. @Override
  146. public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
  147. String connectorName,
  148. Mono<Map<String, Object>> requestBody,
  149. ServerWebExchange exchange) {
  150. var context = AccessContext.builder()
  151. .cluster(clusterName)
  152. .connect(connectName)
  153. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  154. .operationName("setConnectorConfig")
  155. .operationParams(Map.of(CONNECTOR_NAME, connectorName))
  156. .build();
  157. return validateAccess(context).then(
  158. kafkaConnectService
  159. .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
  160. .map(ResponseEntity::ok))
  161. .doOnEach(sig -> audit(context, sig));
  162. }
  163. @Override
  164. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  165. String connectorName,
  166. ConnectorActionDTO action,
  167. ServerWebExchange exchange) {
  168. ConnectAction[] connectActions;
  169. if (RESTART_ACTIONS.contains(action)) {
  170. connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
  171. } else {
  172. connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
  173. }
  174. var context = AccessContext.builder()
  175. .cluster(clusterName)
  176. .connect(connectName)
  177. .connectActions(connectActions)
  178. .operationName("updateConnectorState")
  179. .operationParams(Map.of(CONNECTOR_NAME, connectorName))
  180. .build();
  181. return validateAccess(context).then(
  182. kafkaConnectService
  183. .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
  184. .map(ResponseEntity::ok)
  185. ).doOnEach(sig -> audit(context, sig));
  186. }
  187. @Override
  188. public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
  189. String connectName,
  190. String connectorName,
  191. ServerWebExchange exchange) {
  192. var context = AccessContext.builder()
  193. .cluster(clusterName)
  194. .connect(connectName)
  195. .connectActions(ConnectAction.VIEW)
  196. .operationName("getConnectorTasks")
  197. .operationParams(Map.of(CONNECTOR_NAME, connectorName))
  198. .build();
  199. return validateAccess(context).thenReturn(
  200. ResponseEntity
  201. .ok(kafkaConnectService
  202. .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
  203. ).doOnEach(sig -> audit(context, sig));
  204. }
  205. @Override
  206. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  207. String connectorName, Integer taskId,
  208. ServerWebExchange exchange) {
  209. var context = AccessContext.builder()
  210. .cluster(clusterName)
  211. .connect(connectName)
  212. .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
  213. .operationName("restartConnectorTask")
  214. .operationParams(Map.of(CONNECTOR_NAME, connectorName))
  215. .build();
  216. return validateAccess(context).then(
  217. kafkaConnectService
  218. .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
  219. .map(ResponseEntity::ok)
  220. ).doOnEach(sig -> audit(context, sig));
  221. }
  222. @Override
  223. public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
  224. String clusterName, String connectName, ServerWebExchange exchange) {
  225. var context = AccessContext.builder()
  226. .cluster(clusterName)
  227. .connect(connectName)
  228. .connectActions(ConnectAction.VIEW)
  229. .operationName("getConnectorPlugins")
  230. .build();
  231. return validateAccess(context).then(
  232. Mono.just(
  233. ResponseEntity.ok(
  234. kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
  235. ).doOnEach(sig -> audit(context, sig));
  236. }
  237. @Override
  238. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
  239. String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
  240. ServerWebExchange exchange) {
  241. return kafkaConnectService
  242. .validateConnectorPluginConfig(
  243. getCluster(clusterName), connectName, pluginName, requestBody)
  244. .map(ResponseEntity::ok);
  245. }
  246. private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumnsToSortDTO orderBy) {
  247. var defaultComparator = Comparator.comparing(FullConnectorInfoDTO::getName);
  248. if (orderBy == null) {
  249. return defaultComparator;
  250. }
  251. return switch (orderBy) {
  252. case CONNECT -> Comparator.comparing(FullConnectorInfoDTO::getConnect);
  253. case TYPE -> Comparator.comparing(FullConnectorInfoDTO::getType);
  254. case STATUS -> Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
  255. default -> defaultComparator;
  256. };
  257. }
  258. }