123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package com.provectus.kafka.ui.rest;
- import com.provectus.kafka.ui.api.ApiClustersApi;
- import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
- import com.provectus.kafka.ui.cluster.service.ClusterService;
- import com.provectus.kafka.ui.cluster.service.SchemaRegistryService;
- import com.provectus.kafka.ui.model.*;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.log4j.Log4j2;
- import org.apache.commons.lang3.tuple.Pair;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.ResponseEntity;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- import javax.validation.Valid;
- import java.util.Collections;
- import java.util.List;
- import java.util.function.Function;
- @RestController
- @RequiredArgsConstructor
- @Log4j2
- public class MetricsRestController implements ApiClustersApi {
- private final ClusterService clusterService;
- private final SchemaRegistryService schemaRegistryService;
- @Override
- public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
- return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
- }
- @Override
- public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
- return clusterService.getBrokerMetrics(clusterName, id)
- .map(ResponseEntity::ok)
- .onErrorReturn(ResponseEntity.notFound().build());
- }
- @Override
- public Mono<ResponseEntity<ClusterMetrics>> getClusterMetrics(String clusterName, ServerWebExchange exchange) {
- return clusterService.getClusterMetrics(clusterName)
- .map(ResponseEntity::ok)
- .onErrorReturn(ResponseEntity.notFound().build());
- }
- @Override
- public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, ServerWebExchange exchange) {
- return clusterService.getClusterStats(clusterName)
- .map(ResponseEntity::ok)
- .onErrorReturn(ResponseEntity.notFound().build());
- }
- @Override
- public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
- return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
- }
- @Override
- public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterName, String topicName, ServerWebExchange exchange) {
- return Mono.just(
- clusterService.getTopicDetails(clusterName, topicName)
- .map(ResponseEntity::ok)
- .orElse(ResponseEntity.notFound().build())
- );
- }
- @Override
- public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterName, String topicName, ServerWebExchange exchange) {
- return Mono.just(
- clusterService.getTopicConfigs(clusterName, topicName)
- .map(Flux::fromIterable)
- .map(ResponseEntity::ok)
- .orElse(ResponseEntity.notFound().build())
- );
- }
- @Override
- public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, @Valid String q, ServerWebExchange exchange) {
- return parseConsumerPosition(seekType, seekTo)
- .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
- }
- @Override
- public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
- return clusterService.createTopic(clusterName, topicFormData)
- .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
- .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
- }
- @Override
- public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterName, ServerWebExchange exchange) {
- return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName)));
- }
- @Override
- public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String clusterName, ServerWebExchange exchange) {
- return clusterService.getConsumerGroups(clusterName)
- .map(Flux::fromIterable)
- .map(ResponseEntity::ok)
- .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
- }
- @Override
- public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
- return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
- return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
- Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
- return Mono.just(ResponseEntity.ok(subjects));
- }
- @Override
- public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
- return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
- }
- @Override
- public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
- return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
- }
- @Override
- public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
- return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
- }
- @Override
- public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) {
- return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName);
- }
- @Override
- public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
- @Valid Mono<NewSchemaSubject> newSchemaSubject,
- ServerWebExchange exchange) {
- return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
- }
- @Override
- public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) {
- return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
- return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
- return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
- .map(ResponseEntity::ok)
- .defaultIfEmpty(ResponseEntity.notFound().build());
- }
- @Override
- public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
- log.info("Updating schema compatibility globally");
- return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
- .map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
- @Valid Mono<NewSchemaSubject> newSchemaSubject,
- ServerWebExchange exchange) {
- return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
- .map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
- log.info("Updating schema compatibility for schema: {}", schemaName);
- return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
- .map(ResponseEntity::ok);
- }
- private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
- return Mono.justOrEmpty(seekTo)
- .defaultIfEmpty(Collections.emptyList())
- .flatMapIterable(Function.identity())
- .map(p -> {
- String[] splited = p.split("::");
- if (splited.length != 2) {
- throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
- }
- return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
- })
- .collectMap(Pair::getKey, Pair::getValue)
- .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
- }
- }
|