Add CRUD actions for Schema Registry service (#165)

* Map schema registry port to 8081 in kafka-clusters-only.yaml

* Add endpoint to retrieve subjects from schema registry by cluster name

* Add endpoint to retrieve subject versions from schema registry

* Get subject schema by version from schema-registry

* Add ability to create/delete schema subjects in/from schema-registry service

* Create WebClient bean and refactor its usage

* Refactor schema api contract

* Return 'Bad request' if there is 'Internal Server Error' from Schema Registry

* Add GET/PUT methods to get/update a schema compatibility level globally or only for a scepific schema

* Add SchemaRegistryContainer.java and testcontainers dependencies for integration tests

* Add junit-jupiter5 for testing

* Add GET/DELETE actions for the latest schema version

* Add endpoint to check schema compatibility

* Set up configuration for testing

* Add basic test for SchemaRegistryService

* Fix mapping from dto to web model

* Change createNewSchema action's endpoint

* Update tests in SchemaRegistryServiceTests

* Rename getLatestSchema method and update tests

* Add more test in SchemaRegistryServiceTests

* Include compatibility level info to schema response. Change Flux to Mono

* Update tests

* Pass schema-registry url for secondLocal cluster too

* Remove explicit returns of 404 status code (it'll be processed by global error handler)

* Add global error handler to response with exception details

* Autoconfigure WebTestClient in SchemaRegistryServiceTest
This commit is contained in:
Ildar Almakaev 2021-02-09 10:40:11 +03:00 committed by GitHub
parent b87c2f6411
commit 961d14454a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1071 additions and 49 deletions

View file

@ -85,6 +85,8 @@ services:
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
ports:
- 8081:8081
kafka-init-topics:
image: confluentinc/cp-kafka:5.1.0

View file

@ -23,6 +23,7 @@ services:
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
KAFKA_CLUSTERS_1_JMXPORT: 9998
KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry0:8085
zookeeper0:
image: confluentinc/cp-zookeeper:5.1.0

View file

@ -119,7 +119,30 @@
<artifactId>commons-pool2</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test.containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${test.containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test.containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter-engine.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View file

@ -7,6 +7,7 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.web.reactive.function.client.WebClient;
import javax.management.remote.JMXConnector;
@ -15,7 +16,7 @@ public class Config {
@Bean
public KeyedObjectPool<String, JMXConnector> pool() {
GenericKeyedObjectPool<String, JMXConnector> pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
GenericKeyedObjectPool<String, JMXConnector> pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
pool.setConfig(poolConfig());
return pool;
}
@ -28,11 +29,15 @@ public class Config {
}
@Bean
public MBeanExporter exporter()
{
public MBeanExporter exporter() {
final MBeanExporter exporter = new MBeanExporter();
exporter.setAutodetect(true);
exporter.setExcludedBeans("pool");
return exporter;
}
@Bean
public WebClient webClient() {
return WebClient.create();
}
}

View file

@ -0,0 +1,26 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.http.HttpStatus;
public abstract class CustomBaseException extends RuntimeException {
public CustomBaseException() {
}
public CustomBaseException(String message) {
super(message);
}
public CustomBaseException(String message, Throwable cause) {
super(message, cause);
}
public CustomBaseException(Throwable cause) {
super(cause);
}
public CustomBaseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public abstract HttpStatus getResponseStatusCode();
}

View file

@ -0,0 +1,32 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.reactive.function.server.ServerRequest;
import java.util.Map;
@Component
public class GlobalErrorAttributes extends DefaultErrorAttributes {
public static final String STATUS = "status";
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, boolean includeStackTrace) {
Map<String, Object> errorAttrs = super.getErrorAttributes(request, includeStackTrace);
includeCustomErrorAttributes(request, errorAttrs);
return errorAttrs;
}
private void includeCustomErrorAttributes(ServerRequest request, Map<String, Object> errorAttrs) {
Throwable error = getError(request);
if (error instanceof WebClientResponseException) {
var webClientError = (WebClientResponseException) error;
errorAttrs.put(STATUS, webClientError.getStatusCode());
} else if (error instanceof CustomBaseException) {
var customBaseError = (CustomBaseException) error;
errorAttrs.put(STATUS, customBaseError.getResponseStatusCode());
}
}
}

View file

@ -0,0 +1,48 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.boot.autoconfigure.web.ResourceProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.Optional;
/**
* The order of our global error handler is -2 to give it a higher priority than the default {@link org.springframework.boot.autoconfigure.web.reactive.error.DefaultErrorWebExceptionHandler}
* which is registered at <code>@Order(-1)</code>.
*/
@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
public GlobalErrorWebExceptionHandler(GlobalErrorAttributes errorAttributes, ResourceProperties resourceProperties, ApplicationContext applicationContext,
ServerCodecConfigurer codecConfigurer) {
super(errorAttributes, resourceProperties, applicationContext);
this.setMessageWriters(codecConfigurer.getWriters());
}
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
Map<String, Object> errorAttributes = getErrorAttributes(request, false);
HttpStatus statusCode = Optional.ofNullable(errorAttributes.get(GlobalErrorAttributes.STATUS))
.map(code -> (HttpStatus) code)
.orElse(HttpStatus.BAD_REQUEST);
return ServerResponse
.status(statusCode)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(errorAttributes));
}
}

View file

@ -0,0 +1,15 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.http.HttpStatus;
public class NotFoundException extends CustomBaseException {
public NotFoundException(String message) {
super(message);
}
@Override
public HttpStatus getResponseStatusCode() {
return HttpStatus.NOT_FOUND;
}
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.mapper;
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.*;
import java.util.Properties;
import org.mapstruct.Mapper;
@ -36,6 +37,12 @@ public interface ClusterMapper {
TopicConfig toTopicConfig(InternalTopicConfig topic);
Replica toReplica(InternalReplica replica);
@Mapping(target = "isCompatible", source = "compatible")
CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
@Mapping(target = "compatibility", source = "compatibilityLevel")
CompatibilityLevel toCompatibilityLevel(InternalCompatibilityLevel dto);
default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
final TopicDetails result = toTopicDetails(topic);
result.setBytesInPerSec(

View file

@ -0,0 +1,10 @@
package com.provectus.kafka.ui.cluster.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class InternalCompatibilityCheck {
@JsonProperty("is_compatible")
private boolean isCompatible;
}

View file

@ -0,0 +1,8 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Data;
@Data
public class InternalCompatibilityLevel {
private String compatibilityLevel;
}

View file

@ -41,8 +41,8 @@ public class ClusterService {
public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
.map( c -> c.getMetrics().getInternalBrokerMetrics())
.map( m -> m.get(id))
.map(c -> c.getMetrics().getInternalBrokerMetrics())
.map(m -> m.get(id))
.map(clusterMapper::toBrokerMetrics));
}
@ -75,17 +75,17 @@ public class ClusterService {
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.flatMap( c ->
.flatMap(c ->
Optional.ofNullable(
c.getTopics().get(topicName)
c.getTopics().get(topicName)
).map(
t -> t.toBuilder().partitions(
kafkaService.getTopicPartitions(c, t)
).build()
t -> t.toBuilder().partitions(
kafkaService.getTopicPartitions(c, t)
).build()
).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
);
}
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getTopics)
@ -106,17 +106,17 @@ public class ClusterService {
var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups ->
groupMetadata(cluster, consumerGroupId)
.flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
.flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
.flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
.flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
.collect(Collectors.toList()).stream().flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
})
)
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
})
)
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
}
@ -141,21 +141,21 @@ public class ClusterService {
}
@SneakyThrows
public Mono<List<ConsumerGroup>> getConsumerGroups (String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
}
public Flux<Broker> getBrokers (String clusterName) {
public Flux<Broker> getBrokers(String clusterName) {
return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
.flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.id());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.id());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.flatMapMany(Flux::fromIterable);
}
@ -180,5 +180,4 @@ public class ClusterService {
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
.orElse(Flux.empty());
}
}

View file

@ -0,0 +1,190 @@
package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel;
import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Objects;
@Service
@Log4j2
@RequiredArgsConstructor
public class SchemaRegistryService {
private static final String URL_SUBJECTS = "/subjects";
private static final String URL_SUBJECT = "/subjects/{schemaName}";
private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
private static final String LATEST = "latest";
private final ClustersStorage clustersStorage;
private final ClusterMapper mapper;
private final WebClient webClient;
public Flux<String> getAllSchemaSubjects(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
.retrieve()
.bodyToFlux(String.class)
.doOnError(log::error))
.orElse(Flux.error(new NotFoundException("No such cluster")));
}
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.bodyToFlux(Integer.class))
.orElse(Flux.error(new NotFoundException("No such cluster")));
}
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version));
}
public Mono<SchemaSubject> getLatestSchemaSubject(String clusterName, String schemaName) {
return this.getSchemaSubject(clusterName, schemaName, LATEST);
}
private Mono<SchemaSubject> getSchemaSubject(String clusterName, String schemaName, String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
.bodyToMono(SchemaSubject.class)
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
.map(tuple -> {
SchemaSubject schema = tuple.getT1();
String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
schema.setCompatibilityLevel(compatibilityLevel);
return schema;
})
)
.orElseThrow();
}
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
return this.deleteSchemaSubject(clusterName, schemaName, String.valueOf(version));
}
public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(String clusterName, String schemaName) {
return this.deleteSchemaSubject(clusterName, schemaName, LATEST);
}
private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName, String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.delete()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
.toBodilessEntity())
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.delete()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.toBodilessEntity())
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.post()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.toEntity(SchemaSubject.class)
.log())
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
/**
* Updates a compatibility level for a <code>schemaName</code>
*
* @param schemaName is a schema subject name
* @see com.provectus.kafka.ui.model.CompatibilityLevel.CompatibilityEnum
*/
public Mono<Void> updateSchemaCompatibility(String clusterName, String schemaName, Mono<CompatibilityLevel> compatibilityLevel) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return webClient.put()
.uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.bodyToMono(Void.class);
}).orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<Void> updateSchemaCompatibility(String clusterName, Mono<CompatibilityLevel> compatibilityLevel) {
return updateSchemaCompatibility(clusterName, null, compatibilityLevel);
}
public Mono<CompatibilityLevel> getSchemaCompatibilityLevel(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return webClient.get()
.uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
.retrieve()
.bodyToMono(InternalCompatibilityLevel.class)
.map(mapper::toCompatibilityLevel)
.onErrorResume(error -> Mono.empty());
}).orElse(Mono.empty());
}
public Mono<CompatibilityLevel> getGlobalSchemaCompatibilityLevel(String clusterName) {
return this.getSchemaCompatibilityLevel(clusterName, null);
}
private Mono<CompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(String clusterName, String schemaName) {
return this.getSchemaCompatibilityLevel(clusterName, schemaName)
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(clusterName));
}
public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.post()
.uri(cluster.getSchemaRegistry() + "/compatibility/subjects/{schemaName}/versions/latest", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse)
.log()
).orElse(Mono.error(new NotFoundException("No such cluster")));
}
}

View file

@ -3,8 +3,10 @@ package com.provectus.kafka.ui.rest;
import com.provectus.kafka.ui.api.ApiClustersApi;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.cluster.service.SchemaRegistryService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@ -20,9 +22,11 @@ import java.util.function.Function;
@RestController
@RequiredArgsConstructor
@Log4j2
public class MetricsRestController implements ApiClustersApi {
private final ClusterService clusterService;
private final SchemaRegistryService schemaRegistryService;
@Override
public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
@ -32,8 +36,8 @@ public class MetricsRestController implements ApiClustersApi {
@Override
public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
return clusterService.getBrokerMetrics(clusterName, id)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
@ -100,6 +104,49 @@ public class MetricsRestController implements ApiClustersApi {
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
}
@Override
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
return Mono.just(ResponseEntity.ok(subjects));
}
@Override
public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
}
@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
}
@Override
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
}
@Override
public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) {
return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName);
}
@Override
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
@Valid Mono<NewSchemaSubject> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
}
@Override
public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) {
return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok);
@ -110,6 +157,34 @@ public class MetricsRestController implements ApiClustersApi {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@Override
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility globally");
return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
@Valid Mono<NewSchemaSubject> newSchemaSubject,
ServerWebExchange exchange) {
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
log.info("Updating schema compatibility for schema: {}", schemaName);
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
.map(ResponseEntity::ok);
}
private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
return Mono.justOrEmpty(seekTo)

View file

@ -2,15 +2,16 @@ kafka:
clusters:
-
name: local
bootstrapServers: localhost:9092
bootstrapServers: localhost:9093
zookeeper: localhost:2181
schemaRegistry: http://localhost:8085
schemaRegistry: http://localhost:8081
# schemaNameTemplate: "%s-value"
jmxPort: 9997
-
name: secondLocal
bootstrapServers: localhost:9093
zookeeper: localhost:2182
schemaRegistry: http://localhost:8081
jmxPort: 9998
admin-client-timeout: 5000
zookeeper:

View file

@ -9,6 +9,7 @@ kafka:
name: secondLocal
zookeeper: zookeeper1:2181
bootstrapServers: kafka1:29092
schemaRegistry: http://schemaregistry0:8085
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000

View file

@ -0,0 +1,35 @@
package com.provectus.kafka.ui;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public abstract class AbstractBaseTest {
@Container
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"))
.withNetwork(Network.SHARED);
@Container
public static SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer("5.2.1")
.withKafka(kafka)
.dependsOn(kafka);
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.name", "local");
System.setProperty("kafka.clusters.0.schemaRegistry", schemaRegistry.getTarget());
}
}
}

View file

@ -1,12 +0,0 @@
package com.provectus.kafka.ui;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class KafkaMetricsApplicationTests {
@Test
public void contextLoads() {
}
}

View file

@ -0,0 +1,31 @@
package com.provectus.kafka.ui;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
private static final int SCHEMA_PORT = 8081;
public SchemaRegistryContainer(String version) {
super("confluentinc/cp-schema-registry:" + version);
withExposedPorts(8081);
}
public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092";
return withKafka(kafka.getNetwork(), bootstrapServers);
}
public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
withNetwork(network);
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_PORT);
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
return self();
}
public String getTarget() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(SCHEMA_PORT);
}
}

View file

@ -0,0 +1,145 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.SchemaSubject;
import lombok.extern.log4j.Log4j2;
import lombok.val;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.http.MediaType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.web.reactive.server.EntityExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.BodyInserters;
import java.util.List;
import java.util.UUID;
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
@Log4j2
@AutoConfigureWebTestClient(timeout = "10000")
class SchemaRegistryServiceTests extends AbstractBaseTest {
@Autowired
WebTestClient webTestClient;
@Test
public void should404WhenGetAllSchemasForUnknownCluster() {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/unknown-cluster/schemas")
.exchange()
.expectStatus().isNotFound();
}
@Test
void shouldReturn404WhenGetLatestSchemaByNonExistingSchemaName() {
String unknownSchema = "unknown-schema";
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", unknownSchema)
.exchange()
.expectStatus().isNotFound();
}
@Test
void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/compatibility")
.exchange()
.expectStatus().isOk()
.expectBody(CompatibilityLevel.class)
.consumeWith(result -> {
CompatibilityLevel responseBody = result.getResponseBody();
Assertions.assertNotNull(responseBody);
Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD, responseBody.getCompatibility());
});
}
@Test
public void shouldReturnNotNullResponseWhenGetAllSchemas() {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas")
.exchange()
.expectStatus().isOk()
.expectBodyList(String.class)
.consumeWith(result -> {
List<String> responseBody = result.getResponseBody();
Assertions.assertNotNull(responseBody);
log.info("Response of test schemas: {}", responseBody);
});
}
@Test
public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
String schemaName = UUID.randomUUID().toString();
// Create a new schema
webTestClient
.post()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
.exchange()
.expectStatus().isOk()
.expectBody(SchemaSubject.class)
.consumeWith(this::assertResponseBodyWhenCreateNewSchema);
//Get the created schema and check its items
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
.consumeWith(listEntityExchangeResult -> {
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
});
//Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
webTestClient.put()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/compatibility", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
.exchange()
.expectStatus().isOk();
//Get one more time to check the schema compatibility level is changed to FULL
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
.consumeWith(listEntityExchangeResult -> {
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
});
}
private void assertSchemaWhenGetLatest(String schemaName, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
Assertions.assertNotNull(responseBody);
Assertions.assertEquals(1, responseBody.size());
SchemaSubject actualSchema = responseBody.get(0);
Assertions.assertNotNull(actualSchema);
Assertions.assertEquals(schemaName, actualSchema.getSubject());
Assertions.assertEquals("\"string\"", actualSchema.getSchema());
Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel());
}
private void assertResponseBodyWhenCreateNewSchema(EntityExchangeResult<SchemaSubject> exchangeResult) {
SchemaSubject responseBody = exchangeResult.getResponseBody();
Assertions.assertNotNull(responseBody);
Assertions.assertEquals(1, responseBody.getId(), "The schema ID should be non-null in the response");
String message = "It should be null";
Assertions.assertNull(responseBody.getSchema(), message);
Assertions.assertNull(responseBody.getSubject(), message);
Assertions.assertNull(responseBody.getVersion(), message);
}
}

View file

@ -0,0 +1,16 @@
kafka:
clusters:
-
name: local
bootstrapServers: localhost:9093
zookeeper: localhost:2181
schemaRegistry: http://localhost:8081
jmxPort: 9997
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000
spring:
jmx:
enabled: true
auth:
enabled: false

View file

@ -335,6 +335,312 @@ paths:
items:
$ref: '#/components/schemas/ConsumerGroup'
/api/clusters/{clusterName}/schemas:
get:
tags:
- /api/clusters
summary: get all schemas from Schema Registry service
operationId: getSchemas
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
type: string
/api/clusters/{clusterName}/schemas/{schemaName}:
post:
tags:
- /api/clusters
summary: create a new subject schema
operationId: createNewSchema
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/NewSchemaSubject'
responses:
200:
description: Updated
content:
application/json:
schema:
$ref: '#/components/schemas/SchemaSubject'
400:
description: Bad request
delete:
tags:
- /api/clusters
summary: delete schema from Schema Registry service
operationId: deleteSchema
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
404:
description: Not found
/api/clusters/{clusterName}/schemas/{schemaName}/versions:
get:
tags:
- /api/clusters
summary: get all version of schema from Schema Registry service
operationId: getSchemaVersions
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
type: integer
/api/clusters/{clusterName}/schemas/{schemaName}/latest:
get:
tags:
- /api/clusters
summary: get the latest schema from Schema Registry service
operationId: getLatestSchema
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/SchemaSubject'
delete:
tags:
- /api/clusters
summary: delete the latest schema from schema registry
operationId: deleteLatestSchema
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
404:
description: Not found
/api/clusters/{clusterName}/schemas/{schemaName}/versions/{version}:
get:
tags:
- /api/clusters
summary: get schema by version from Schema Registry service
operationId: getSchemaByVersion
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
- name: version
in: path
required: true
schema:
type: integer
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/SchemaSubject'
delete:
tags:
- /api/clusters
summary: delete schema by version from schema registry
operationId: deleteSchemaByVersion
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
- name: version
in: path
required: true
schema:
type: integer
responses:
200:
description: OK
404:
description: Not found
/api/clusters/{clusterName}/schemas/compatibility:
get:
tags:
- /api/clusters
summary: Get global schema compatibility level
operationId: getGlobalSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevel'
put:
tags:
- /api/clusters
summary: Update compatibility level globally
operationId: updateGlobalSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevel'
responses:
200:
description: OK
404:
description: Not Found
/api/clusters/{clusterName}/schemas/{schemaName}/compatibility:
put:
tags:
- /api/clusters
summary: Update compatibility level for specific schema.
operationId: updateSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevel'
responses:
200:
description: OK
404:
description: Not Found
/api/clusters/{clusterName}/schemas/{schemaName}/check:
post:
tags:
- /api/clusters
summary: Check compatibility of the schema.
operationId: checkSchemaCompatibility
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/NewSchemaSubject'
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityCheckResponse'
404:
description: Not Found
components:
schemas:
Cluster:
@ -657,4 +963,60 @@ components:
value:
type: string
additionalProperties:
type: number
type: number
SchemaSubject:
type: object
properties:
subject:
type: string
version:
type: string
id:
type: integer
schema:
type: string
compatibilityLevel:
type: string
required:
- id
NewSchemaSubject:
type: object
properties:
schema:
type: string
required:
- schema
CompatibilityLevel:
type: object
properties:
compatibility:
type: string
enum:
- BACKWARD
- BACKWARD_TRANSITIVE
- FORWARD
- FORWARD_TRANSITIVE
- FULL
- FULL_TRANSITIVE
- NONE
required:
- compatibility
# CompatibilityLevelResponse:
# type: object
# properties:
# compatibilityLevel:
# type: string
# required:
# - compatibilityLevel
CompatibilityCheckResponse:
type: object
properties:
isCompatible:
type: boolean
required:
- isCompatible

View file

@ -31,6 +31,8 @@
<avro.version>1.9.2</avro.version>
<confluent.version>5.5.1</confluent.version>
<apache.commons.version>2.2</apache.commons.version>
<test.containers.version>1.15.1</test.containers.version>
<junit-jupiter-engine.version>5.4.0</junit-jupiter-engine.version>
</properties>
<repositories>