KafkaConnectController.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KafkaConnectApi;
  3. import com.provectus.kafka.ui.model.ConnectDTO;
  4. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  5. import com.provectus.kafka.ui.model.ConnectorDTO;
  6. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  7. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  8. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  9. import com.provectus.kafka.ui.model.NewConnectorDTO;
  10. import com.provectus.kafka.ui.model.TaskDTO;
  11. import com.provectus.kafka.ui.service.KafkaConnectService;
  12. import java.util.Map;
  13. import javax.validation.Valid;
  14. import lombok.RequiredArgsConstructor;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.http.ResponseEntity;
  17. import org.springframework.web.bind.annotation.RestController;
  18. import org.springframework.web.server.ServerWebExchange;
  19. import reactor.core.publisher.Flux;
  20. import reactor.core.publisher.Mono;
  21. @RestController
  22. @RequiredArgsConstructor
  23. @Slf4j
  24. public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  25. private final KafkaConnectService kafkaConnectService;
  26. @Override
  27. public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
  28. ServerWebExchange exchange) {
  29. return kafkaConnectService.getConnects(getCluster(clusterName)).map(ResponseEntity::ok);
  30. }
  31. @Override
  32. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  33. ServerWebExchange exchange) {
  34. var connectors = kafkaConnectService.getConnectors(getCluster(clusterName), connectName);
  35. return Mono.just(ResponseEntity.ok(connectors));
  36. }
  37. @Override
  38. public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
  39. @Valid Mono<NewConnectorDTO> connector,
  40. ServerWebExchange exchange) {
  41. return kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
  42. .map(ResponseEntity::ok);
  43. }
  44. @Override
  45. public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
  46. String connectorName,
  47. ServerWebExchange exchange) {
  48. return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
  49. .map(ResponseEntity::ok);
  50. }
  51. @Override
  52. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  53. String connectorName,
  54. ServerWebExchange exchange) {
  55. return kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
  56. .map(ResponseEntity::ok);
  57. }
  58. @Override
  59. public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
  60. String clusterName,
  61. String search,
  62. ServerWebExchange exchange
  63. ) {
  64. return Mono.just(ResponseEntity.ok(
  65. kafkaConnectService.getAllConnectors(getCluster(clusterName), search)));
  66. }
  67. @Override
  68. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  69. String connectName,
  70. String connectorName,
  71. ServerWebExchange exchange) {
  72. return kafkaConnectService
  73. .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
  74. .map(ResponseEntity::ok);
  75. }
  76. @Override
  77. public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
  78. String connectName,
  79. String connectorName,
  80. @Valid Mono<Object> requestBody,
  81. ServerWebExchange exchange) {
  82. return kafkaConnectService
  83. .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
  84. .map(ResponseEntity::ok);
  85. }
  86. @Override
  87. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  88. String connectorName,
  89. ConnectorActionDTO action,
  90. ServerWebExchange exchange) {
  91. return kafkaConnectService
  92. .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
  93. .map(ResponseEntity::ok);
  94. }
  95. @Override
  96. public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
  97. String connectName,
  98. String connectorName,
  99. ServerWebExchange exchange) {
  100. return Mono.just(ResponseEntity
  101. .ok(kafkaConnectService
  102. .getConnectorTasks(getCluster(clusterName), connectName, connectorName)));
  103. }
  104. @Override
  105. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  106. String connectorName, Integer taskId,
  107. ServerWebExchange exchange) {
  108. return kafkaConnectService
  109. .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
  110. .map(ResponseEntity::ok);
  111. }
  112. @Override
  113. public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
  114. String clusterName, String connectName, ServerWebExchange exchange) {
  115. return kafkaConnectService
  116. .getConnectorPlugins(getCluster(clusterName), connectName)
  117. .map(ResponseEntity::ok);
  118. }
  119. @Override
  120. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>>
  121. validateConnectorPluginConfig(
  122. String clusterName, String connectName, String pluginName, @Valid Mono<Object> requestBody,
  123. ServerWebExchange exchange) {
  124. return kafkaConnectService
  125. .validateConnectorPluginConfig(
  126. getCluster(clusterName), connectName, pluginName, requestBody)
  127. .map(ResponseEntity::ok);
  128. }
  129. }