KafkaConnectController.java 13 KB


  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART;
  3. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
  4. import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
  5. import com.provectus.kafka.ui.api.KafkaConnectApi;
  6. import com.provectus.kafka.ui.model.ConnectDTO;
  7. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  8. import com.provectus.kafka.ui.model.ConnectorColumnsToSortDTO;
  9. import com.provectus.kafka.ui.model.ConnectorDTO;
  10. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  11. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  12. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  13. import com.provectus.kafka.ui.model.NewConnectorDTO;
  14. import com.provectus.kafka.ui.model.SortOrderDTO;
  15. import com.provectus.kafka.ui.model.TaskDTO;
  16. import com.provectus.kafka.ui.model.rbac.AccessContext;
  17. import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
  18. import com.provectus.kafka.ui.service.KafkaConnectService;
  19. import com.provectus.kafka.ui.service.audit.AuditService;
  20. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  21. import java.util.Comparator;
  22. import java.util.Map;
  23. import java.util.Set;
  24. import javax.validation.Valid;
  25. import lombok.RequiredArgsConstructor;
  26. import lombok.extern.slf4j.Slf4j;
  27. import org.springframework.http.ResponseEntity;
  28. import org.springframework.web.bind.annotation.RestController;
  29. import org.springframework.web.server.ServerWebExchange;
  30. import reactor.core.publisher.Flux;
  31. import reactor.core.publisher.Mono;
  32. @RestController
  33. @RequiredArgsConstructor
  34. @Slf4j
  35. public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  36. private static final Set<ConnectorActionDTO> RESTART_ACTIONS
  37. = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
  38. private final KafkaConnectService kafkaConnectService;
  39. private final AccessControlService accessControlService;
  40. private final AuditService auditService;
  41. @Override
  42. public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
  43. ServerWebExchange exchange) {
  44. Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
  45. .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
  46. return Mono.just(ResponseEntity.ok(availableConnects));
  47. }
  48. @Override
  49. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  50. ServerWebExchange exchange) {
  51. var context = AccessContext.builder()
  52. .cluster(clusterName)
  53. .connect(connectName)
  54. .connectActions(ConnectAction.VIEW)
  55. .operationName("getConnectors")
  56. .build();
  57. return accessControlService.validateAccess(context)
  58. .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
  59. .doOnEach(sig -> auditService.audit(context, sig));
  60. }
  61. @Override
  62. public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
  63. @Valid Mono<NewConnectorDTO> connector,
  64. ServerWebExchange exchange) {
  65. var context = AccessContext.builder()
  66. .cluster(clusterName)
  67. .connect(connectName)
  68. .connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
  69. .operationName("createConnector")
  70. .build();
  71. return accessControlService.validateAccess(context).then(
  72. kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
  73. .map(ResponseEntity::ok)
  74. ).doOnEach(sig -> auditService.audit(context, sig));
  75. }
  76. @Override
  77. public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
  78. String connectorName,
  79. ServerWebExchange exchange) {
  80. var context = AccessContext.builder()
  81. .cluster(clusterName)
  82. .connect(connectName)
  83. .connectActions(ConnectAction.VIEW)
  84. .connector(connectorName)
  85. .operationName("getConnector")
  86. .build();
  87. return accessControlService.validateAccess(context).then(
  88. kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
  89. .map(ResponseEntity::ok)
  90. ).doOnEach(sig -> auditService.audit(context, sig));
  91. }
  92. @Override
  93. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  94. String connectorName,
  95. ServerWebExchange exchange) {
  96. var context = AccessContext.builder()
  97. .cluster(clusterName)
  98. .connect(connectName)
  99. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  100. .operationName("deleteConnector")
  101. .operationParams(Map.of("connectorName", connectName))
  102. .build();
  103. return accessControlService.validateAccess(context).then(
  104. kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
  105. .map(ResponseEntity::ok)
  106. ).doOnEach(sig -> auditService.audit(context, sig));
  107. }
  108. @Override
  109. public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
  110. String clusterName,
  111. String search,
  112. ConnectorColumnsToSortDTO orderBy,
  113. SortOrderDTO sortOrder,
  114. ServerWebExchange exchange
  115. ) {
  116. var context = AccessContext.builder()
  117. .cluster(clusterName)
  118. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  119. .operationName("getAllConnectors")
  120. .build();
  121. var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
  122. ? getConnectorsComparator(orderBy)
  123. : getConnectorsComparator(orderBy).reversed();
  124. Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
  125. .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
  126. .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
  127. .sort(comparator);
  128. return Mono.just(ResponseEntity.ok(job))
  129. .doOnEach(sig -> auditService.audit(context, sig));
  130. }
  131. @Override
  132. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  133. String connectName,
  134. String connectorName,
  135. ServerWebExchange exchange) {
  136. var context = AccessContext.builder()
  137. .cluster(clusterName)
  138. .connect(connectName)
  139. .connectActions(ConnectAction.VIEW)
  140. .operationName("getConnectorConfig")
  141. .build();
  142. return accessControlService.validateAccess(context).then(
  143. kafkaConnectService
  144. .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
  145. .map(ResponseEntity::ok)
  146. ).doOnEach(sig -> auditService.audit(context, sig));
  147. }
  148. @Override
  149. public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
  150. String connectorName,
  151. Mono<Map<String, Object>> requestBody,
  152. ServerWebExchange exchange) {
  153. var context = AccessContext.builder()
  154. .cluster(clusterName)
  155. .connect(connectName)
  156. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  157. .operationName("setConnectorConfig")
  158. .operationParams(Map.of("connectorName", connectorName))
  159. .build();
  160. return accessControlService.validateAccess(context).then(
  161. kafkaConnectService
  162. .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
  163. .map(ResponseEntity::ok))
  164. .doOnEach(sig -> auditService.audit(context, sig));
  165. }
  166. @Override
  167. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  168. String connectorName,
  169. ConnectorActionDTO action,
  170. ServerWebExchange exchange) {
  171. ConnectAction[] connectActions;
  172. if (RESTART_ACTIONS.contains(action)) {
  173. connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
  174. } else {
  175. connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
  176. }
  177. var context = AccessContext.builder()
  178. .cluster(clusterName)
  179. .connect(connectName)
  180. .connectActions(connectActions)
  181. .operationName("updateConnectorState")
  182. .operationParams(Map.of("connectorName", connectorName))
  183. .build();
  184. return accessControlService.validateAccess(context).then(
  185. kafkaConnectService
  186. .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
  187. .map(ResponseEntity::ok)
  188. ).doOnEach(sig -> auditService.audit(context, sig));
  189. }
  190. @Override
  191. public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
  192. String connectName,
  193. String connectorName,
  194. ServerWebExchange exchange) {
  195. var context = AccessContext.builder()
  196. .cluster(clusterName)
  197. .connect(connectName)
  198. .connectActions(ConnectAction.VIEW)
  199. .operationName("getConnectorTasks")
  200. .operationParams(Map.of("connectorName", connectorName))
  201. .build();
  202. return accessControlService.validateAccess(context).thenReturn(
  203. ResponseEntity
  204. .ok(kafkaConnectService
  205. .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
  206. ).doOnEach(sig -> auditService.audit(context, sig));
  207. }
  208. @Override
  209. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  210. String connectorName, Integer taskId,
  211. ServerWebExchange exchange) {
  212. var context = AccessContext.builder()
  213. .cluster(clusterName)
  214. .connect(connectName)
  215. .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
  216. .operationName("restartConnectorTask")
  217. .operationParams(Map.of("connectorName", connectorName))
  218. .build();
  219. return accessControlService.validateAccess(context).then(
  220. kafkaConnectService
  221. .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
  222. .map(ResponseEntity::ok)
  223. ).doOnEach(sig -> auditService.audit(context, sig));
  224. }
  225. @Override
  226. public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
  227. String clusterName, String connectName, ServerWebExchange exchange) {
  228. var context = AccessContext.builder()
  229. .cluster(clusterName)
  230. .connect(connectName)
  231. .connectActions(ConnectAction.VIEW)
  232. .operationName("getConnectorPlugins")
  233. .build();
  234. return accessControlService.validateAccess(context).then(
  235. Mono.just(
  236. ResponseEntity.ok(
  237. kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
  238. ).doOnEach(sig -> auditService.audit(context, sig));
  239. }
  240. @Override
  241. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
  242. String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
  243. ServerWebExchange exchange) {
  244. return kafkaConnectService
  245. .validateConnectorPluginConfig(
  246. getCluster(clusterName), connectName, pluginName, requestBody)
  247. .map(ResponseEntity::ok);
  248. }
  249. private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumnsToSortDTO orderBy) {
  250. var defaultComparator = Comparator.comparing(FullConnectorInfoDTO::getName);
  251. if (orderBy == null) {
  252. return defaultComparator;
  253. }
  254. return switch (orderBy) {
  255. case CONNECT -> Comparator.comparing(FullConnectorInfoDTO::getConnect);
  256. case TYPE -> Comparator.comparing(FullConnectorInfoDTO::getType);
  257. case STATUS -> Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
  258. default -> defaultComparator;
  259. };
  260. }
  261. }