KafkaConnectController.java 12 KB


  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KafkaConnectApi;
  3. import com.provectus.kafka.ui.model.ConnectDTO;
  4. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  5. import com.provectus.kafka.ui.model.ConnectorColumnsToSortDTO;
  6. import com.provectus.kafka.ui.model.ConnectorDTO;
  7. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  8. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  9. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  10. import com.provectus.kafka.ui.model.NewConnectorDTO;
  11. import com.provectus.kafka.ui.model.SortOrderDTO;
  12. import com.provectus.kafka.ui.model.TaskDTO;
  13. import com.provectus.kafka.ui.model.rbac.AccessContext;
  14. import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
  15. import com.provectus.kafka.ui.service.KafkaConnectService;
  16. import com.provectus.kafka.ui.service.audit.AuditService;
  17. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  18. import java.util.Comparator;
  19. import java.util.Map;
  20. import javax.validation.Valid;
  21. import lombok.RequiredArgsConstructor;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.springframework.http.ResponseEntity;
  24. import org.springframework.web.bind.annotation.RestController;
  25. import org.springframework.web.server.ServerWebExchange;
  26. import reactor.core.publisher.Flux;
  27. import reactor.core.publisher.Mono;
  28. @RestController
  29. @RequiredArgsConstructor
  30. @Slf4j
  31. public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  32. private final KafkaConnectService kafkaConnectService;
  33. private final AccessControlService accessControlService;
  34. private final AuditService auditService;
  35. @Override
  36. public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
  37. ServerWebExchange exchange) {
  38. Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
  39. .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
  40. return Mono.just(ResponseEntity.ok(availableConnects));
  41. }
  42. @Override
  43. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  44. ServerWebExchange exchange) {
  45. var context = AccessContext.builder()
  46. .cluster(clusterName)
  47. .connect(connectName)
  48. .connectActions(ConnectAction.VIEW)
  49. .auditOperation("getConnectors")
  50. .build();
  51. return accessControlService.validateAccess(context)
  52. .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
  53. .doOnEach(sig -> auditService.audit(context, sig));
  54. }
  55. @Override
  56. public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
  57. @Valid Mono<NewConnectorDTO> connector,
  58. ServerWebExchange exchange) {
  59. var context = AccessContext.builder()
  60. .cluster(clusterName)
  61. .connect(connectName)
  62. .connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
  63. .auditOperation("createConnector")
  64. .build();
  65. return accessControlService.validateAccess(context).then(
  66. kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
  67. .map(ResponseEntity::ok)
  68. ).doOnEach(sig -> auditService.audit(context, sig));
  69. }
  70. @Override
  71. public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
  72. String connectorName,
  73. ServerWebExchange exchange) {
  74. var context = AccessContext.builder()
  75. .cluster(clusterName)
  76. .connect(connectName)
  77. .connectActions(ConnectAction.VIEW)
  78. .connector(connectorName)
  79. .auditOperation("getConnector")
  80. .build();
  81. return accessControlService.validateAccess(context).then(
  82. kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
  83. .map(ResponseEntity::ok)
  84. ).doOnEach(sig -> auditService.audit(context, sig));
  85. }
  86. @Override
  87. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  88. String connectorName,
  89. ServerWebExchange exchange) {
  90. var context = AccessContext.builder()
  91. .cluster(clusterName)
  92. .connect(connectName)
  93. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  94. .auditOperation("deleteConnector")
  95. .build();
  96. return accessControlService.validateAccess(context).then(
  97. kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
  98. .map(ResponseEntity::ok)
  99. ).doOnEach(sig -> auditService.audit(context, sig));
  100. }
  101. @Override
  102. public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
  103. String clusterName,
  104. String search,
  105. ConnectorColumnsToSortDTO orderBy,
  106. SortOrderDTO sortOrder,
  107. ServerWebExchange exchange
  108. ) {
  109. var context = AccessContext.builder()
  110. .cluster(clusterName)
  111. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  112. .auditOperation("getAllConnectors")
  113. .build();
  114. var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
  115. ? getConnectorsComparator(orderBy)
  116. : getConnectorsComparator(orderBy).reversed();
  117. Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
  118. .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
  119. .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
  120. return Mono.just(ResponseEntity.ok(job.sort(comparator)))
  121. .doOnEach(sig -> auditService.audit(context, sig));
  122. }
  123. @Override
  124. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  125. String connectName,
  126. String connectorName,
  127. ServerWebExchange exchange) {
  128. var context = AccessContext.builder()
  129. .cluster(clusterName)
  130. .connect(connectName)
  131. .connectActions(ConnectAction.VIEW)
  132. .auditOperation("getConnectorConfig")
  133. .build();
  134. return accessControlService.validateAccess(context).then(
  135. kafkaConnectService
  136. .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
  137. .map(ResponseEntity::ok)
  138. ).doOnEach(sig -> auditService.audit(context, sig));
  139. }
  140. @Override
  141. public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
  142. String connectorName,
  143. Mono<Map<String, Object>> requestBody,
  144. ServerWebExchange exchange) {
  145. var context = AccessContext.builder()
  146. .cluster(clusterName)
  147. .connect(connectName)
  148. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  149. .auditOperation("setConnectorConfig")
  150. .build();
  151. return accessControlService.validateAccess(context).then(
  152. kafkaConnectService
  153. .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
  154. .map(ResponseEntity::ok))
  155. .doOnEach(sig -> auditService.audit(context, sig));
  156. }
  157. @Override
  158. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  159. String connectorName,
  160. ConnectorActionDTO action,
  161. ServerWebExchange exchange) {
  162. var context = AccessContext.builder()
  163. .cluster(clusterName)
  164. .connect(connectName)
  165. .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
  166. .auditOperation("updateConnectorState")
  167. .build();
  168. return accessControlService.validateAccess(context).then(
  169. kafkaConnectService
  170. .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
  171. .map(ResponseEntity::ok)
  172. ).doOnEach(sig -> auditService.audit(context, sig));
  173. }
  174. @Override
  175. public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
  176. String connectName,
  177. String connectorName,
  178. ServerWebExchange exchange) {
  179. var context = AccessContext.builder()
  180. .cluster(clusterName)
  181. .connect(connectName)
  182. .connectActions(ConnectAction.VIEW)
  183. .auditOperation("getConnectorTasks")
  184. .build();
  185. return accessControlService.validateAccess(context).thenReturn(
  186. ResponseEntity
  187. .ok(kafkaConnectService
  188. .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
  189. ).doOnEach(sig -> auditService.audit(context, sig));
  190. }
  191. @Override
  192. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  193. String connectorName, Integer taskId,
  194. ServerWebExchange exchange) {
  195. var context = AccessContext.builder()
  196. .cluster(clusterName)
  197. .connect(connectName)
  198. .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
  199. .auditOperation("restartConnectorTask")
  200. .build();
  201. return accessControlService.validateAccess(context).then(
  202. kafkaConnectService
  203. .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
  204. .map(ResponseEntity::ok)
  205. ).doOnEach(sig -> auditService.audit(context, sig));
  206. }
  207. @Override
  208. public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
  209. String clusterName, String connectName, ServerWebExchange exchange) {
  210. var context = AccessContext.builder()
  211. .cluster(clusterName)
  212. .connect(connectName)
  213. .connectActions(ConnectAction.VIEW)
  214. .auditOperation("getConnectorPlugins")
  215. .build();
  216. return accessControlService.validateAccess(context).then(
  217. Mono.just(
  218. ResponseEntity.ok(
  219. kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
  220. ).doOnEach(sig -> auditService.audit(context, sig));
  221. }
  222. @Override
  223. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
  224. String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
  225. ServerWebExchange exchange) {
  226. return kafkaConnectService
  227. .validateConnectorPluginConfig(
  228. getCluster(clusterName), connectName, pluginName, requestBody)
  229. .map(ResponseEntity::ok);
  230. }
  231. private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumnsToSortDTO orderBy) {
  232. var defaultComparator = Comparator.comparing(FullConnectorInfoDTO::getName);
  233. if (orderBy == null) {
  234. return defaultComparator;
  235. }
  236. switch (orderBy) {
  237. case CONNECT:
  238. return Comparator.comparing(FullConnectorInfoDTO::getConnect);
  239. case TYPE:
  240. return Comparator.comparing(FullConnectorInfoDTO::getType);
  241. case STATUS:
  242. return Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
  243. case NAME:
  244. default:
  245. return defaultComparator;
  246. }
  247. }
  248. }