Merge branch 'master' of github.com:provectus/kafka-ui into feature/schema_registry_views

This commit is contained in:
Guzel Kafizova 2021-02-15 15:57:26 +03:00
commit f5394ca4af
5 changed files with 106 additions and 67 deletions

View file

@ -38,7 +38,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
Map<String, Object> errorAttributes = getErrorAttributes(request, false);
HttpStatus statusCode = Optional.ofNullable(errorAttributes.get(GlobalErrorAttributes.STATUS))
.map(code -> (HttpStatus) code)
.map(code -> code instanceof Integer ? HttpStatus.valueOf((Integer) code) : (HttpStatus) code)
.orElse(HttpStatus.BAD_REQUEST);
return ServerResponse
.status(statusCode)

View file

@ -12,6 +12,8 @@ import com.provectus.kafka.ui.model.SchemaSubject;
import java.util.Formatter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@ -21,6 +23,8 @@ import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@Service
@ -37,17 +41,30 @@ public class SchemaRegistryService {
private final ClusterMapper mapper;
private final WebClient webClient;
public Flux<String> getAllSchemaSubjects(String clusterName) {
public Flux<SchemaSubject> getAllLatestVersionSchemas(String clusterName) {
var allSubjectNames = getAllSubjectNames(clusterName);
return allSubjectNames
.flatMapMany(Flux::fromArray)
.flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
}
public Mono<String[]> getAllSubjectNames(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
.retrieve()
.bodyToFlux(String.class)
.doOnError(log::error))
.orElse(Flux.error(new NotFoundException("No such cluster")));
.bodyToMono(String[].class)
.doOnError(log::error)
)
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {
public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
Flux<Integer> versions = getSubjectVersions(clusterName, subject);
return versions.flatMap(version -> getSchemaSubjectByVersion(clusterName, subject, version));
}
private Flux<Integer> getSubjectVersions(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)

View file

@ -105,29 +105,30 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
return schemaRegistryService.getLatestSchemaSubject(clusterName, subject).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String subject, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
return Mono.just(ResponseEntity.ok(subjects));
}
@Override
public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
public Mono<ResponseEntity<Flux<SchemaSubject>>> getAllVersionsBySubject(String clusterName, String subjectName, ServerWebExchange exchange) {
Flux<SchemaSubject> schemas = schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
return Mono.just(ResponseEntity.ok(schemas));
}
@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
}
@Override
@ -141,10 +142,10 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String subject,
@Valid Mono<NewSchemaSubject> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
return schemaRegistryService.createNewSubject(clusterName, subject, newSchemaSubject);
}
@Override
@ -172,17 +173,17 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String subject,
@Valid Mono<NewSchemaSubject> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility for schema: {}", schemaName);
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String subject, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility for subject: {}", subject);
return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
.map(ResponseEntity::ok);
}

View file

@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.SchemaSubject;
import lombok.extern.log4j.Log4j2;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@ -23,6 +24,12 @@ import java.util.UUID;
class SchemaRegistryServiceTests extends AbstractBaseTest {
@Autowired
WebTestClient webTestClient;
String subject;
@BeforeEach
void setUpBefore() {
this.subject = UUID.randomUUID().toString();
}
@Test
public void should404WhenGetAllSchemasForUnknownCluster() {
@ -34,11 +41,11 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
}
@Test
void shouldReturn404WhenGetLatestSchemaByNonExistingSchemaName() {
void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
String unknownSchema = "unknown-schema";
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", unknownSchema)
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
.exchange()
.expectStatus().isNotFound();
}
@ -59,49 +66,51 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
}
@Test
public void shouldReturnNotNullResponseWhenGetAllSchemas() {
public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
createNewSubjectAndAssert(subject);
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas")
.exchange()
.expectStatus().isOk()
.expectBodyList(String.class)
.expectBodyList(SchemaSubject.class)
.consumeWith(result -> {
List<String> responseBody = result.getResponseBody();
Assertions.assertNotNull(responseBody);
List<SchemaSubject> responseBody = result.getResponseBody();
log.info("Response of test schemas: {}", responseBody);
Assertions.assertNotNull(responseBody);
Assertions.assertFalse(responseBody.isEmpty());
SchemaSubject actualSchemaSubject = responseBody.stream()
.filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
.findFirst()
.orElseThrow();
Assertions.assertNotNull(actualSchemaSubject.getId());
Assertions.assertNotNull(actualSchemaSubject.getVersion());
Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
});
}
@Test
public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
String schemaName = UUID.randomUUID().toString();
// Create a new schema
webTestClient
.post()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
.exchange()
.expectStatus().isOk()
.expectBody(SchemaSubject.class)
.consumeWith(this::assertResponseBodyWhenCreateNewSchema);
createNewSubjectAndAssert(subject);
//Get the created schema and check its items
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
.consumeWith(listEntityExchangeResult -> {
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
});
//Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
webTestClient.put()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/compatibility", schemaName)
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
.exchange()
@ -110,23 +119,35 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
//Get one more time to check the schema compatibility level is changed to FULL
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
.consumeWith(listEntityExchangeResult -> {
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
});
}
private void assertSchemaWhenGetLatest(String schemaName, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
private void createNewSubjectAndAssert(String subject) {
webTestClient
.post()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
.exchange()
.expectStatus().isOk()
.expectBody(SchemaSubject.class)
.consumeWith(this::assertResponseBodyWhenCreateNewSchema);
}
private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
Assertions.assertNotNull(responseBody);
Assertions.assertEquals(1, responseBody.size());
SchemaSubject actualSchema = responseBody.get(0);
Assertions.assertNotNull(actualSchema);
Assertions.assertEquals(schemaName, actualSchema.getSubject());
Assertions.assertEquals(subject, actualSchema.getSubject());
Assertions.assertEquals("\"string\"", actualSchema.getSchema());
Assertions.assertNotNull(actualSchema.getCompatibilityLevel());

View file

@ -339,7 +339,7 @@ paths:
get:
tags:
- /api/clusters
summary: get all schemas from Schema Registry service
summary: get all schemas of latest version from Schema Registry service
operationId: getSchemas
parameters:
- name: clusterName
@ -355,9 +355,9 @@ paths:
schema:
type: array
items:
type: string
$ref: '#/components/schemas/SchemaSubject'
/api/clusters/{clusterName}/schemas/{schemaName}:
/api/clusters/{clusterName}/schemas/{subject}:
post:
tags:
- /api/clusters
@ -369,7 +369,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -399,7 +399,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -410,19 +410,19 @@ paths:
404:
description: Not found
/api/clusters/{clusterName}/schemas/{schemaName}/versions:
/api/clusters/{clusterName}/schemas/{subject}/versions:
get:
tags:
- /api/clusters
summary: get all version of schema from Schema Registry service
operationId: getSchemaVersions
summary: get all version of subject from Schema Registry service
operationId: getAllVersionsBySubject
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -435,9 +435,9 @@ paths:
schema:
type: array
items:
type: integer
$ref: '#/components/schemas/SchemaSubject'
/api/clusters/{clusterName}/schemas/{schemaName}/latest:
/api/clusters/{clusterName}/schemas/{subject}/latest:
get:
tags:
- /api/clusters
@ -449,7 +449,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -472,7 +472,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -484,7 +484,7 @@ paths:
description: Not found
/api/clusters/{clusterName}/schemas/{schemaName}/versions/{version}:
/api/clusters/{clusterName}/schemas/{subject}/versions/{version}:
get:
tags:
- /api/clusters
@ -496,7 +496,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -524,7 +524,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -581,7 +581,7 @@ paths:
404:
description: Not Found
/api/clusters/{clusterName}/schemas/{schemaName}/compatibility:
/api/clusters/{clusterName}/schemas/{subject}/compatibility:
put:
tags:
- /api/clusters
@ -593,7 +593,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema:
@ -609,7 +609,7 @@ paths:
404:
description: Not Found
/api/clusters/{clusterName}/schemas/{schemaName}/check:
/api/clusters/{clusterName}/schemas/{subject}/check:
post:
tags:
- /api/clusters
@ -621,7 +621,7 @@ paths:
required: true
schema:
type: string
- name: schemaName
- name: subject
in: path
required: true
schema: