diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 0d760ce195..3d25f80cd8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -11,6 +11,7 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -78,12 +79,16 @@ public class SchemaRegistryService { public Mono> createNewSubject(String clusterName, String subjectSchema, Mono newSchemaSubject) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.post() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) - .retrieve() - .toEntity(SchemaSubject.class)) + .map(cluster -> { + log.info("Submitting a new subject: {} to Schema Registry: {}", subjectSchema, cluster.getSchemaRegistry()); + return webClient.post() + .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) + .retrieve() + .onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals, ClientResponse::createException) + .toEntity(SchemaSubject.class); + }) .orElse(Mono.error(new NotFoundException("No such cluster"))); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index bba489cec2..c3bde074b6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -137,7 +137,7 @@ public class MetricsRestController implements ApiClustersApi { @Valid Mono newSchemaSubject, ServerWebExchange exchange) { return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject) - .onErrorReturn(ResponseEntity.notFound().build()); + .onErrorReturn(ResponseEntity.badRequest().build()); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 8c564cc166..925d1b2594 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -434,6 +434,8 @@ paths: application/json: schema: $ref: '#/components/schemas/SchemaSubject' + 400: + description: Bad request /api/clusters/{clusterName}/schemas/{schemaName}/versions/{version}: get: