KafkaConnectController.java 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KafkaConnectApi;
  3. import com.provectus.kafka.ui.model.Connect;
  4. import com.provectus.kafka.ui.model.Connector;
  5. import com.provectus.kafka.ui.model.ConnectorAction;
  6. import com.provectus.kafka.ui.model.ConnectorPlugin;
  7. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
  8. import com.provectus.kafka.ui.model.NewConnector;
  9. import com.provectus.kafka.ui.model.Task;
  10. import com.provectus.kafka.ui.service.KafkaConnectService;
  11. import java.util.Map;
  12. import javax.validation.Valid;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.log4j.Log4j2;
  15. import org.springframework.http.ResponseEntity;
  16. import org.springframework.web.bind.annotation.RestController;
  17. import org.springframework.web.server.ServerWebExchange;
  18. import reactor.core.publisher.Flux;
  19. import reactor.core.publisher.Mono;
  20. @RestController
  21. @RequiredArgsConstructor
  22. @Log4j2
  23. public class KafkaConnectController implements KafkaConnectApi {
  24. private final KafkaConnectService kafkaConnectService;
  25. @Override
  26. public Mono<ResponseEntity<Flux<Connect>>> getConnects(String clusterName,
  27. ServerWebExchange exchange) {
  28. return kafkaConnectService.getConnects(clusterName).map(ResponseEntity::ok);
  29. }
  30. @Override
  31. public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
  32. ServerWebExchange exchange) {
  33. Flux<String> connectors = kafkaConnectService.getConnectors(clusterName, connectName);
  34. return Mono.just(ResponseEntity.ok(connectors));
  35. }
  36. @Override
  37. public Mono<ResponseEntity<Connector>> createConnector(String clusterName, String connectName,
  38. @Valid Mono<NewConnector> connector,
  39. ServerWebExchange exchange) {
  40. return kafkaConnectService.createConnector(clusterName, connectName, connector)
  41. .map(ResponseEntity::ok);
  42. }
  43. @Override
  44. public Mono<ResponseEntity<Connector>> getConnector(String clusterName, String connectName,
  45. String connectorName,
  46. ServerWebExchange exchange) {
  47. return kafkaConnectService.getConnector(clusterName, connectName, connectorName)
  48. .map(ResponseEntity::ok);
  49. }
  50. @Override
  51. public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
  52. String connectorName,
  53. ServerWebExchange exchange) {
  54. return kafkaConnectService.deleteConnector(clusterName, connectName, connectorName)
  55. .map(ResponseEntity::ok);
  56. }
  57. @Override
  58. public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
  59. String connectName,
  60. String connectorName,
  61. ServerWebExchange exchange) {
  62. return kafkaConnectService.getConnectorConfig(clusterName, connectName, connectorName)
  63. .map(ResponseEntity::ok);
  64. }
  65. @Override
  66. public Mono<ResponseEntity<Connector>> setConnectorConfig(String clusterName, String connectName,
  67. String connectorName,
  68. @Valid Mono<Object> requestBody,
  69. ServerWebExchange exchange) {
  70. return kafkaConnectService
  71. .setConnectorConfig(clusterName, connectName, connectorName, requestBody)
  72. .map(ResponseEntity::ok);
  73. }
  74. @Override
  75. public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
  76. String connectorName,
  77. ConnectorAction action,
  78. ServerWebExchange exchange) {
  79. return kafkaConnectService.updateConnectorState(clusterName, connectName, connectorName, action)
  80. .map(ResponseEntity::ok);
  81. }
  82. @Override
  83. public Mono<ResponseEntity<Flux<Task>>> getConnectorTasks(String clusterName, String connectName,
  84. String connectorName,
  85. ServerWebExchange exchange) {
  86. return Mono.just(ResponseEntity
  87. .ok(kafkaConnectService.getConnectorTasks(clusterName, connectName, connectorName)));
  88. }
  89. @Override
  90. public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
  91. String connectorName, Integer taskId,
  92. ServerWebExchange exchange) {
  93. return kafkaConnectService.restartConnectorTask(clusterName, connectName, connectorName, taskId)
  94. .map(ResponseEntity::ok);
  95. }
  96. @Override
  97. public Mono<ResponseEntity<Flux<ConnectorPlugin>>> getConnectorPlugins(
  98. String clusterName, String connectName, ServerWebExchange exchange) {
  99. return kafkaConnectService.getConnectorPlugins(clusterName, connectName)
  100. .map(ResponseEntity::ok);
  101. }
  102. @Override
  103. public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>>
  104. validateConnectorPluginConfig(
  105. String clusterName, String connectName, String pluginName, @Valid Mono<Object> requestBody,
  106. ServerWebExchange exchange) {
  107. return kafkaConnectService
  108. .validateConnectorPluginConfig(clusterName, connectName, pluginName, requestBody)
  109. .map(ResponseEntity::ok);
  110. }
  111. }