Forward schema registry errors to API (#693)

* Forward schema registry errors to API

* Fix pr checks job
This commit is contained in:
Roman Zabaluev 2021-07-21 15:54:31 +03:00 committed by GitHub
parent 443ed8bc8c
commit b296108e5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 2 deletions

View file

@ -0,0 +1,14 @@
package com.provectus.kafka.ui.model.schemaregistry;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class ErrorResponse {
@JsonProperty("error_code")
private int errorCode;
private String message;
}

View file

@ -14,6 +14,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.NewSchemaSubject; import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject; import com.provectus.kafka.ui.model.SchemaSubject;
import com.provectus.kafka.ui.model.SchemaType; import com.provectus.kafka.ui.model.SchemaType;
import com.provectus.kafka.ui.model.schemaregistry.ErrorResponse;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema; import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
@ -195,7 +196,8 @@ public class SchemaRegistryService {
.body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
.retrieve() .retrieve()
.onStatus(UNPROCESSABLE_ENTITY::equals, .onStatus(UNPROCESSABLE_ENTITY::equals,
r -> Mono.error(new UnprocessableEntityException("Invalid params"))) r -> r.bodyToMono(ErrorResponse.class)
.flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage()))))
.bodyToMono(SubjectIdResponse.class); .bodyToMono(SubjectIdResponse.class);
} }
@ -210,7 +212,8 @@ public class SchemaRegistryService {
.retrieve() .retrieve()
.onStatus(NOT_FOUND::equals, res -> Mono.empty()) .onStatus(NOT_FOUND::equals, res -> Mono.empty())
.onStatus(UNPROCESSABLE_ENTITY::equals, .onStatus(UNPROCESSABLE_ENTITY::equals,
r -> Mono.error(new UnprocessableEntityException("Invalid params"))) r -> r.bodyToMono(ErrorResponse.class)
.flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage()))))
.bodyToMono(SchemaSubject.class) .bodyToMono(SchemaSubject.class)
.filter(s -> Objects.isNull(s.getId())) .filter(s -> Objects.isNull(s.getId()))
.switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists"))); .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));