Explorar el Código

Refactor GET /schemas method to get whole schema data (version, schema, id and subject) instead of plain subject names

Ildar Almakaev hace 4 años
padre
commit
7776301cee

+ 16 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -11,6 +11,8 @@ import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SchemaSubject;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
@@ -20,6 +22,8 @@ import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 @Service
@@ -36,14 +40,22 @@ public class SchemaRegistryService {
     private final ClusterMapper mapper;
     private final WebClient webClient;
 
-    public Flux<String> getAllSchemaSubjects(String clusterName) {
+    public Flux<SchemaSubject> getAllLatestVersionSchemas(String clusterName) {
+        var allSubjectNames = getAllSubjectNames(clusterName);
+        return allSubjectNames
+                .flatMapMany(Flux::fromArray)
+                .flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
+    }
+
+    public Mono<String[]> getAllSubjectNames(String clusterName) {
         return clustersStorage.getClusterByName(clusterName)
                 .map(cluster -> webClient.get()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
                         .retrieve()
-                        .bodyToFlux(String.class)
-                        .doOnError(log::error))
-                .orElse(Flux.error(new NotFoundException("No such cluster")));
+                        .bodyToMono(String[].class)
+                        .doOnError(log::error)
+                )
+                .orElse(Mono.error(new NotFoundException("No such cluster")));
     }
 
     public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -115,8 +115,8 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
-        Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
+    public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
+        Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
         return Mono.just(ResponseEntity.ok(subjects));
     }
 

+ 46 - 24
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.SchemaSubject;
 import lombok.extern.log4j.Log4j2;
 import lombok.val;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@@ -23,6 +24,12 @@ import java.util.UUID;
 class SchemaRegistryServiceTests extends AbstractBaseTest {
     @Autowired
     WebTestClient webTestClient;
+    String subject;
+
+    @BeforeEach
+    void setUpBefore() {
+        this.subject = UUID.randomUUID().toString();
+    }
 
     @Test
     public void should404WhenGetAllSchemasForUnknownCluster() {
@@ -34,11 +41,11 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     }
 
     @Test
-    void shouldReturn404WhenGetLatestSchemaByNonExistingSchemaName() {
+    void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
         String unknownSchema = "unknown-schema";
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", unknownSchema)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
                 .exchange()
                 .expectStatus().isNotFound();
     }
@@ -59,49 +66,52 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     }
 
     @Test
-    public void shouldReturnNotNullResponseWhenGetAllSchemas() {
+    public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
+        createNewSubjectAndAssert(subject);
+
         webTestClient
                 .get()
                 .uri("http://localhost:8080/api/clusters/local/schemas")
                 .exchange()
                 .expectStatus().isOk()
-                .expectBodyList(String.class)
+                .expectBodyList(SchemaSubject.class)
                 .consumeWith(result -> {
-                    List<String> responseBody = result.getResponseBody();
-                    Assertions.assertNotNull(responseBody);
+                    List<SchemaSubject> responseBody = result.getResponseBody();
                     log.info("Response of test schemas: {}", responseBody);
+                    Assertions.assertNotNull(responseBody);
+                    Assertions.assertFalse(responseBody.isEmpty());
+
+                    SchemaSubject actualSchemaSubject = responseBody.stream()
+                            .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
+                            .findFirst()
+                            .orElseThrow();
+                    Assertions.assertNotNull(actualSchemaSubject.getId());
+                    Assertions.assertNotNull(actualSchemaSubject.getVersion());
+                    Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
+                    Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
+
                 });
     }
 
     @Test
     public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
-        String schemaName = UUID.randomUUID().toString();
-        // Create a new schema
-        webTestClient
-                .post()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}", schemaName)
-                .contentType(MediaType.APPLICATION_JSON)
-                .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
-                .exchange()
-                .expectStatus().isOk()
-                .expectBody(SchemaSubject.class)
-                .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
+        createNewSubjectAndAssert(subject);
 
         //Get the created schema and check its items
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
                 .consumeWith(listEntityExchangeResult -> {
                     val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
-                    assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
+                    assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
                 });
 
         //Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
         webTestClient.put()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/compatibility", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
                 .contentType(MediaType.APPLICATION_JSON)
                 .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
                 .exchange()
@@ -110,23 +120,35 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
         //Get one more time to check the schema compatibility level is changed to FULL
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
                 .consumeWith(listEntityExchangeResult -> {
                     val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
-                    assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
+                    assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
                 });
     }
 
-    private void assertSchemaWhenGetLatest(String schemaName, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
+    private void createNewSubjectAndAssert(String subject) {
+        webTestClient
+                .post()
+                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
+                .contentType(MediaType.APPLICATION_JSON)
+                .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(SchemaSubject.class)
+                .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
+    }
+
+    private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
         List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
         Assertions.assertNotNull(responseBody);
         Assertions.assertEquals(1, responseBody.size());
         SchemaSubject actualSchema = responseBody.get(0);
         Assertions.assertNotNull(actualSchema);
-        Assertions.assertEquals(schemaName, actualSchema.getSubject());
+        Assertions.assertEquals(subject, actualSchema.getSubject());
         Assertions.assertEquals("\"string\"", actualSchema.getSchema());
 
         Assertions.assertNotNull(actualSchema.getCompatibilityLevel());

+ 2 - 2
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -339,7 +339,7 @@ paths:
     get:
       tags:
         - /api/clusters
-      summary: get all schemas from Schema Registry service
+      summary: get all schemas of latest version from Schema Registry service
       operationId: getSchemas
       parameters:
         - name: clusterName
@@ -355,7 +355,7 @@ paths:
               schema:
                 type: array
                 items:
-                  type: string
+                  $ref: '#/components/schemas/SchemaSubject'
 
   /api/clusters/{clusterName}/schemas/{subject}:
     post: