Jelajahi Sumber

Fix submittin new subject or new version if subject already exists

Ildar Almakaev 4 tahun lalu
induk
melakukan
35736033a8

+ 29 - 33
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -16,14 +16,10 @@ import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SchemaSubject;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.tuple.Pair;
 import org.jetbrains.annotations.NotNull;
-import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
-import org.springframework.http.ReactiveHttpOutputMessage;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
-import org.springframework.web.reactive.function.BodyInserter;
 import org.springframework.web.reactive.function.BodyInserters;
 import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
@@ -55,7 +51,7 @@ public class SchemaRegistryService {
         var allSubjectNames = getAllSubjectNames(clusterName);
         return allSubjectNames
                 .flatMapMany(Flux::fromArray)
-                .flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
+                .flatMap(subject -> getLatestSchemaVersionBySubject(clusterName, subject));
     }
 
     public Mono<String[]> getAllSubjectNames(String clusterName) {
@@ -89,7 +85,7 @@ public class SchemaRegistryService {
         return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version));
     }
 
-    public Mono<SchemaSubject> getLatestSchemaSubject(String clusterName, String schemaName) {
+    public Mono<SchemaSubject> getLatestSchemaVersionBySubject(String clusterName, String schemaName) {
         return this.getSchemaSubject(clusterName, schemaName, LATEST);
     }
 
@@ -154,37 +150,37 @@ public class SchemaRegistryService {
     private Mono<SchemaSubject> createNewSchema(String clusterName, String subject, Mono<InternalNewSchema> newSchemaSubject) {
         return clustersStorage.getClusterByName(clusterName)
                 .map(KafkaCluster::getSchemaRegistry)
-                .map(schemaRegistry -> {
-                            Mono<SchemaSubject> checking = webClient.post()
-                                    .uri(schemaRegistry + URL_SUBJECT, subject)
-                                    .contentType(MediaType.APPLICATION_JSON)
-                                    .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
-                                    .retrieve()
-                                    .onStatus(NOT_FOUND::equals, res -> Mono.empty())
-                                    .bodyToMono(SchemaSubject.class)
-                                    .handle((schema, handler) -> {
-                                        if(Objects.isNull(schema) && Objects.isNull(schema.getId())) {
-                                            handler.complete();
-                                        } else {
-                                            handler.error(new DuplicateEntityException("Such schema already exists"));
-                                        }
-                                    });
-//                    webClient.post()
-//                            .uri(schemaRegistry + URL_SUBJECT_VERSIONS, subject)
-//                            .contentType(MediaType.APPLICATION_JSON)
-//                            .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
-//                            .retrieve()
-//                            .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
-//                            .bodyToMono(SubjectIdResponse.class)
-
-                            checking.subscribe(System.out::println);
-                            return checking;
-                        }
+                .map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchemaSubject, schemaRegistryUrl)
+                        .flatMap(s -> submitNewSchema(subject, newSchemaSubject, schemaRegistryUrl))
+                        .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
                 )
-//                .map(resp -> getLatestSchemaSubject(clusterName, subject))
                 .orElse(Mono.error(new NotFoundException("No such cluster")));
     }
 
+    @NotNull
+    private Mono<SubjectIdResponse> submitNewSchema(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistry) {
+        return webClient.post()
+                .uri(schemaRegistry + URL_SUBJECT_VERSIONS, subject)
+                .contentType(MediaType.APPLICATION_JSON)
+                .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+                .retrieve()
+                .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
+                .bodyToMono(SubjectIdResponse.class);
+    }
+
+    @NotNull
+    private Mono<SchemaSubject> checkSchemaOnDuplicate(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistry) {
+        return webClient.post()
+                .uri(schemaRegistry + URL_SUBJECT, subject)
+                .contentType(MediaType.APPLICATION_JSON)
+                .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+                .retrieve()
+                .onStatus(NOT_FOUND::equals, res -> Mono.empty())
+                .bodyToMono(SchemaSubject.class)
+                .filter(s -> Objects.isNull(s.getId()))
+                .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));
+    }
+
     @NotNull
     private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(String formatted) {
         return resp -> Mono.error(new NotFoundException(formatted));

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -106,7 +106,7 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
-        return schemaRegistryService.getLatestSchemaSubject(clusterName, subject).map(ResponseEntity::ok);
+        return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject).map(ResponseEntity::ok);
     }
 
     @Override