Browse Source

Return 409/Conflict error code if schema is duplicate. Change endpoint of createNewSchema method

Ildar Almakaev 4 years ago
parent
commit
705377612c

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/DuplicateEntityException.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class DuplicateEntityException extends CustomBaseException{
+
+    public DuplicateEntityException(String message) {
+        super(message);
+    }
+
+    @Override
+    public HttpStatus getResponseStatusCode() {
+        return HttpStatus.CONFLICT;
+    }
+}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -2,7 +2,8 @@ package com.provectus.kafka.ui.cluster.mapper;
 
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
 import com.provectus.kafka.ui.cluster.model.*;
-import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck;
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.model.*;
 import java.util.Properties;
 import org.mapstruct.Mapper;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityCheck.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.cluster.model.schemaregistry;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityLevel.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityLevel.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.cluster.model.schemaregistry;
 
 import lombok.Data;
 

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java

@@ -0,0 +1,10 @@
+package com.provectus.kafka.ui.cluster.model.schemaregistry;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class InternalNewSchema {
+    private String schema;
+}

+ 58 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -1,29 +1,36 @@
 package com.provectus.kafka.ui.cluster.service;
 
+import com.provectus.kafka.ui.cluster.exception.DuplicateEntityException;
 import com.provectus.kafka.ui.cluster.exception.NotFoundException;
 import com.provectus.kafka.ui.cluster.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
-import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck;
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.model.CompatibilityLevel;
 import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SchemaSubject;
-import java.util.Formatter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jetbrains.annotations.NotNull;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpOutputMessage;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.BodyInserter;
 import org.springframework.web.reactive.function.BodyInserters;
 import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Formatter;
 import java.util.Objects;
 import java.util.function.Function;
 
@@ -135,18 +142,47 @@ public class SchemaRegistryService {
                 .orElse(Mono.error(new NotFoundException("No such cluster")));
     }
 
-    public Mono<SchemaSubject> createNewSchema(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
-        var response = clustersStorage.getClusterByName(clusterName)
-                .map(cluster -> webClient.post()
-                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
-                        .contentType(MediaType.APPLICATION_JSON)
-                        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
-                        .retrieve()
-                        .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
-                        .bodyToMono(SubjectIdResponse.class)
-                        .log())
+    public Mono<SchemaSubject> registerNewSchema(String clusterName, Mono<NewSchemaSubject> newSchemaSubject) {
+        return newSchemaSubject
+                .flatMap(schema -> {
+                    Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(schema.getSchema()));
+                    String subject = schema.getSubject();
+                    return createNewSchema(clusterName, subject, newSchema);
+                });
+    }
+
+    private Mono<SchemaSubject> createNewSchema(String clusterName, String subject, Mono<InternalNewSchema> newSchemaSubject) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(KafkaCluster::getSchemaRegistry)
+                .map(schemaRegistry -> {
+                            Mono<SchemaSubject> checking = webClient.post()
+                                    .uri(schemaRegistry + URL_SUBJECT, subject)
+                                    .contentType(MediaType.APPLICATION_JSON)
+                                    .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+                                    .retrieve()
+                                    .onStatus(NOT_FOUND::equals, res -> Mono.empty())
+                                    .bodyToMono(SchemaSubject.class)
+                                    .handle((schema, handler) -> {
+                                        if(Objects.isNull(schema) && Objects.isNull(schema.getId())) {
+                                            handler.complete();
+                                        } else {
+                                            handler.error(new DuplicateEntityException("Such schema already exists"));
+                                        }
+                                    });
+//                    webClient.post()
+//                            .uri(schemaRegistry + URL_SUBJECT_VERSIONS, subject)
+//                            .contentType(MediaType.APPLICATION_JSON)
+//                            .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+//                            .retrieve()
+//                            .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
+//                            .bodyToMono(SubjectIdResponse.class)
+
+                            checking.subscribe(System.out::println);
+                            return checking;
+                        }
+                )
+//                .map(resp -> getLatestSchemaSubject(clusterName, subject))
                 .orElse(Mono.error(new NotFoundException("No such cluster")));
-        return response.then(getLatestSchemaSubject(clusterName, schemaName));
     }
 
     @NotNull
@@ -218,4 +254,12 @@ public class SchemaRegistryService {
     public String formatted(String str, Object... args) {
         return new Formatter().format(str, args).toString();
     }
+
+    public static String extractRecordType(String schema) {
+        if (schema.contains("record")) {
+            return "AVRO";
+        } else if (schema.contains("proto")) {
+            return "PROTO";
+        } else return schema;
+    }
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -142,11 +142,11 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String subject,
+    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName,
                                                                @Valid Mono<NewSchemaSubject> newSchemaSubject,
                                                                ServerWebExchange exchange) {
         return schemaRegistryService
-                .createNewSchema(clusterName, subject, newSchemaSubject)
+                .registerNewSchema(clusterName, newSchemaSubject)
                 .map(ResponseEntity::ok);
     }
 

+ 28 - 26
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -336,58 +336,57 @@ paths:
                   $ref: '#/components/schemas/ConsumerGroup'
 
   /api/clusters/{clusterName}/schemas:
-    get:
+    post:
       tags:
         - /api/clusters
-      summary: get all schemas of latest version from Schema Registry service
-      operationId: getSchemas
+      summary: create a new subject schema
+      operationId: createNewSchema
       parameters:
         - name: clusterName
           in: path
           required: true
           schema:
             type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/NewSchemaSubject'
       responses:
         200:
-          description: OK
+          description: Ok
           content:
             application/json:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/SchemaSubject'
-
-  /api/clusters/{clusterName}/schemas/{subject}:
-    post:
+                $ref: '#/components/schemas/SchemaSubject'
+        400:
+          description: Bad request
+        409:
+          description: Duplicate schema
+        422:
+          description: Invalid parameters
+    get:
       tags:
         - /api/clusters
-      summary: create a new subject schema
-      operationId: createNewSchema
+      summary: get all schemas of latest version from Schema Registry service
+      operationId: getSchemas
       parameters:
         - name: clusterName
           in: path
           required: true
           schema:
             type: string
-        - name: subject
-          in: path
-          required: true
-          schema:
-            type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/NewSchemaSubject'
       responses:
         200:
-          description: Updated
+          description: OK
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/SchemaSubject'
-        400:
-          description: Bad request
+                type: array
+                items:
+                  $ref: '#/components/schemas/SchemaSubject'
+
+  /api/clusters/{clusterName}/schemas/{subject}:
     delete:
       tags:
         - /api/clusters
@@ -988,9 +987,12 @@ components:
     NewSchemaSubject:
       type: object
       properties:
+        subject:
+          type: string
         schema:
           type: string
       required:
+        - subject
         - schema
 
     CompatibilityLevel: