From c5e5717a9858cb9b2575339183dbc497e035294d Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Wed, 27 Oct 2021 21:31:15 +0300 Subject: [PATCH] Handling for 404 responses from connect added (#1025) * handling for 404 responses from connect added * connector name uniqness check added (#1026) --- .../ui/client/RetryingKafkaConnectClient.java | 26 +++++---- .../kafka/ui/exception/ErrorCode.java | 2 +- .../KafkaConnectConflictReponseException.java | 17 ++++++ .../RebalanceInProgressException.java | 14 ----- .../kafka/ui/service/KafkaConnectService.java | 57 +++++++++++++------ .../kafka/ui/KafkaConnectServiceTests.java | 18 ++++++ 6 files changed, 91 insertions(+), 43 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KafkaConnectConflictReponseException.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/RebalanceInProgressException.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java index f8d52f83f4..372a484016 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java @@ -4,8 +4,9 @@ import com.provectus.kafka.ui.connect.ApiClient; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.model.Connector; import com.provectus.kafka.ui.connect.model.NewConnector; -import com.provectus.kafka.ui.exception.RebalanceInProgressException; +import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException; import com.provectus.kafka.ui.exception.ValidationException; +import java.time.Duration; import java.util.List; import java.util.Map; import lombok.extern.log4j.Log4j2; @@ -19,29 +20,32 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; @Log4j2 public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; + private static final Duration RETRIES_DELAY = Duration.ofMillis(200); public RetryingKafkaConnectClient(String basePath) { super(new RetryingApiClient().setBasePath(basePath)); } + private static Retry conflictCodeRetry() { + return RetryBackoffSpec + .fixedDelay(MAX_RETRIES, RETRIES_DELAY) + .filter(e -> e instanceof WebClientResponseException.Conflict) + .onRetryExhaustedThrow((spec, signal) -> + new KafkaConnectConflictReponseException( + (WebClientResponseException.Conflict) signal.failure())); + } + private static Mono withRetryOnConflict(Mono publisher) { - return publisher.retryWhen( - Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict)) - .onErrorResume(WebClientResponseException.Conflict.class, - e -> Mono.error(new RebalanceInProgressException())) - .doOnError(log::error); + return publisher.retryWhen(conflictCodeRetry()); } private static Flux withRetryOnConflict(Flux publisher) { - return publisher.retryWhen( - Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict)) - .onErrorResume(WebClientResponseException.Conflict.class, - e -> Mono.error(new RebalanceInProgressException())) - .doOnError(log::error); + return publisher.retryWhen(conflictCodeRetry()); } private static Mono withBadRequestErrorHandling(Mono publisher) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 8714738904..7315c20a61 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -13,7 +13,7 @@ public enum ErrorCode { INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST), VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST), READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED), - REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT), + CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT), DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT), UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY), CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KafkaConnectConflictReponseException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KafkaConnectConflictReponseException.java new file mode 100644 index 0000000000..ae33150073 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KafkaConnectConflictReponseException.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.exception; + + +import org.springframework.web.reactive.function.client.WebClientResponseException; + +public class KafkaConnectConflictReponseException extends CustomBaseException { + + public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) { + super("Kafka Connect responded with 409 (Conflict) code. Response body: " + + e.getResponseBodyAsString()); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.CONNECT_CONFLICT_RESPONSE; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/RebalanceInProgressException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/RebalanceInProgressException.java deleted file mode 100644 index 2f6f91c6e6..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/RebalanceInProgressException.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.provectus.kafka.ui.exception; - - -public class RebalanceInProgressException extends CustomBaseException { - - public RebalanceInProgressException() { - super("Rebalance is in progress."); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.REBALANCE_IN_PROGRESS; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 1b6eb30773..803112a607 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -3,10 +3,12 @@ package com.provectus.kafka.ui.service; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.client.KafkaConnectClients; +import com.provectus.kafka.ui.connect.model.ConnectorStatus; +import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector; import com.provectus.kafka.ui.connect.model.ConnectorTopics; import com.provectus.kafka.ui.connect.model.TaskStatus; -import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ConnectNotFoundException; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.KafkaConnectMapper; import com.provectus.kafka.ui.model.ConnectDTO; @@ -32,6 +34,7 @@ import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -41,7 +44,6 @@ import reactor.util.function.Tuples; @Log4j2 @RequiredArgsConstructor public class KafkaConnectService { - private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; private final KafkaConnectMapper kafkaConnectMapper; private final ObjectMapper objectMapper; @@ -60,7 +62,7 @@ public class KafkaConnectService { final String search) { return getConnects(cluster) .flatMapMany(Function.identity()) - .flatMap(connect -> getConnectorNames(cluster, connect)) + .flatMap(connect -> getConnectorNames(cluster, connect.getName())) .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2())) .flatMap(connector -> getConnectorConfig(cluster, connector.getConnect(), connector.getName()) @@ -123,13 +125,13 @@ public class KafkaConnectService { ); } - private Flux> getConnectorNames(KafkaCluster cluster, ConnectDTO connect) { - return getConnectors(cluster, connect.getName()) + private Flux> getConnectorNames(KafkaCluster cluster, String connectName) { + return getConnectors(cluster, connectName) .collectList().map(e -> e.get(0)) // for some reason `getConnectors` method returns the response as a single string .map(this::parseToList) .flatMapMany(Flux::fromIterable) - .map(connector -> Tuples.of(connect.getName(), connector)); + .map(connector -> Tuples.of(connectName, connector)); } @SneakyThrows @@ -147,18 +149,32 @@ public class KafkaConnectService { } public Mono createConnector(KafkaCluster cluster, String connectName, - Mono connector) { + Mono connector) { return getConnectAddress(cluster, connectName) - .flatMap(connect -> + .flatMap(connectUrl -> connector + .flatMap(c -> connectorExists(cluster, connectName, c.getName()) + .map(exists -> { + if (exists) { + throw new ValidationException( + String.format("Connector with name %s already exists", c.getName())); + } + return c; + })) .map(kafkaConnectMapper::toClient) - .flatMap(c -> - KafkaConnectClients.withBaseUrl(connect).createConnector(c) - ) + .flatMap(c -> KafkaConnectClients.withBaseUrl(connectUrl).createConnector(c)) .flatMap(c -> getConnector(cluster, connectName, c.getName())) ); } + private Mono connectorExists(KafkaCluster cluster, String connectName, + String connectorName) { + return getConnectorNames(cluster, connectName) + .map(Tuple2::getT2) + .collectList() + .map(connectorNames -> connectorNames.contains(connectorName)); + } + public Mono getConnector(KafkaCluster cluster, String connectName, String connectorName) { return getConnectAddress(cluster, connectName) @@ -166,6 +182,9 @@ public class KafkaConnectService { .map(kafkaConnectMapper::fromClient) .flatMap(connector -> KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName()) + // status request can return 404 if tasks not assigned yet + .onErrorResume(WebClientResponseException.NotFound.class, + e -> emptyStatus(connectorName)) .map(connectorStatus -> { var status = connectorStatus.getConnector(); ConnectorDTO result = (ConnectorDTO) new ConnectorDTO() @@ -191,6 +210,14 @@ public class KafkaConnectService { ); } + private Mono emptyStatus(String connectorName) { + return Mono.just(new ConnectorStatus() + .name(connectorName) + .tasks(List.of()) + .connector(new ConnectorStatusConnector() + .state(ConnectorStatusConnector.StateEnum.UNASSIGNED))); + } + public Mono> getConnectorConfig(KafkaCluster cluster, String connectName, String connectorName) { return getConnectAddress(cluster, connectName) @@ -247,10 +274,12 @@ public class KafkaConnectService { return getConnectAddress(cluster, connectName) .flatMapMany(connect -> KafkaConnectClients.withBaseUrl(connect).getConnectorTasks(connectorName) + .onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty()) .map(kafkaConnectMapper::fromClient) .flatMap(task -> KafkaConnectClients.withBaseUrl(connect) .getConnectorTaskStatus(connectorName, task.getId().getTask()) + .onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty()) .map(kafkaConnectMapper::fromClient) .map(task::status) ) @@ -286,12 +315,6 @@ public class KafkaConnectService { ); } - private Mono getCluster(String clusterName) { - return clustersStorage.getClusterByName(clusterName) - .map(Mono::just) - .orElse(Mono.error(ClusterNotFoundException::new)); - } - private Mono getConnectAddress(KafkaCluster cluster, String connectName) { return Mono.justOrEmpty(cluster.getKafkaConnect().stream() .filter(connect -> connect.getName().equals(connectName)) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java index 1314eb4517..8ab59e9a16 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java @@ -390,4 +390,22 @@ public class KafkaConnectServiceTests extends AbstractBaseTest { ); }); } + + @Test + public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() { + webTestClient.post() + .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) + .bodyValue(new NewConnectorDTO() + .name(connectorName) + .config(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test" + )) + ) + .exchange() + .expectStatus() + .isBadRequest(); + } }