Add endpoint to check schema compatibility
This commit is contained in:
parent
15f48e7649
commit
5e0de32439
5 changed files with 81 additions and 5 deletions
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.mapper;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
|
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
|
||||||
import com.provectus.kafka.ui.cluster.model.*;
|
import com.provectus.kafka.ui.cluster.model.*;
|
||||||
|
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
|
||||||
import com.provectus.kafka.ui.model.*;
|
import com.provectus.kafka.ui.model.*;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import org.mapstruct.Mapper;
|
import org.mapstruct.Mapper;
|
||||||
|
@ -36,6 +37,9 @@ public interface ClusterMapper {
|
||||||
TopicConfig toTopicConfig(InternalTopicConfig topic);
|
TopicConfig toTopicConfig(InternalTopicConfig topic);
|
||||||
Replica toReplica(InternalReplica replica);
|
Replica toReplica(InternalReplica replica);
|
||||||
|
|
||||||
|
@Mapping(target = "isCompatible", source = "compatible")
|
||||||
|
CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
|
||||||
|
|
||||||
default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
|
default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
|
||||||
final TopicDetails result = toTopicDetails(topic);
|
final TopicDetails result = toTopicDetails(topic);
|
||||||
result.setBytesInPerSec(
|
result.setBytesInPerSec(
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.provectus.kafka.ui.cluster.model;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class InternalCompatibilityCheck {
|
||||||
|
@JsonProperty("is_compatible")
|
||||||
|
private final boolean isCompatible;
|
||||||
|
}
|
|
@ -1,11 +1,10 @@
|
||||||
package com.provectus.kafka.ui.cluster.service;
|
package com.provectus.kafka.ui.cluster.service;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
||||||
|
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
|
||||||
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
||||||
import com.provectus.kafka.ui.model.CompatibilityLevel;
|
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
|
||||||
import com.provectus.kafka.ui.model.CompatibilityLevelResponse;
|
import com.provectus.kafka.ui.model.*;
|
||||||
import com.provectus.kafka.ui.model.NewSchemaSubject;
|
|
||||||
import com.provectus.kafka.ui.model.SchemaSubject;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
|
@ -31,6 +30,7 @@ public class SchemaRegistryService {
|
||||||
private static final String LATEST = "latest";
|
private static final String LATEST = "latest";
|
||||||
|
|
||||||
private final ClustersStorage clustersStorage;
|
private final ClustersStorage clustersStorage;
|
||||||
|
private final ClusterMapper mapper;
|
||||||
private final WebClient webClient;
|
private final WebClient webClient;
|
||||||
|
|
||||||
public Flux<String> getAllSchemaSubjects(String clusterName) {
|
public Flux<String> getAllSchemaSubjects(String clusterName) {
|
||||||
|
@ -146,4 +146,17 @@ public class SchemaRegistryService {
|
||||||
.bodyToMono(CompatibilityLevelResponse.class);
|
.bodyToMono(CompatibilityLevelResponse.class);
|
||||||
}).orElse(Mono.error(new NotFoundException("No such cluster")));
|
}).orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
|
||||||
|
return clustersStorage.getClusterByName(clusterName)
|
||||||
|
.map(cluster -> webClient.post()
|
||||||
|
.uri(cluster.getSchemaRegistry() + "/compatibility/subjects/{subjectName}/versions/latest", schemaName)
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
||||||
|
.retrieve()
|
||||||
|
.bodyToMono(InternalCompatibilityCheck.class)
|
||||||
|
.map(mapper::toCompatibilityCheckResponse)
|
||||||
|
.log()
|
||||||
|
).orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,6 +186,15 @@ public class MetricsRestController implements ApiClustersApi {
|
||||||
.onErrorReturn(ResponseEntity.notFound().build());
|
.onErrorReturn(ResponseEntity.notFound().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
|
||||||
|
@Valid Mono<NewSchemaSubject> newSchemaSubject,
|
||||||
|
ServerWebExchange exchange) {
|
||||||
|
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
|
||||||
|
.map(ResponseEntity::ok)
|
||||||
|
.onErrorReturn(ResponseEntity.badRequest().build());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
||||||
log.info("Updating schema compatibility for schema: {}", schemaName);
|
log.info("Updating schema compatibility for schema: {}", schemaName);
|
||||||
|
|
|
@ -636,6 +636,38 @@ paths:
|
||||||
404:
|
404:
|
||||||
description: Not Found
|
description: Not Found
|
||||||
|
|
||||||
|
/api/clusters/{clusterName}/schemas/compatibility/{schemaName}/check:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- /api/clusters
|
||||||
|
summary: Check compatibility of the schema.
|
||||||
|
operationId: checkSchemaCompatibility
|
||||||
|
parameters:
|
||||||
|
- name: clusterName
|
||||||
|
in: path
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
- name: schemaName
|
||||||
|
in: path
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
requestBody:
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/NewSchemaSubject'
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: OK
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/CompatibilityCheckResponse'
|
||||||
|
404:
|
||||||
|
description: Not Found
|
||||||
|
|
||||||
components:
|
components:
|
||||||
schemas:
|
schemas:
|
||||||
Cluster:
|
Cluster:
|
||||||
|
@ -1004,4 +1036,12 @@ components:
|
||||||
compatibilityLevel:
|
compatibilityLevel:
|
||||||
type: string
|
type: string
|
||||||
required:
|
required:
|
||||||
- compatibilityLevel
|
- compatibilityLevel
|
||||||
|
|
||||||
|
CompatibilityCheckResponse:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
isCompatible:
|
||||||
|
type: boolean
|
||||||
|
required:
|
||||||
|
- isCompatible
|
||||||
|
|
Loading…
Add table
Reference in a new issue