cluster storage usage removed from SchemaRegistryService (#1262)

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2021-12-17 11:51:31 +03:00 committed by GitHub
parent 2d40c3eac1
commit 2ce80f6d80
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 147 additions and 156 deletions

View file

@ -1,8 +1,10 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.SchemasApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.service.SchemaRegistryService;
@ -18,15 +20,25 @@ import reactor.core.publisher.Mono;
@RestController
@RequiredArgsConstructor
@Log4j2
public class SchemasController implements SchemasApi {
public class SchemasController extends AbstractController implements SchemasApi {
private final SchemaRegistryService schemaRegistryService;
@Override
protected KafkaCluster getCluster(String clusterName) {
var c = super.getCluster(clusterName);
if (c.getSchemaRegistry() == null) {
throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
}
return c;
}
@Override
public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
return schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName), subject, newSchemaSubject)
.map(ResponseEntity::ok);
}
@ -35,40 +47,41 @@ public class SchemasController implements SchemasApi {
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService
.registerNewSchema(clusterName, newSchemaSubject)
.registerNewSchema(getCluster(clusterName), newSchemaSubject)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(
String clusterName, String subject, ServerWebExchange exchange) {
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
return schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject);
}
@Override
public Mono<ResponseEntity<Void>> deleteSchema(
String clusterName, String subjectName, ServerWebExchange exchange) {
return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName);
return schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subjectName);
}
@Override
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
return schemaRegistryService.deleteSchemaSubjectByVersion(
getCluster(clusterName), subjectName, version);
}
@Override
public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
String clusterName, String subjectName, ServerWebExchange exchange) {
Flux<SchemaSubjectDTO> schemas =
schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName);
return Mono.just(ResponseEntity.ok(schemas));
}
@Override
public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@ -76,21 +89,24 @@ public class SchemasController implements SchemasApi {
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName, String subject,
ServerWebExchange exchange) {
return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject)
return schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version)
return schemaRegistryService.getSchemaSubjectByVersion(
getCluster(clusterName), subject, version)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getSchemas(String clusterName,
ServerWebExchange exchange) {
Flux<SchemaSubjectDTO> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
Flux<SchemaSubjectDTO> subjects = schemaRegistryService.getAllLatestVersionSchemas(
getCluster(clusterName)
);
return Mono.just(ResponseEntity.ok(subjects));
}
@ -99,7 +115,8 @@ public class SchemasController implements SchemasApi {
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
ServerWebExchange exchange) {
log.info("Updating schema compatibility globally");
return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
return schemaRegistryService.updateSchemaCompatibility(
getCluster(clusterName), compatibilityLevel)
.map(ResponseEntity::ok);
}
@ -108,7 +125,8 @@ public class SchemasController implements SchemasApi {
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
ServerWebExchange exchange) {
log.info("Updating schema compatibility for subject: {}", subject);
return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
return schemaRegistryService.updateSchemaCompatibility(
getCluster(clusterName), subject, compatibilityLevel)
.map(ResponseEntity::ok);
}
}

View file

@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service;
import static org.springframework.http.HttpStatus.NOT_FOUND;
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.DuplicateEntityException;
import com.provectus.kafka.ui.exception.SchemaNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
@ -52,79 +51,71 @@ public class SchemaRegistryService {
private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
private static final String LATEST = "latest";
private final ClustersStorage clustersStorage;
private final ClusterMapper mapper;
private final WebClient webClient;
public Flux<SchemaSubjectDTO> getAllLatestVersionSchemas(String clusterName) {
var allSubjectNames = getAllSubjectNames(clusterName);
public Flux<SchemaSubjectDTO> getAllLatestVersionSchemas(KafkaCluster cluster) {
var allSubjectNames = getAllSubjectNames(cluster);
return allSubjectNames
.flatMapMany(Flux::fromArray)
.flatMap(subject -> getLatestSchemaVersionBySubject(clusterName, subject));
.flatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject));
}
public Mono<String[]> getAllSubjectNames(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECTS)
.retrieve()
.bodyToMono(String[].class)
.doOnError(log::error)
)
.orElse(Mono.error(ClusterNotFoundException::new));
public Mono<String[]> getAllSubjectNames(KafkaCluster cluster) {
return configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECTS)
.retrieve()
.bodyToMono(String[].class)
.doOnError(log::error);
}
public Flux<SchemaSubjectDTO> getAllVersionsBySubject(String clusterName, String subject) {
Flux<Integer> versions = getSubjectVersions(clusterName, subject);
return versions.flatMap(version -> getSchemaSubjectByVersion(clusterName, subject, version));
public Flux<SchemaSubjectDTO> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
Flux<Integer> versions = getSubjectVersions(cluster, subject);
return versions.flatMap(version -> getSchemaSubjectByVersion(cluster, subject, version));
}
private Flux<Integer> getSubjectVersions(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECT_VERSIONS, schemaName)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
).bodyToFlux(Integer.class)
).orElse(Flux.error(ClusterNotFoundException::new));
private Flux<Integer> getSubjectVersions(KafkaCluster cluster, String schemaName) {
return configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECT_VERSIONS, schemaName)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToFlux(Integer.class);
}
public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(String clusterName, String schemaName,
Integer version) {
return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version));
public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(KafkaCluster cluster, String schemaName,
Integer version) {
return this.getSchemaSubject(cluster, schemaName, String.valueOf(version));
}
public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(String clusterName,
public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(KafkaCluster cluster,
String schemaName) {
return this.getSchemaSubject(clusterName, schemaName, LATEST);
return this.getSchemaSubject(cluster, schemaName, LATEST);
}
private Mono<SchemaSubjectDTO> getSchemaSubject(String clusterName, String schemaName,
String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
).bodyToMono(SchemaSubjectDTO.class)
.map(this::withSchemaType)
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
.map(tuple -> {
SchemaSubjectDTO schema = tuple.getT1();
String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
schema.setCompatibilityLevel(compatibilityLevel);
return schema;
})
private Mono<SchemaSubjectDTO> getSchemaSubject(KafkaCluster cluster, String schemaName,
String version) {
return configuredWebClient(
cluster,
HttpMethod.GET,
URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
)
.orElse(Mono.error(ClusterNotFoundException::new));
.bodyToMono(SchemaSubjectDTO.class)
.map(this::withSchemaType)
.zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
.map(tuple -> {
SchemaSubjectDTO schema = tuple.getT1();
String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
schema.setCompatibilityLevel(compatibilityLevel);
return schema;
});
}
/**
@ -136,52 +127,47 @@ public class SchemaRegistryService {
return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaTypeDTO.AVRO));
}
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName,
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(KafkaCluster cluster,
String schemaName,
Integer version) {
return this.deleteSchemaSubject(clusterName, schemaName, String.valueOf(version));
return this.deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
}
public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(String clusterName,
public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(KafkaCluster cluster,
String schemaName) {
return this.deleteSchemaSubject(clusterName, schemaName, LATEST);
return this.deleteSchemaSubject(cluster, schemaName, LATEST);
}
private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName,
private Mono<ResponseEntity<Void>> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.DELETE,
URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
).toBodilessEntity()
).orElse(Mono.error(ClusterNotFoundException::new));
return configuredWebClient(
cluster,
HttpMethod.DELETE,
URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
).toBodilessEntity();
}
public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName,
public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(KafkaCluster cluster,
String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.DELETE,
URL_SUBJECT, schemaName)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
)
.toBodilessEntity())
.orElse(Mono.error(ClusterNotFoundException::new));
return configuredWebClient(
cluster,
HttpMethod.DELETE,
URL_SUBJECT, schemaName)
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.toBodilessEntity();
}
/**
* Checks whether the provided schema duplicates the previous or not, creates a new schema
* and then returns the whole content by requesting its latest version.
*/
public Mono<SchemaSubjectDTO> registerNewSchema(String clusterName,
Mono<NewSchemaSubjectDTO> newSchemaSubject) {
public Mono<SchemaSubjectDTO> registerNewSchema(KafkaCluster cluster,
Mono<NewSchemaSubjectDTO> newSchemaSubject) {
return newSchemaSubject
.flatMap(schema -> {
SchemaTypeDTO schemaType =
@ -189,14 +175,10 @@ public class SchemaRegistryService {
Mono<InternalNewSchema> newSchema =
Mono.just(new InternalNewSchema(schema.getSchema(), schemaType));
String subject = schema.getSubject();
return clustersStorage.getClusterByName(clusterName)
.map(KafkaCluster::getSchemaRegistry)
.map(
schemaRegistry -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistry)
.flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistry))
.flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
)
.orElse(Mono.error(ClusterNotFoundException::new));
var schemaRegistry = cluster.getSchemaRegistry();
return checkSchemaOnDuplicate(subject, newSchema, schemaRegistry)
.flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistry))
.flatMap(resp -> getLatestSchemaVersionBySubject(cluster, subject));
});
}
@ -249,71 +231,62 @@ public class SchemaRegistryService {
* @param schemaName is a schema subject name
* @see com.provectus.kafka.ui.model.CompatibilityLevelDTO.CompatibilityEnum
*/
public Mono<Void> updateSchemaCompatibility(String clusterName, String schemaName,
public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster, String schemaName,
Mono<CompatibilityLevelDTO> compatibilityLevel) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return configuredWebClient(
cluster,
HttpMethod.PUT,
configEndpoint, schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(Void.class);
}).orElse(Mono.error(ClusterNotFoundException::new));
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return configuredWebClient(
cluster,
HttpMethod.PUT,
configEndpoint, schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(Void.class);
}
public Mono<Void> updateSchemaCompatibility(String clusterName,
public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
Mono<CompatibilityLevelDTO> compatibilityLevel) {
return updateSchemaCompatibility(clusterName, null, compatibilityLevel);
return updateSchemaCompatibility(cluster, null, compatibilityLevel);
}
public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(String clusterName,
String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return configuredWebClient(
cluster,
HttpMethod.GET,
configEndpoint, schemaName)
.retrieve()
.bodyToMono(InternalCompatibilityLevel.class)
.map(mapper::toCompatibilityLevel)
.onErrorResume(error -> Mono.empty());
}).orElse(Mono.empty());
public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(KafkaCluster cluster,
String schemaName) {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return configuredWebClient(
cluster,
HttpMethod.GET,
configEndpoint, schemaName)
.retrieve()
.bodyToMono(InternalCompatibilityLevel.class)
.map(mapper::toCompatibilityLevel)
.onErrorResume(error -> Mono.empty());
}
public Mono<CompatibilityLevelDTO> getGlobalSchemaCompatibilityLevel(String clusterName) {
return this.getSchemaCompatibilityLevel(clusterName, null);
public Mono<CompatibilityLevelDTO> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
return this.getSchemaCompatibilityLevel(cluster, null);
}
private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(String clusterName,
private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
String schemaName) {
return this.getSchemaCompatibilityLevel(clusterName, schemaName)
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(clusterName));
return this.getSchemaCompatibilityLevel(cluster, schemaName)
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
}
public Mono<CompatibilityCheckResponseDTO> checksSchemaCompatibility(
String clusterName, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> configuredWebClient(
cluster,
HttpMethod.POST,
"/compatibility/subjects/{schemaName}/versions/latest", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse)
.log()
).orElse(Mono.error(ClusterNotFoundException::new));
KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
return configuredWebClient(
cluster,
HttpMethod.POST,
"/compatibility/subjects/{schemaName}/versions/latest", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
.retrieve()
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse);
}
public String formatted(String str, Object... args) {