Browse Source

Handling for 404 responses from connect added (#1025)

* handling for 404 responses from connect added
* connector name uniqness check added (#1026)
Ilya Kuramshin 3 years ago
parent
commit
c5e5717a98

+ 15 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java

@@ -4,8 +4,9 @@ import com.provectus.kafka.ui.connect.ApiClient;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.model.Connector;
 import com.provectus.kafka.ui.connect.model.NewConnector;
-import com.provectus.kafka.ui.exception.RebalanceInProgressException;
+import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
 import com.provectus.kafka.ui.exception.ValidationException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.log4j.Log4j2;
@@ -19,29 +20,32 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
 
 @Log4j2
 public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
   private static final int MAX_RETRIES = 5;
+  private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
 
   public RetryingKafkaConnectClient(String basePath) {
     super(new RetryingApiClient().setBasePath(basePath));
   }
 
+  private static Retry conflictCodeRetry() {
+    return RetryBackoffSpec
+        .fixedDelay(MAX_RETRIES, RETRIES_DELAY)
+        .filter(e -> e instanceof WebClientResponseException.Conflict)
+        .onRetryExhaustedThrow((spec, signal) ->
+            new KafkaConnectConflictReponseException(
+                (WebClientResponseException.Conflict) signal.failure()));
+  }
+
   private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
-    return publisher.retryWhen(
-            Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
-        .onErrorResume(WebClientResponseException.Conflict.class,
-            e -> Mono.error(new RebalanceInProgressException()))
-        .doOnError(log::error);
+    return publisher.retryWhen(conflictCodeRetry());
   }
 
   private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
-    return publisher.retryWhen(
-            Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
-        .onErrorResume(WebClientResponseException.Conflict.class,
-            e -> Mono.error(new RebalanceInProgressException()))
-        .doOnError(log::error);
+    return publisher.retryWhen(conflictCodeRetry());
   }
 
   private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -13,7 +13,7 @@ public enum ErrorCode {
   INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST),
   VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
   READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
-  REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT),
+  CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT),
   DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT),
   UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY),
   CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KafkaConnectConflictReponseException.java

@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.exception;
+
+
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+public class KafkaConnectConflictReponseException extends CustomBaseException {
+
+  public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) {
+    super("Kafka Connect responded with 409 (Conflict) code. Response body: "
+        + e.getResponseBodyAsString());
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.CONNECT_CONFLICT_RESPONSE;
+  }
+}

+ 0 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/RebalanceInProgressException.java

@@ -1,14 +0,0 @@
-package com.provectus.kafka.ui.exception;
-
-
-public class RebalanceInProgressException extends CustomBaseException {
-
-  public RebalanceInProgressException() {
-    super("Rebalance is in progress.");
-  }
-
-  @Override
-  public ErrorCode getErrorCode() {
-    return ErrorCode.REBALANCE_IN_PROGRESS;
-  }
-}

+ 40 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -3,10 +3,12 @@ package com.provectus.kafka.ui.service;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.client.KafkaConnectClients;
+import com.provectus.kafka.ui.connect.model.ConnectorStatus;
+import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
 import com.provectus.kafka.ui.connect.model.ConnectorTopics;
 import com.provectus.kafka.ui.connect.model.TaskStatus;
-import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.exception.ConnectNotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
 import com.provectus.kafka.ui.model.ConnectDTO;
@@ -32,6 +34,7 @@ import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -41,7 +44,6 @@ import reactor.util.function.Tuples;
 @Log4j2
 @RequiredArgsConstructor
 public class KafkaConnectService {
-  private final ClustersStorage clustersStorage;
   private final ClusterMapper clusterMapper;
   private final KafkaConnectMapper kafkaConnectMapper;
   private final ObjectMapper objectMapper;
@@ -60,7 +62,7 @@ public class KafkaConnectService {
                                                      final String search) {
     return getConnects(cluster)
         .flatMapMany(Function.identity())
-        .flatMap(connect -> getConnectorNames(cluster, connect))
+        .flatMap(connect -> getConnectorNames(cluster, connect.getName()))
         .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
         .flatMap(connector ->
             getConnectorConfig(cluster, connector.getConnect(), connector.getName())
@@ -123,13 +125,13 @@ public class KafkaConnectService {
         );
   }
 
-  private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, ConnectDTO connect) {
-    return getConnectors(cluster, connect.getName())
+  private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, String connectName) {
+    return getConnectors(cluster, connectName)
         .collectList().map(e -> e.get(0))
         // for some reason `getConnectors` method returns the response as a single string
         .map(this::parseToList)
         .flatMapMany(Flux::fromIterable)
-        .map(connector -> Tuples.of(connect.getName(), connector));
+        .map(connector -> Tuples.of(connectName, connector));
   }
 
   @SneakyThrows
@@ -147,18 +149,32 @@ public class KafkaConnectService {
   }
 
   public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
-                                         Mono<NewConnectorDTO> connector) {
+                                            Mono<NewConnectorDTO> connector) {
     return getConnectAddress(cluster, connectName)
-        .flatMap(connect ->
+        .flatMap(connectUrl ->
             connector
+                .flatMap(c -> connectorExists(cluster, connectName, c.getName())
+                    .map(exists -> {
+                      if (exists) {
+                        throw new ValidationException(
+                            String.format("Connector with name %s already exists", c.getName()));
+                      }
+                      return c;
+                    }))
                 .map(kafkaConnectMapper::toClient)
-                .flatMap(c ->
-                    KafkaConnectClients.withBaseUrl(connect).createConnector(c)
-                )
+                .flatMap(c -> KafkaConnectClients.withBaseUrl(connectUrl).createConnector(c))
                 .flatMap(c -> getConnector(cluster, connectName, c.getName()))
         );
   }
 
+  private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
+                                        String connectorName) {
+    return getConnectorNames(cluster, connectName)
+        .map(Tuple2::getT2)
+        .collectList()
+        .map(connectorNames -> connectorNames.contains(connectorName));
+  }
+
   public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
                                       String connectorName) {
     return getConnectAddress(cluster, connectName)
@@ -166,6 +182,9 @@ public class KafkaConnectService {
             .map(kafkaConnectMapper::fromClient)
             .flatMap(connector ->
                 KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
+                    // status request can return 404 if tasks not assigned yet
+                    .onErrorResume(WebClientResponseException.NotFound.class,
+                        e -> emptyStatus(connectorName))
                     .map(connectorStatus -> {
                       var status = connectorStatus.getConnector();
                       ConnectorDTO result = (ConnectorDTO) new ConnectorDTO()
@@ -191,6 +210,14 @@ public class KafkaConnectService {
         );
   }
 
+  private Mono<ConnectorStatus> emptyStatus(String connectorName) {
+    return Mono.just(new ConnectorStatus()
+        .name(connectorName)
+        .tasks(List.of())
+        .connector(new ConnectorStatusConnector()
+            .state(ConnectorStatusConnector.StateEnum.UNASSIGNED)));
+  }
+
   public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster cluster, String connectName,
                                                       String connectorName) {
     return getConnectAddress(cluster, connectName)
@@ -247,10 +274,12 @@ public class KafkaConnectService {
     return getConnectAddress(cluster, connectName)
         .flatMapMany(connect ->
             KafkaConnectClients.withBaseUrl(connect).getConnectorTasks(connectorName)
+                .onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty())
                 .map(kafkaConnectMapper::fromClient)
                 .flatMap(task ->
                     KafkaConnectClients.withBaseUrl(connect)
                         .getConnectorTaskStatus(connectorName, task.getId().getTask())
+                        .onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
                         .map(kafkaConnectMapper::fromClient)
                         .map(task::status)
                 )
@@ -286,12 +315,6 @@ public class KafkaConnectService {
         );
   }
 
-  private Mono<KafkaCluster> getCluster(String clusterName) {
-    return clustersStorage.getClusterByName(clusterName)
-        .map(Mono::just)
-        .orElse(Mono.error(ClusterNotFoundException::new));
-  }
-
   private Mono<String> getConnectAddress(KafkaCluster cluster, String connectName) {
     return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
             .filter(connect -> connect.getName().equals(connectName))

+ 18 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java

@@ -390,4 +390,22 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
           );
         });
   }
+
+  @Test
+  public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
+    webTestClient.post()
+        .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+        .bodyValue(new NewConnectorDTO()
+            .name(connectorName)
+            .config(Map.of(
+                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                "tasks.max", "1",
+                "topics", "output-topic",
+                "file", "/tmp/test"
+            ))
+        )
+        .exchange()
+        .expectStatus()
+        .isBadRequest();
+  }
 }