Add GET/PUT methods to get/update a schema compatibility level globally or only for a scepific schema

This commit is contained in:
Ildar Almakaev 2021-02-01 13:45:52 +03:00
parent 2525772162
commit ce003eae93
3 changed files with 186 additions and 1 deletions

View file

@ -2,6 +2,8 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.CompatibilityLevelResponse;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject;
import lombok.RequiredArgsConstructor;
@ -16,6 +18,8 @@ import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Objects;
@Service
@Log4j2
@RequiredArgsConstructor
@ -91,4 +95,38 @@ public class SchemaRegistryService {
})
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
/**
* Updates a compatibility level for a <code>schemaName</code>
*
* @param schemaName is a schema subject name
* @see com.provectus.kafka.ui.model.CompatibilityLevel.CompatibilityEnum
*/
public Mono<Void> updateSchemaCompatibility(String clusterName, String schemaName, Mono<CompatibilityLevel> compatibilityLevel) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return webClient.put()
.uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
.retrieve()
.bodyToMono(Void.class);
}).orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<Void> updateSchemaCompatibility(String clusterName, Mono<CompatibilityLevel> compatibilityLevel) {
return updateSchemaCompatibility(clusterName, null, compatibilityLevel);
}
public Mono<CompatibilityLevelResponse> getSchemaCompatibilityLevel(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return webClient.get()
.uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
.retrieve()
.bodyToMono(CompatibilityLevelResponse.class);
}).orElse(Mono.error(new NotFoundException("No such cluster")));
}
}

View file

@ -6,6 +6,7 @@ import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.cluster.service.SchemaRegistryService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@ -21,6 +22,7 @@ import java.util.function.Function;
@RestController
@RequiredArgsConstructor
@Log4j2
public class MetricsRestController implements ApiClustersApi {
private final ClusterService clusterService;
@ -150,6 +152,35 @@ public class MetricsRestController implements ApiClustersApi {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<CompatibilityLevelResponse>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaCompatibilityLevel(clusterName, null)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.badRequest().build());
}
@Override
public Mono<ResponseEntity<CompatibilityLevelResponse>> getSchemaCompatibilityLevel(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaCompatibilityLevel(clusterName, schemaName)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.badRequest().build());
}
@Override
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility globally");
return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility for schema: {}", schemaName);
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
return Mono.justOrEmpty(seekTo)

View file

@ -495,6 +495,98 @@ paths:
404:
description: Not found
/api/clusters/{clusterName}/schemas/compatibility:
get:
tags:
- /api/clusters
summary: Get schema compatibility level globally
operationId: getGlobalSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevelResponse'
put:
tags:
- /api/clusters
summary: Update compatibility level globally
operationId: updateGlobalSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevel'
responses:
200:
description: OK
404:
description: Not Found
/api/clusters/{clusterName}/schemas/compatibility/{schemaName}:
get:
tags:
- /api/clusters
summary: Get schema compatibility level of specific schema
operationId: getSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevelResponse'
put:
tags:
- /api/clusters
summary: Update compatibility level for specific schema.
operationId: updateSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevel'
responses:
200:
description: OK
404:
description: Not Found
components:
schemas:
Cluster:
@ -839,4 +931,28 @@ components:
schema:
type: string
required:
- schema
- schema
CompatibilityLevel:
type: object
properties:
compatibility:
type: string
enum:
- BACKWARD
- BACKWARD_TRANSITIVE
- FORWARD
- FORWARD_TRANSITIVE
- FULL
- FULL_TRANSITIVE
- NONE
required:
- compatibility
CompatibilityLevelResponse:
type: object
properties:
compatibilityLevel:
type: string
required:
- compatibilityLevel