Selaa lähdekoodia

Fix hanging schema delete (#1360)

* Fix hanging schema delete

Co-authored-by: Ilya Kuramshin <ilia-2k@rambler.ru>
Co-authored-by: Jonas Geiregat (31198) <jonas.geiregat@tvh.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Jonas Geiregat 3 vuotta sitten
vanhempi
commit
ce8627ea59

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java

@@ -69,7 +69,8 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   public Mono<ResponseEntity<Void>> deleteSchema(
       String clusterName, String subjectName, ServerWebExchange exchange) {
-    return schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subjectName);
+    return schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subjectName)
+            .thenReturn(ResponseEntity.ok().build());
   }
 
   @Override

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -25,7 +25,8 @@ public enum ErrorCode {
   DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
   TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
   INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
-  RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT);
+  RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT),
+  SCHEMA_NOT_DELETED(4015, HttpStatus.INTERNAL_SERVER_ERROR);
 
   static {
     // codes uniqueness check
@@ -33,7 +34,7 @@ public enum ErrorCode {
     for (ErrorCode value : ErrorCode.values()) {
       if (!codes.add(value.code())) {
         LoggerFactory.getLogger(ErrorCode.class)
-            .warn("Multiple {} values refer to code {}", ErrorCode.class, value.code);
+                .warn("Multiple {} values refer to code {}", ErrorCode.class, value.code);
       }
     }
   }

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaFailedToDeleteException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class SchemaFailedToDeleteException extends CustomBaseException {
+
+  public SchemaFailedToDeleteException(String schemaName) {
+    super(String.format("Unable to delete schema with name %s", schemaName));
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.SCHEMA_NOT_DELETED;
+  }
+}

+ 15 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -4,6 +4,7 @@ import static org.springframework.http.HttpStatus.NOT_FOUND;
 import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
 
 import com.provectus.kafka.ui.exception.DuplicateEntityException;
+import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException;
 import com.provectus.kafka.ui.exception.SchemaNotFoundException;
 import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
@@ -32,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
@@ -156,16 +158,16 @@ public class SchemaRegistryService {
         ).toBodilessEntity();
   }
 
-  public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(KafkaCluster cluster,
+  public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
                                                                 String schemaName) {
     return configuredWebClient(
         cluster,
         HttpMethod.DELETE,
         URL_SUBJECT, schemaName)
         .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .toBodilessEntity();
+        .onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
+        .toBodilessEntity()
+        .then();
   }
 
   /**
@@ -335,4 +337,13 @@ public class SchemaRegistryService {
   private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
     return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
   }
+
+  private Function<ClientResponse, Mono<? extends Throwable>> errorOnSchemaDeleteFailure(String schemaName) {
+    return resp -> {
+      if (NOT_FOUND.equals(resp.statusCode())) {
+        return Mono.error(new SchemaNotFoundException(schemaName));
+      }
+      return Mono.error(new SchemaFailedToDeleteException(schemaName));
+    };
+  }
 }