Refactor registerNewSchema method

This commit is contained in:
Ildar Almakaev 2021-03-02 15:27:24 +03:00
parent 35736033a8
commit a68fa15354

View file

@ -138,29 +138,29 @@ public class SchemaRegistryService {
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
/**
* Checks whether the provided schema duplicates the previous or not, creates a new schema
* and then returns the whole content by requesting its latest version.
*/
public Mono<SchemaSubject> registerNewSchema(String clusterName, Mono<NewSchemaSubject> newSchemaSubject) {
return newSchemaSubject
.flatMap(schema -> {
Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(schema.getSchema()));
String subject = schema.getSubject();
return createNewSchema(clusterName, subject, newSchema);
return clustersStorage.getClusterByName(clusterName)
.map(KafkaCluster::getSchemaRegistry)
.map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl)
.flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl))
.flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
)
.orElse(Mono.error(new NotFoundException("No such cluster")));
});
}
private Mono<SchemaSubject> createNewSchema(String clusterName, String subject, Mono<InternalNewSchema> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(KafkaCluster::getSchemaRegistry)
.map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchemaSubject, schemaRegistryUrl)
.flatMap(s -> submitNewSchema(subject, newSchemaSubject, schemaRegistryUrl))
.flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
)
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
@NotNull
private Mono<SubjectIdResponse> submitNewSchema(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistry) {
private Mono<SubjectIdResponse> submitNewSchema(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistryUrl) {
return webClient.post()
.uri(schemaRegistry + URL_SUBJECT_VERSIONS, subject)
.uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
.retrieve()
@ -169,13 +169,14 @@ public class SchemaRegistryService {
}
@NotNull
private Mono<SchemaSubject> checkSchemaOnDuplicate(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistry) {
private Mono<SchemaSubject> checkSchemaOnDuplicate(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistryUrl) {
return webClient.post()
.uri(schemaRegistry + URL_SUBJECT, subject)
.uri(schemaRegistryUrl + URL_SUBJECT, subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
.retrieve()
.onStatus(NOT_FOUND::equals, res -> Mono.empty())
.onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
.bodyToMono(SchemaSubject.class)
.filter(s -> Objects.isNull(s.getId()))
.switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));
@ -256,6 +257,8 @@ public class SchemaRegistryService {
return "AVRO";
} else if (schema.contains("proto")) {
return "PROTO";
} else if (schema.contains("json")) {
return "JSON";
} else return schema;
}
}