package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KafkaConnectApi; import com.provectus.kafka.ui.model.Connect; import com.provectus.kafka.ui.model.Connector; import com.provectus.kafka.ui.model.ConnectorAction; import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.Task; import com.provectus.kafka.ui.service.KafkaConnectService; import java.util.Map; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController @RequiredArgsConstructor @Log4j2 public class KafkaConnectController implements KafkaConnectApi { private final KafkaConnectService kafkaConnectService; @Override public Mono>> getConnects(String clusterName, ServerWebExchange exchange) { return kafkaConnectService.getConnects(clusterName).map(ResponseEntity::ok); } @Override public Mono>> getConnectors(String clusterName, String connectName, ServerWebExchange exchange) { Flux connectors = kafkaConnectService.getConnectors(clusterName, connectName); return Mono.just(ResponseEntity.ok(connectors)); } @Override public Mono> createConnector(String clusterName, String connectName, @Valid Mono connector, ServerWebExchange exchange) { return kafkaConnectService.createConnector(clusterName, connectName, connector) .map(ResponseEntity::ok); } @Override public Mono> getConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { return kafkaConnectService.getConnector(clusterName, connectName, connectorName) .map(ResponseEntity::ok); } @Override public Mono> deleteConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { return kafkaConnectService.deleteConnector(clusterName, connectName, connectorName) .map(ResponseEntity::ok); } @Override public Mono>> getConnectorConfig(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { return kafkaConnectService.getConnectorConfig(clusterName, connectName, connectorName) .map(ResponseEntity::ok); } @Override public Mono> setConnectorConfig(String clusterName, String connectName, String connectorName, @Valid Mono requestBody, ServerWebExchange exchange) { return kafkaConnectService .setConnectorConfig(clusterName, connectName, connectorName, requestBody) .map(ResponseEntity::ok); } @Override public Mono> updateConnectorState(String clusterName, String connectName, String connectorName, ConnectorAction action, ServerWebExchange exchange) { return kafkaConnectService.updateConnectorState(clusterName, connectName, connectorName, action) .map(ResponseEntity::ok); } @Override public Mono>> getConnectorTasks(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { return Mono.just(ResponseEntity .ok(kafkaConnectService.getConnectorTasks(clusterName, connectName, connectorName))); } @Override public Mono> restartConnectorTask(String clusterName, String connectName, String connectorName, Integer taskId, ServerWebExchange exchange) { return kafkaConnectService.restartConnectorTask(clusterName, connectName, connectorName, taskId) .map(ResponseEntity::ok); } @Override public Mono>> getConnectorPlugins( String clusterName, String connectName, ServerWebExchange exchange) { return kafkaConnectService.getConnectorPlugins(clusterName, connectName) .map(ResponseEntity::ok); } @Override public Mono> validateConnectorPluginConfig( String clusterName, String connectName, String pluginName, @Valid Mono requestBody, ServerWebExchange exchange) { return kafkaConnectService .validateConnectorPluginConfig(clusterName, connectName, pluginName, requestBody) .map(ResponseEntity::ok); } }