Add ability to create/delete schema subjects in/from schema-registry service

This commit is contained in:
Ildar Almakaev 2021-01-28 14:43:33 +03:00
parent 34b5bb4a3d
commit 37e9427cdb
5 changed files with 171 additions and 22 deletions

View file

@ -0,0 +1,11 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends RuntimeException {
public NotFoundException(String message) {
super(message);
}
}

View file

@ -31,7 +31,6 @@ public class ClusterService {
private final ClusterMapper clusterMapper; private final ClusterMapper clusterMapper;
private final KafkaService kafkaService; private final KafkaService kafkaService;
private final ConsumingService consumingService; private final ConsumingService consumingService;
private final SchemaRegistryService schemaRegistryService;
public List<Cluster> getClusters() { public List<Cluster> getClusters() {
return clustersStorage.getKafkaClusters() return clustersStorage.getKafkaClusters()
@ -180,16 +179,4 @@ public class ClusterService {
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit)) .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
.orElse(Flux.empty()); .orElse(Flux.empty());
} }
public Flux<String> getSchemaSubjects(String clusterName) {
return schemaRegistryService.getAllSchemaSubjects(clusterName);
}
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
return schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName);
}
public Flux<SubjectSchema> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version);
}
} }

View file

@ -1,13 +1,20 @@
package com.provectus.kafka.ui.cluster.service; package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster; import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SubjectSchema; import com.provectus.kafka.ui.model.SubjectSchema;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service @Service
@Log4j2 @Log4j2
@ -15,12 +22,12 @@ import reactor.core.publisher.Flux;
public class SchemaRegistryService { public class SchemaRegistryService {
private final ClustersStorage clustersStorage; private final ClustersStorage clustersStorage;
public static final String URL_SUBJECTS = "/subjects"; public static final String URL_SUBJECTS = "/subjects";
public static final String URL_SUBJECT = "/subjects/{subjectName}";
public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions"; public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
public static final String URL_SUBJECT = "/subjects/{subjectName}/versions/{version}"; public static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
public Flux<String> getAllSchemaSubjects(String clusterName) { public Flux<String> getAllSchemaSubjects(String clusterName) {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
// todo: use it as a bean
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.get() return webClient.get()
.uri(URL_SUBJECTS) .uri(URL_SUBJECTS)
@ -30,11 +37,11 @@ public class SchemaRegistryService {
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) { public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
// todo: use it as a bean
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.get() return webClient.get()
.uri(URL_SUBJECT_VERSIONS, subjectName) .uri(URL_SUBJECT_VERSIONS, subjectName)
.retrieve() .retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
.bodyToFlux(Integer.class); .bodyToFlux(Integer.class);
} }
@ -42,8 +49,43 @@ public class SchemaRegistryService {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.get() return webClient.get()
.uri(URL_SUBJECT, subjectName, version) .uri(URL_SUBJECT_BY_VERSION, subjectName, version)
.retrieve() .retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.bodyToFlux(SubjectSchema.class); .bodyToFlux(SubjectSchema.class);
} }
public Mono<Object> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.delete()
.uri(URL_SUBJECT_BY_VERSION, subjectName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.bodyToMono(Object.class);
}
public Mono<Object> deleteSchemaSubject(String clusterName, String subjectName) {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.delete()
.uri(URL_SUBJECT, subjectName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.bodyToMono(Object.class);
}
public Mono<ResponseEntity<SubjectSchema>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaCluster -> WebClient.create(kafkaCluster.getSchemaRegistry()))
.map(webClient -> webClient
.post()
.uri(URL_SUBJECT_VERSIONS, subjectSchema)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()
.toEntity(SubjectSchema.class)
)
.orElse(Mono.empty());
}
} }

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.rest;
import com.provectus.kafka.ui.api.ApiClustersApi; import com.provectus.kafka.ui.api.ApiClustersApi;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.cluster.service.SchemaRegistryService;
import com.provectus.kafka.ui.model.*; import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -23,6 +24,7 @@ import java.util.function.Function;
public class MetricsRestController implements ApiClustersApi { public class MetricsRestController implements ApiClustersApi {
private final ClusterService clusterService; private final ClusterService clusterService;
private final SchemaRegistryService schemaRegistryService;
@Override @Override
public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) { public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
@ -102,19 +104,34 @@ public class MetricsRestController implements ApiClustersApi {
@Override @Override
public Mono<ResponseEntity<Flux<SubjectSchema>>> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) { public Mono<ResponseEntity<Flux<SubjectSchema>>> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
Flux<SubjectSchema> flux = clusterService.getSchemaSubjectByVersion(clusterName, subjectName, version); Flux<SubjectSchema> flux = schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version);
return Mono.just(ResponseEntity.ok(flux)); return Mono.just(ResponseEntity.ok(flux));
} }
@Override @Override
public Mono<ResponseEntity<Flux<String>>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) { public Mono<ResponseEntity<Flux<String>>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) {
Flux<String> subjects = clusterService.getSchemaSubjects(clusterName); Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
return Mono.just(ResponseEntity.ok(subjects)); return Mono.just(ResponseEntity.ok(subjects));
} }
@Override @Override
public Mono<ResponseEntity<Flux<Integer>>> getSchemaSubjectVersions(String clusterName, String subjectName, ServerWebExchange exchange) { public Mono<ResponseEntity<Flux<Integer>>> getSchemaSubjectVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(clusterService.getSchemaSubjectVersions(clusterName, subjectName))); return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
}
@Override
public Mono<ResponseEntity<Object>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version)));
}
@Override
public Mono<ResponseEntity<Object>> deleteSchemaSubject(String clusterName, String subjectName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(schemaRegistryService.deleteSchemaSubject(clusterName, subjectName)));
}
@Override
public Mono<ResponseEntity<SubjectSchema>> createNewSubjectSchema(String clusterName, String subjectName, @Valid Mono<NewSchemaSubject> newSchemaSubject, ServerWebExchange exchange) {
return schemaRegistryService.createNewSubject(clusterName, subjectName, newSchemaSubject);
} }
@Override @Override

View file

@ -357,6 +357,31 @@ paths:
items: items:
type: string type: string
/api/clusters/{clusterName}/schema/subjects/{subjectName}:
delete:
tags:
- /api/clusters
summary: delete subject from schema registry
operationId: deleteSchemaSubject
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: subjectName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: object
/api/clusters/{clusterName}/schema/subjects/{subjectName}/versions: /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions:
get: get:
tags: tags:
@ -383,6 +408,34 @@ paths:
type: array type: array
items: items:
type: integer type: integer
post:
tags:
- /api/clusters
summary: create a new subject schema
operationId: createNewSubjectSchema
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: subjectName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/NewSchemaSubject'
responses:
200:
description: Updated
content:
application/json:
schema:
$ref: '#/components/schemas/SubjectSchema'
/api/clusters/{clusterName}/schema/subjects/{subjectName}/versions/{version}: /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions/{version}:
get: get:
@ -415,6 +468,40 @@ paths:
type: array type: array
items: items:
$ref: '#/components/schemas/SubjectSchema' $ref: '#/components/schemas/SubjectSchema'
delete:
tags:
- /api/clusters
summary: delete schema by version from schema registry
operationId: deleteSchemaByVersion
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: subjectName
in: path
required: true
schema:
type: string
- name: version
in: path
required: true
schema:
type: integer
responses:
204:
description: Deleted
content:
application/json:
schema:
type: object
404:
description: Not found
content:
application/json:
schema:
type: object
components: components:
schemas: schemas:
@ -752,7 +839,12 @@ components:
schema: schema:
type: string type: string
required: required:
- subject
- version
- id - id
NewSchemaSubject:
type: object
properties:
schema:
type: string
required:
- schema - schema