|
@@ -6,7 +6,13 @@ import com.provectus.kafka.ui.config.ClustersProperties;
|
|
|
import com.provectus.kafka.ui.connect.ApiClient;
|
|
|
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
|
|
|
import com.provectus.kafka.ui.connect.model.Connector;
|
|
|
+import com.provectus.kafka.ui.connect.model.ConnectorPlugin;
|
|
|
+import com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse;
|
|
|
+import com.provectus.kafka.ui.connect.model.ConnectorStatus;
|
|
|
+import com.provectus.kafka.ui.connect.model.ConnectorTask;
|
|
|
+import com.provectus.kafka.ui.connect.model.ConnectorTopics;
|
|
|
import com.provectus.kafka.ui.connect.model.NewConnector;
|
|
|
+import com.provectus.kafka.ui.connect.model.TaskStatus;
|
|
|
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
|
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.util.WebClientConfigurator;
|
|
@@ -15,11 +21,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import javax.annotation.Nullable;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.core.ParameterizedTypeReference;
|
|
|
-import org.springframework.http.HttpHeaders;
|
|
|
-import org.springframework.http.HttpMethod;
|
|
|
-import org.springframework.http.MediaType;
|
|
|
-import org.springframework.util.MultiValueMap;
|
|
|
+import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.util.unit.DataSize;
|
|
|
import org.springframework.web.client.RestClientException;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
@@ -79,6 +81,176 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.deleteConnector(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnector(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorConfig(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorPlugins());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorStatus(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTasks(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTopics(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Flux<String> getConnectors(String search) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectors(search));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.pauseConnector(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
|
|
|
+ Boolean onlyFailed) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
|
|
|
+ return super.resumeConnector(connectorName);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
|
|
|
+ Map<String, Object> requestBody)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
|
|
|
+ Map<String, Object> requestBody)
|
|
|
+ throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
|
|
|
+ String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
|
|
|
+ return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
|
|
|
+ }
|
|
|
+
|
|
|
private static class RetryingApiClient extends ApiClient {
|
|
|
|
|
|
public RetryingApiClient(ConnectCluster config,
|
|
@@ -108,35 +280,5 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
|
|
.configureBufferSize(maxBuffSize)
|
|
|
.build();
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
|
|
|
- MultiValueMap<String, String> queryParams, Object body,
|
|
|
- HttpHeaders headerParams,
|
|
|
- MultiValueMap<String, String> cookieParams,
|
|
|
- MultiValueMap<String, Object> formParams, List<MediaType> accept,
|
|
|
- MediaType contentType, String[] authNames,
|
|
|
- ParameterizedTypeReference<T> returnType)
|
|
|
- throws RestClientException {
|
|
|
- return withRetryOnConflict(
|
|
|
- super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
|
|
|
- formParams, accept, contentType, authNames, returnType)
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
|
|
|
- MultiValueMap<String, String> queryParams, Object body,
|
|
|
- HttpHeaders headerParams,
|
|
|
- MultiValueMap<String, String> cookieParams,
|
|
|
- MultiValueMap<String, Object> formParams,
|
|
|
- List<MediaType> accept, MediaType contentType,
|
|
|
- String[] authNames, ParameterizedTypeReference<T> returnType)
|
|
|
- throws RestClientException {
|
|
|
- return withRetryOnConflict(
|
|
|
- super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
|
|
|
- cookieParams, formParams, accept, contentType, authNames, returnType)
|
|
|
- );
|
|
|
- }
|
|
|
}
|
|
|
}
|