소스 검색

Issue-181 Refactor schema CRUD API (#192)

* Rename attribute 'schemaName' to 'subject' for consistency with Schema Registry terms

* Refactor 'GET /schemas' and 'GET .../schemas/{subject}/versions' to get the latest schemas data at once

* Fix getting error code from attributes in our custom GlobalErrorWebExceptionHandler
Ildar Almakaev 4 년 전
부모
커밋
6ec516345e

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/GlobalErrorWebExceptionHandler.java

@@ -38,7 +38,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
     private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
         Map<String, Object> errorAttributes = getErrorAttributes(request, false);
         HttpStatus statusCode = Optional.ofNullable(errorAttributes.get(GlobalErrorAttributes.STATUS))
-                .map(code -> (HttpStatus) code)
+                .map(code -> code instanceof Integer ? HttpStatus.valueOf((Integer) code) : (HttpStatus) code)
                 .orElse(HttpStatus.BAD_REQUEST);
         return ServerResponse
                 .status(statusCode)

+ 22 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -12,6 +12,8 @@ import com.provectus.kafka.ui.model.SchemaSubject;
 import java.util.Formatter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
@@ -21,6 +23,8 @@ import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 @Service
@@ -37,17 +41,30 @@ public class SchemaRegistryService {
     private final ClusterMapper mapper;
     private final WebClient webClient;
 
-    public Flux<String> getAllSchemaSubjects(String clusterName) {
+    public Flux<SchemaSubject> getAllLatestVersionSchemas(String clusterName) {
+        var allSubjectNames = getAllSubjectNames(clusterName);
+        return allSubjectNames
+                .flatMapMany(Flux::fromArray)
+                .flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
+    }
+
+    public Mono<String[]> getAllSubjectNames(String clusterName) {
         return clustersStorage.getClusterByName(clusterName)
                 .map(cluster -> webClient.get()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
                         .retrieve()
-                        .bodyToFlux(String.class)
-                        .doOnError(log::error))
-                .orElse(Flux.error(new NotFoundException("No such cluster")));
+                        .bodyToMono(String[].class)
+                        .doOnError(log::error)
+                )
+                .orElse(Mono.error(new NotFoundException("No such cluster")));
+    }
+
+    public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
+        Flux<Integer> versions = getSubjectVersions(clusterName, subject);
+        return versions.flatMap(version -> getSchemaSubjectByVersion(clusterName, subject, version));
     }
 
-    public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {
+    private Flux<Integer> getSubjectVersions(String clusterName, String schemaName) {
         return clustersStorage.getClusterByName(clusterName)
                 .map(cluster -> webClient.get()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)

+ 18 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -105,29 +105,30 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
-        return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
+    public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
+        return schemaRegistryService.getLatestSchemaSubject(clusterName, subject).map(ResponseEntity::ok);
     }
 
     @Override
-    public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
-        return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
+    public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String subject, Integer version, ServerWebExchange exchange) {
+        return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version).map(ResponseEntity::ok);
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
-        Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
+    public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
+        Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
         return Mono.just(ResponseEntity.ok(subjects));
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
+    public Mono<ResponseEntity<Flux<SchemaSubject>>> getAllVersionsBySubject(String clusterName, String subjectName, ServerWebExchange exchange) {
+        Flux<SchemaSubject> schemas = schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
+        return Mono.just(ResponseEntity.ok(schemas));
     }
 
     @Override
-    public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
-        return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
+    public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
+        return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
     }
 
     @Override
@@ -141,10 +142,10 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
+    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String subject,
                                                                @Valid Mono<NewSchemaSubject> newSchemaSubject,
                                                                ServerWebExchange exchange) {
-        return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
+        return schemaRegistryService.createNewSubject(clusterName, subject, newSchemaSubject);
     }
 
     @Override
@@ -172,17 +173,17 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
+    public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String subject,
                                                                                      @Valid Mono<NewSchemaSubject> newSchemaSubject,
                                                                                      ServerWebExchange exchange) {
-        return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
+        return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
                 .map(ResponseEntity::ok);
     }
 
     @Override
-    public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
-        log.info("Updating schema compatibility for schema: {}", schemaName);
-        return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
+    public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String subject, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
+        log.info("Updating schema compatibility for subject: {}", subject);
+        return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
                 .map(ResponseEntity::ok);
     }
 

+ 45 - 24
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.SchemaSubject;
 import lombok.extern.log4j.Log4j2;
 import lombok.val;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@@ -23,6 +24,12 @@ import java.util.UUID;
 class SchemaRegistryServiceTests extends AbstractBaseTest {
     @Autowired
     WebTestClient webTestClient;
+    String subject;
+
+    @BeforeEach
+    void setUpBefore() {
+        this.subject = UUID.randomUUID().toString();
+    }
 
     @Test
     public void should404WhenGetAllSchemasForUnknownCluster() {
@@ -34,11 +41,11 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     }
 
     @Test
-    void shouldReturn404WhenGetLatestSchemaByNonExistingSchemaName() {
+    void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
         String unknownSchema = "unknown-schema";
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", unknownSchema)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
                 .exchange()
                 .expectStatus().isNotFound();
     }
@@ -59,49 +66,51 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     }
 
     @Test
-    public void shouldReturnNotNullResponseWhenGetAllSchemas() {
+    public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
+        createNewSubjectAndAssert(subject);
+
         webTestClient
                 .get()
                 .uri("http://localhost:8080/api/clusters/local/schemas")
                 .exchange()
                 .expectStatus().isOk()
-                .expectBodyList(String.class)
+                .expectBodyList(SchemaSubject.class)
                 .consumeWith(result -> {
-                    List<String> responseBody = result.getResponseBody();
-                    Assertions.assertNotNull(responseBody);
+                    List<SchemaSubject> responseBody = result.getResponseBody();
                     log.info("Response of test schemas: {}", responseBody);
+                    Assertions.assertNotNull(responseBody);
+                    Assertions.assertFalse(responseBody.isEmpty());
+
+                    SchemaSubject actualSchemaSubject = responseBody.stream()
+                            .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
+                            .findFirst()
+                            .orElseThrow();
+                    Assertions.assertNotNull(actualSchemaSubject.getId());
+                    Assertions.assertNotNull(actualSchemaSubject.getVersion());
+                    Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
+                    Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
                 });
     }
 
     @Test
     public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
-        String schemaName = UUID.randomUUID().toString();
-        // Create a new schema
-        webTestClient
-                .post()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}", schemaName)
-                .contentType(MediaType.APPLICATION_JSON)
-                .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
-                .exchange()
-                .expectStatus().isOk()
-                .expectBody(SchemaSubject.class)
-                .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
+        createNewSubjectAndAssert(subject);
 
         //Get the created schema and check its items
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
                 .consumeWith(listEntityExchangeResult -> {
                     val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
-                    assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
+                    assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
                 });
 
         //Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
         webTestClient.put()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/compatibility", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
                 .contentType(MediaType.APPLICATION_JSON)
                 .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
                 .exchange()
@@ -110,23 +119,35 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
         //Get one more time to check the schema compatibility level is changed to FULL
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
                 .consumeWith(listEntityExchangeResult -> {
                     val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
-                    assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
+                    assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
                 });
     }
 
-    private void assertSchemaWhenGetLatest(String schemaName, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
+    private void createNewSubjectAndAssert(String subject) {
+        webTestClient
+                .post()
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
+                .contentType(MediaType.APPLICATION_JSON)
+                .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(SchemaSubject.class)
+                .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
+    }
+
+    private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
         List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
         Assertions.assertNotNull(responseBody);
         Assertions.assertEquals(1, responseBody.size());
         SchemaSubject actualSchema = responseBody.get(0);
         Assertions.assertNotNull(actualSchema);
-        Assertions.assertEquals(schemaName, actualSchema.getSubject());
+        Assertions.assertEquals(subject, actualSchema.getSubject());
         Assertions.assertEquals("\"string\"", actualSchema.getSchema());
 
         Assertions.assertNotNull(actualSchema.getCompatibilityLevel());

+ 20 - 20
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -339,7 +339,7 @@ paths:
     get:
       tags:
         - /api/clusters
-      summary: get all schemas from Schema Registry service
+      summary: get all schemas of latest version from Schema Registry service
       operationId: getSchemas
       parameters:
         - name: clusterName
@@ -355,9 +355,9 @@ paths:
               schema:
                 type: array
                 items:
-                  type: string
+                  $ref: '#/components/schemas/SchemaSubject'
 
-  /api/clusters/{clusterName}/schemas/{schemaName}:
+  /api/clusters/{clusterName}/schemas/{subject}:
     post:
       tags:
         - /api/clusters
@@ -369,7 +369,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -399,7 +399,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -410,19 +410,19 @@ paths:
         404:
           description: Not found
 
-  /api/clusters/{clusterName}/schemas/{schemaName}/versions:
+  /api/clusters/{clusterName}/schemas/{subject}/versions:
     get:
       tags:
         - /api/clusters
-      summary: get all version of schema from Schema Registry service
-      operationId: getSchemaVersions
+      summary: get all version of subject from Schema Registry service
+      operationId: getAllVersionsBySubject
       parameters:
         - name: clusterName
           in: path
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -435,9 +435,9 @@ paths:
               schema:
                 type: array
                 items:
-                  type: integer
+                  $ref: '#/components/schemas/SchemaSubject'
 
-  /api/clusters/{clusterName}/schemas/{schemaName}/latest:
+  /api/clusters/{clusterName}/schemas/{subject}/latest:
     get:
       tags:
         - /api/clusters
@@ -449,7 +449,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -472,7 +472,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -484,7 +484,7 @@ paths:
           description: Not found
 
 
-  /api/clusters/{clusterName}/schemas/{schemaName}/versions/{version}:
+  /api/clusters/{clusterName}/schemas/{subject}/versions/{version}:
     get:
       tags:
         - /api/clusters
@@ -496,7 +496,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -524,7 +524,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -581,7 +581,7 @@ paths:
         404:
           description: Not Found
 
-  /api/clusters/{clusterName}/schemas/{schemaName}/compatibility:
+  /api/clusters/{clusterName}/schemas/{subject}/compatibility:
     put:
       tags:
         - /api/clusters
@@ -593,7 +593,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema:
@@ -609,7 +609,7 @@ paths:
         404:
           description: Not Found
 
-  /api/clusters/{clusterName}/schemas/{schemaName}/check:
+  /api/clusters/{clusterName}/schemas/{subject}/check:
     post:
       tags:
         - /api/clusters
@@ -621,7 +621,7 @@ paths:
           required: true
           schema:
             type: string
-        - name: schemaName
+        - name: subject
           in: path
           required: true
           schema: