|
@@ -106,16 +106,12 @@ public class MetricsRestController implements ApiClustersApi {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
|
|
- return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName)
|
|
|
|
- .map(ResponseEntity::ok)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
|
|
- return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version)
|
|
|
|
- .map(ResponseEntity::ok)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -126,34 +122,29 @@ public class MetricsRestController implements ApiClustersApi {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
|
|
- return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)))
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
|
|
- return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
|
|
- return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) {
|
|
- return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
|
|
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
|
|
@Valid Mono<NewSchemaSubject> newSchemaSubject,
|
|
@Valid Mono<NewSchemaSubject> newSchemaSubject,
|
|
ServerWebExchange exchange) {
|
|
ServerWebExchange exchange) {
|
|
- return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject)
|
|
|
|
- .onErrorReturn(ResponseEntity.badRequest().build());
|
|
|
|
|
|
+ return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -170,32 +161,29 @@ public class MetricsRestController implements ApiClustersApi {
|
|
public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
|
|
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
|
|
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
|
|
.map(ResponseEntity::ok)
|
|
.map(ResponseEntity::ok)
|
|
- .defaultIfEmpty(ResponseEntity.badRequest().build());
|
|
|
|
|
|
+ .defaultIfEmpty(ResponseEntity.notFound().build());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
|
log.info("Updating schema compatibility globally");
|
|
log.info("Updating schema compatibility globally");
|
|
return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
|
|
return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
|
|
- .map(ResponseEntity::ok)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ .map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
|
|
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
|
|
- @Valid Mono<NewSchemaSubject> newSchemaSubject,
|
|
|
|
- ServerWebExchange exchange) {
|
|
|
|
|
|
+ @Valid Mono<NewSchemaSubject> newSchemaSubject,
|
|
|
|
+ ServerWebExchange exchange) {
|
|
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
|
|
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
|
|
- .map(ResponseEntity::ok)
|
|
|
|
- .onErrorReturn(ResponseEntity.badRequest().build());
|
|
|
|
|
|
+ .map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
|
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
|
|
log.info("Updating schema compatibility for schema: {}", schemaName);
|
|
log.info("Updating schema compatibility for schema: {}", schemaName);
|
|
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
|
|
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
|
|
- .map(ResponseEntity::ok)
|
|
|
|
- .onErrorReturn(ResponseEntity.notFound().build());
|
|
|
|
|
|
+ .map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
|
|
private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
|