From 37e9427cdb3ca4adb8487798dfa67bbe0196fe7f Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Thu, 28 Jan 2021 14:43:33 +0300 Subject: [PATCH] Add ability to create/delete schema subjects in/from schema-registry service --- .../cluster/exception/NotFoundException.java | 11 +++ .../ui/cluster/service/ClusterService.java | 13 --- .../service/SchemaRegistryService.java | 50 +++++++++- .../kafka/ui/rest/MetricsRestController.java | 23 ++++- .../main/resources/swagger/kafka-ui-api.yaml | 96 ++++++++++++++++++- 5 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/NotFoundException.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/NotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/NotFoundException.java new file mode 100644 index 0000000000..d7d41b35f8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/NotFoundException.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.cluster.exception; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(HttpStatus.NOT_FOUND) +public class NotFoundException extends RuntimeException { + public NotFoundException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index cd26f9b448..bd70b1ccb0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -31,7 +31,6 @@ public class ClusterService { private final ClusterMapper clusterMapper; private final KafkaService kafkaService; private final ConsumingService consumingService; - private final SchemaRegistryService schemaRegistryService; public List getClusters() { return clustersStorage.getKafkaClusters() @@ -180,16 +179,4 @@ public class ClusterService { .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit)) .orElse(Flux.empty()); } - - public Flux getSchemaSubjects(String clusterName) { - return schemaRegistryService.getAllSchemaSubjects(clusterName); - } - - public Flux getSchemaSubjectVersions(String clusterName, String subjectName) { - return schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName); - } - - public Flux getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { - return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version); - } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 92da1dfab2..e0346e18e9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -1,13 +1,20 @@ package com.provectus.kafka.ui.cluster.service; +import com.provectus.kafka.ui.cluster.exception.NotFoundException; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.model.NewSchemaSubject; import com.provectus.kafka.ui.model.SubjectSchema; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Service @Log4j2 @@ -15,12 +22,12 @@ import reactor.core.publisher.Flux; public class SchemaRegistryService { private final ClustersStorage clustersStorage; public static final String URL_SUBJECTS = "/subjects"; + public static final String URL_SUBJECT = "/subjects/{subjectName}"; public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions"; - public static final String URL_SUBJECT = "/subjects/{subjectName}/versions/{version}"; + public static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}"; public Flux getAllSchemaSubjects(String clusterName) { KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); -// todo: use it as a bean WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); return webClient.get() .uri(URL_SUBJECTS) @@ -30,11 +37,11 @@ public class SchemaRegistryService { public Flux getSchemaSubjectVersions(String clusterName, String subjectName) { KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); -// todo: use it as a bean WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); return webClient.get() .uri(URL_SUBJECT_VERSIONS, subjectName) .retrieve() + .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject"))) .bodyToFlux(Integer.class); } @@ -42,8 +49,43 @@ public class SchemaRegistryService { KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); return webClient.get() - .uri(URL_SUBJECT, subjectName, version) + .uri(URL_SUBJECT_BY_VERSION, subjectName, version) .retrieve() + .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version"))) .bodyToFlux(SubjectSchema.class); } + + public Mono deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { + KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); + WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); + return webClient.delete() + .uri(URL_SUBJECT_BY_VERSION, subjectName, version) + .retrieve() + .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version"))) + .bodyToMono(Object.class); + } + + public Mono deleteSchemaSubject(String clusterName, String subjectName) { + KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); + WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); + return webClient.delete() + .uri(URL_SUBJECT, subjectName) + .retrieve() + .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version"))) + .bodyToMono(Object.class); + } + + public Mono> createNewSubject(String clusterName, String subjectSchema, Mono newSchemaSubject) { + return clustersStorage.getClusterByName(clusterName) + .map(kafkaCluster -> WebClient.create(kafkaCluster.getSchemaRegistry())) + .map(webClient -> webClient + .post() + .uri(URL_SUBJECT_VERSIONS, subjectSchema) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) + .retrieve() + .toEntity(SubjectSchema.class) + ) + .orElse(Mono.empty()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 975c797d55..23ad418c55 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -3,6 +3,7 @@ package com.provectus.kafka.ui.rest; import com.provectus.kafka.ui.api.ApiClustersApi; import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.service.ClusterService; +import com.provectus.kafka.ui.cluster.service.SchemaRegistryService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.tuple.Pair; @@ -23,6 +24,7 @@ import java.util.function.Function; public class MetricsRestController implements ApiClustersApi { private final ClusterService clusterService; + private final SchemaRegistryService schemaRegistryService; @Override public Mono>> getClusters(ServerWebExchange exchange) { @@ -102,19 +104,34 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono>> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) { - Flux flux = clusterService.getSchemaSubjectByVersion(clusterName, subjectName, version); + Flux flux = schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version); return Mono.just(ResponseEntity.ok(flux)); } @Override public Mono>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) { - Flux subjects = clusterService.getSchemaSubjects(clusterName); + Flux subjects = schemaRegistryService.getAllSchemaSubjects(clusterName); return Mono.just(ResponseEntity.ok(subjects)); } @Override public Mono>> getSchemaSubjectVersions(String clusterName, String subjectName, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(clusterService.getSchemaSubjectVersions(clusterName, subjectName))); + return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName))); + } + + @Override + public Mono> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version))); + } + + @Override + public Mono> deleteSchemaSubject(String clusterName, String subjectName, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(schemaRegistryService.deleteSchemaSubject(clusterName, subjectName))); + } + + @Override + public Mono> createNewSubjectSchema(String clusterName, String subjectName, @Valid Mono newSchemaSubject, ServerWebExchange exchange) { + return schemaRegistryService.createNewSubject(clusterName, subjectName, newSchemaSubject); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index f053fa8c20..1b2dc5fe6d 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -357,6 +357,31 @@ paths: items: type: string + /api/clusters/{clusterName}/schema/subjects/{subjectName}: + delete: + tags: + - /api/clusters + summary: delete subject from schema registry + operationId: deleteSchemaSubject + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: subjectName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions: get: tags: @@ -383,6 +408,34 @@ paths: type: array items: type: integer + post: + tags: + - /api/clusters + summary: create a new subject schema + operationId: createNewSubjectSchema + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: subjectName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/NewSchemaSubject' + responses: + 200: + description: Updated + content: + application/json: + schema: + $ref: '#/components/schemas/SubjectSchema' /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions/{version}: get: @@ -415,6 +468,40 @@ paths: type: array items: $ref: '#/components/schemas/SubjectSchema' + delete: + tags: + - /api/clusters + summary: delete schema by version from schema registry + operationId: deleteSchemaByVersion + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: subjectName + in: path + required: true + schema: + type: string + - name: version + in: path + required: true + schema: + type: integer + responses: + 204: + description: Deleted + content: + application/json: + schema: + type: object + 404: + description: Not found + content: + application/json: + schema: + type: object components: schemas: @@ -752,7 +839,12 @@ components: schema: type: string required: - - subject - - version - id + + NewSchemaSubject: + type: object + properties: + schema: + type: string + required: - schema \ No newline at end of file