From 5e0de324392692fa10e4a0e8e90a88a39c42c957 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Tue, 2 Feb 2021 11:46:14 +0300 Subject: [PATCH] Add endpoint to check schema compatibility --- .../ui/cluster/mapper/ClusterMapper.java | 4 ++ .../model/InternalCompatibilityCheck.java | 10 +++++ .../service/SchemaRegistryService.java | 21 ++++++++-- .../kafka/ui/rest/MetricsRestController.java | 9 ++++ .../main/resources/swagger/kafka-ui-api.yaml | 42 ++++++++++++++++++- 5 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index 6ceb08a1a1..c2909ff611 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.mapper; import com.provectus.kafka.ui.cluster.config.ClustersProperties; import com.provectus.kafka.ui.cluster.model.*; +import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.*; import java.util.Properties; import org.mapstruct.Mapper; @@ -36,6 +37,9 @@ public interface ClusterMapper { TopicConfig toTopicConfig(InternalTopicConfig topic); Replica toReplica(InternalReplica replica); + @Mapping(target = "isCompatible", source = "compatible") + CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto); + default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) { final TopicDetails result = toTopicDetails(topic); result.setBytesInPerSec( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java new file mode 100644 index 0000000000..88f8020402 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java @@ -0,0 +1,10 @@ +package com.provectus.kafka.ui.cluster.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class InternalCompatibilityCheck { + @JsonProperty("is_compatible") + private final boolean isCompatible; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index f6551829d1..cbed665937 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -1,11 +1,10 @@ package com.provectus.kafka.ui.cluster.service; import com.provectus.kafka.ui.cluster.exception.NotFoundException; +import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; -import com.provectus.kafka.ui.model.CompatibilityLevel; -import com.provectus.kafka.ui.model.CompatibilityLevelResponse; -import com.provectus.kafka.ui.model.NewSchemaSubject; -import com.provectus.kafka.ui.model.SchemaSubject; +import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck; +import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.HttpStatus; @@ -31,6 +30,7 @@ public class SchemaRegistryService { private static final String LATEST = "latest"; private final ClustersStorage clustersStorage; + private final ClusterMapper mapper; private final WebClient webClient; public Flux getAllSchemaSubjects(String clusterName) { @@ -146,4 +146,17 @@ public class SchemaRegistryService { .bodyToMono(CompatibilityLevelResponse.class); }).orElse(Mono.error(new NotFoundException("No such cluster"))); } + + public Mono checksSchemaCompatibility(String clusterName, String schemaName, Mono newSchemaSubject) { + return clustersStorage.getClusterByName(clusterName) + .map(cluster -> webClient.post() + .uri(cluster.getSchemaRegistry() + "/compatibility/subjects/{subjectName}/versions/latest", schemaName) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) + .retrieve() + .bodyToMono(InternalCompatibilityCheck.class) + .map(mapper::toCompatibilityCheckResponse) + .log() + ).orElse(Mono.error(new NotFoundException("No such cluster"))); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index f7b4ac632f..36fc001ddc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -186,6 +186,15 @@ public class MetricsRestController implements ApiClustersApi { .onErrorReturn(ResponseEntity.notFound().build()); } + @Override + public Mono> checkSchemaCompatibility(String clusterName, String schemaName, + @Valid Mono newSchemaSubject, + ServerWebExchange exchange) { + return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject) + .map(ResponseEntity::ok) + .onErrorReturn(ResponseEntity.badRequest().build()); + } + @Override public Mono> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono compatibilityLevel, ServerWebExchange exchange) { log.info("Updating schema compatibility for schema: {}", schemaName); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 45fb60aea5..b78b04774f 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -636,6 +636,38 @@ paths: 404: description: Not Found + /api/clusters/{clusterName}/schemas/compatibility/{schemaName}/check: + post: + tags: + - /api/clusters + summary: Check compatibility of the schema. + operationId: checkSchemaCompatibility + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: schemaName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/NewSchemaSubject' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/CompatibilityCheckResponse' + 404: + description: Not Found + components: schemas: Cluster: @@ -1004,4 +1036,12 @@ components: compatibilityLevel: type: string required: - - compatibilityLevel \ No newline at end of file + - compatibilityLevel + + CompatibilityCheckResponse: + type: object + properties: + isCompatible: + type: boolean + required: + - isCompatible