Merge branch 'master' of github.com:provectus/kafka-ui into feature/paginate_topics_backend

Author: Ramazan Yapparov
Date:   2021-03-15 19:24:47 +03:00
Commit: a8e992ff89

77 changed files with 765 additions and 440 deletions

@@ -1,5 +1,8 @@
 name: backend
 on:
+  pull_request:
+    branches:
+      - "master"
   push:
     branches:
       - "**"

@@ -1,5 +1,8 @@
 name: frontend
 on:
+  pull_request:
+    branches:
+      - "master"
   push:
     branches:
       - "**"

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.client;
+
+import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class KafkaConnectClients {
+
+    private static final Map<String, KafkaConnectClientApi> CACHE = new ConcurrentHashMap<>();
+
+    public static KafkaConnectClientApi withBaseUrl(String basePath) {
+        return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new);
+    }
+}
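
For context, a minimal usage sketch of the cache above (the base URL is hypothetical): ConcurrentHashMap.computeIfAbsent constructs the RetryingKafkaConnectClient only on the first lookup for a given base path, so repeated calls return the same instance.

    // Hypothetical base URL; any Kafka Connect address behaves the same way.
    KafkaConnectClientApi first = KafkaConnectClients.withBaseUrl("http://localhost:8083");
    KafkaConnectClientApi second = KafkaConnectClients.withBaseUrl("http://localhost:8083");
    assert first == second; // constructed once, served from the cache afterwards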

@@ -1,9 +1,9 @@
-package com.provectus.kafka.ui.cluster.client;
+package com.provectus.kafka.ui.client;
 
-import com.provectus.kafka.ui.cluster.exception.RebalanceInProgressException;
-import com.provectus.kafka.ui.cluster.exception.ValidationException;
+import com.provectus.kafka.ui.exception.RebalanceInProgressException;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.connect.ApiClient;
-import com.provectus.kafka.ui.connect.api.ConnectApi;
+import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.model.Connector;
 import com.provectus.kafka.ui.connect.model.NewConnector;
 import lombok.extern.log4j.Log4j2;
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 @Log4j2
-public class RetryingKafkaConnectClient extends ConnectApi {
+public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
     private static final int MAX_RETRIES = 5;
 
     public RetryingKafkaConnectClient(String basePath) {

@@ -1,15 +0,0 @@
-package com.provectus.kafka.ui.cluster.client;
-
-import com.provectus.kafka.ui.connect.api.ConnectApi;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public final class KafkaConnectClients {
-
-    private static final Map<String, ConnectApi> CACHE = new ConcurrentHashMap<>();
-
-    public static ConnectApi withBaseUrl(String basePath) {
-        return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new);
-    }
-}

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.config;
+package com.provectus.kafka.ui.config;
 
 import java.util.Properties;
 import lombok.Data;

@@ -1,6 +1,6 @@
-package com.provectus.kafka.ui.cluster.config;
+package com.provectus.kafka.ui.config;
 
-import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
+import com.provectus.kafka.ui.util.JmxPoolFactory;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.rest.config;
+package com.provectus.kafka.ui.config;
 
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.rest.config;
+package com.provectus.kafka.ui.config;
 
 import org.springframework.stereotype.Component;
 import org.springframework.web.server.ServerWebExchange;

@@ -1,8 +1,8 @@
-package com.provectus.kafka.ui.rest.config;
+package com.provectus.kafka.ui.config;
 
-import com.provectus.kafka.ui.cluster.exception.NotFoundException;
-import com.provectus.kafka.ui.cluster.exception.ReadOnlyException;
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.ReadOnlyException;
+import com.provectus.kafka.ui.service.ClustersStorage;
 import lombok.RequiredArgsConstructor;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.core.annotation.Order;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.config;
+package com.provectus.kafka.ui.config;
 
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;

@@ -0,0 +1,32 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.BrokersApi;
+import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.model.Broker;
+import com.provectus.kafka.ui.model.BrokerMetrics;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class BrokersController implements BrokersApi {
+    private final ClusterService clusterService;
+
+    @Override
+    public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
+        return clusterService.getBrokerMetrics(clusterName, id)
+            .map(ResponseEntity::ok)
+            .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterName, ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName)));
+    }
+}

@@ -0,0 +1,40 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.ClustersApi;
+import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.model.Cluster;
+import com.provectus.kafka.ui.model.ClusterMetrics;
+import com.provectus.kafka.ui.model.ClusterStats;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class ClustersController implements ClustersApi {
+    private final ClusterService clusterService;
+
+    @Override
+    public Mono<ResponseEntity<ClusterMetrics>> getClusterMetrics(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getClusterMetrics(clusterName)
+            .map(ResponseEntity::ok)
+            .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
+    @Override
+    public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getClusterStats(clusterName)
+            .map(ResponseEntity::ok)
+            .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
+    }
+}

@@ -0,0 +1,35 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.ConsumerGroupsApi;
+import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.model.ConsumerGroupDetails;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class ConsumerGroupsController implements ConsumerGroupsApi {
+    private final ClusterService clusterService;
+
+    @Override
+    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(
+        String clusterName, String consumerGroupId, ServerWebExchange exchange) {
+        return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getConsumerGroups(clusterName)
+            .map(Flux::fromIterable)
+            .map(ResponseEntity::ok)
+            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
+    }
+}

@@ -1,7 +1,7 @@
-package com.provectus.kafka.ui.rest;
+package com.provectus.kafka.ui.controller;
 
-import com.provectus.kafka.ui.api.ApiClustersConnectsApi;
-import com.provectus.kafka.ui.cluster.service.KafkaConnectService;
+import com.provectus.kafka.ui.api.KafkaConnectApi;
+import com.provectus.kafka.ui.service.KafkaConnectService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -17,7 +17,7 @@ import java.util.Map;
 @RestController
 @RequiredArgsConstructor
 @Log4j2
-public class KafkaConnectRestController implements ApiClustersConnectsApi {
+public class KafkaConnectController implements KafkaConnectApi {
     private final KafkaConnectService kafkaConnectService;
 
     @Override

@@ -0,0 +1,63 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.MessagesApi;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.model.SeekType;
+import com.provectus.kafka.ui.model.TopicMessage;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import javax.validation.Valid;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class MessagesController implements MessagesApi {
+    private final ClusterService clusterService;
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteTopicMessages(
+        String clusterName, String topicName, @Valid List<Integer> partitions, ServerWebExchange exchange) {
+        return clusterService.deleteTopicMessages(
+            clusterName,
+            topicName,
+            Optional.ofNullable(partitions).orElse(List.of())
+        ).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(
+        String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo,
+        @Valid Integer limit, @Valid String q, ServerWebExchange exchange) {
+        return parseConsumerPosition(seekType, seekTo)
+            .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
+    }
+
+    private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
+        return Mono.justOrEmpty(seekTo)
+            .defaultIfEmpty(Collections.emptyList())
+            .flatMapIterable(Function.identity())
+            .map(p -> {
+                String[] splited = p.split("::");
+                if (splited.length != 2) {
+                    throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
+                }
+                return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
+            })
+            .collectMap(Pair::getKey, Pair::getValue)
+            .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
+    }
+}
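
As the parser above implies, each seekTo element must be a partition::offset pair. A small sketch with illustrative values:

    // Illustrative only: "1::42" means partition 1, offset 42.
    String[] parts = "1::42".split("::");
    int partition = Integer.parseInt(parts[0]); // 1
    long offset = Long.parseLong(parts[1]);     // 42L
    // Anything that does not split into exactly two parts triggers the IllegalArgumentException above.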

@@ -0,0 +1,106 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.SchemasApi;
+import com.provectus.kafka.ui.service.SchemaRegistryService;
+import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
+import com.provectus.kafka.ui.model.CompatibilityLevel;
+import com.provectus.kafka.ui.model.NewSchemaSubject;
+import com.provectus.kafka.ui.model.SchemaSubject;
+import javax.validation.Valid;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class SchemasController implements SchemasApi {
+    private final SchemaRegistryService schemaRegistryService;
+
+    @Override
+    public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(
+        String clusterName, String subject, @Valid Mono<NewSchemaSubject> newSchemaSubject, ServerWebExchange exchange) {
+        return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
+            .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName,
+                                                               @Valid Mono<NewSchemaSubject> newSchemaSubject,
+                                                               ServerWebExchange exchange) {
+        return schemaRegistryService
+            .registerNewSchema(clusterName, newSchemaSubject)
+            .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteLatestSchema(
+        String clusterName, String subject, ServerWebExchange exchange) {
+        return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteSchema(
+        String clusterName, String subjectName, ServerWebExchange exchange) {
+        return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
+        String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
+        return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<SchemaSubject>>> getAllVersionsBySubject(
+        String clusterName, String subjectName, ServerWebExchange exchange) {
+        Flux<SchemaSubject> schemas = schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
+        return Mono.just(ResponseEntity.ok(schemas));
+    }
+
+    @Override
+    public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(
+        String clusterName, ServerWebExchange exchange) {
+        return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
+            .map(ResponseEntity::ok)
+            .defaultIfEmpty(ResponseEntity.notFound().build());
+    }
+
+    @Override
+    public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
+        return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(
+        String clusterName, String subject, Integer version, ServerWebExchange exchange) {
+        return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
+        Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
+        return Mono.just(ResponseEntity.ok(subjects));
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
+        String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
+        log.info("Updating schema compatibility globally");
+        return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
+            .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
+        String clusterName, String subject, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
+        log.info("Updating schema compatibility for subject: {}", subject);
+        return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
+            .map(ResponseEntity::ok);
+    }
+}

@@ -0,0 +1,71 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.model.Topic;
+import com.provectus.kafka.ui.model.TopicConfig;
+import com.provectus.kafka.ui.model.TopicDetails;
+import com.provectus.kafka.ui.model.TopicFormData;
+import javax.validation.Valid;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class TopicsController implements TopicsApi {
+    private final ClusterService clusterService;
+
+    @Override
+    public Mono<ResponseEntity<Topic>> createTopic(
+        String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
+        return clusterService.createTopic(clusterName, topicFormData)
+            .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
+            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteTopic(
+        String clusterName, String topicName, ServerWebExchange exchange) {
+        return clusterService.deleteTopic(clusterName, topicName).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(
+        String clusterName, String topicName, ServerWebExchange exchange) {
+        return Mono.just(
+            clusterService.getTopicConfigs(clusterName, topicName)
+                .map(Flux::fromIterable)
+                .map(ResponseEntity::ok)
+                .orElse(ResponseEntity.notFound().build())
+        );
+    }
+
+    @Override
+    public Mono<ResponseEntity<TopicDetails>> getTopicDetails(
+        String clusterName, String topicName, ServerWebExchange exchange) {
+        return Mono.just(
+            clusterService.getTopicDetails(clusterName, topicName)
+                .map(ResponseEntity::ok)
+                .orElse(ResponseEntity.notFound().build())
+        );
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
+    }
+
+    @Override
+    public Mono<ResponseEntity<Topic>> updateTopic(
+        String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
+        return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
+    }
+}

@@ -1,8 +1,8 @@
-package com.provectus.kafka.ui.cluster.deserialization;
+package com.provectus.kafka.ui.deserialization;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.deserialization;
+package com.provectus.kafka.ui.deserialization;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.deserialization;
+package com.provectus.kafka.ui.deserialization;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;

@@ -1,9 +1,9 @@
-package com.provectus.kafka.ui.cluster.deserialization;
+package com.provectus.kafka.ui.deserialization;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Message;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.SchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.deserialization;
+package com.provectus.kafka.ui.deserialization;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
 import org.springframework.stereotype.Component;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.boot.autoconfigure.web.ResourceProperties;
 import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.exception;
+package com.provectus.kafka.ui.exception;
 
 import org.springframework.http.HttpStatus;
 

@@ -1,10 +1,9 @@
-package com.provectus.kafka.ui.cluster.mapper;
+package com.provectus.kafka.ui.mapper;
 
-import com.provectus.kafka.ui.cluster.config.ClustersProperties;
-import com.provectus.kafka.ui.cluster.model.*;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
+import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import java.util.Properties;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.mapper;
+package com.provectus.kafka.ui.mapper;
 
 import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
 import com.provectus.kafka.ui.connect.model.ConnectorTask;

@@ -1,11 +1,9 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Value;
 
 import java.util.Map;
 
-import com.provectus.kafka.ui.model.SeekType;
-
 @Value
 public class ConsumerPosition {

@@ -1,6 +1,6 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.util.ClusterUtil;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.admin.AdminClient;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;

@@ -1,6 +1,5 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;

@@ -1,6 +1,5 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;

@@ -1,8 +1,7 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
 import java.util.Map;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 

@@ -1,6 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.model.ServerStatus;
-
 import java.util.List;
 import java.util.Properties;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.model;
 
 import lombok.Builder;
 import lombok.Data;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model.schemaregistry;
+package com.provectus.kafka.ui.model.schemaregistry;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model.schemaregistry;
+package com.provectus.kafka.ui.model.schemaregistry;
 
 import lombok.Data;
 

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model.schemaregistry;
+package com.provectus.kafka.ui.model.schemaregistry;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.provectus.kafka.ui.model.SchemaType;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.model.schemaregistry;
+package com.provectus.kafka.ui.model.schemaregistry;
 
 import lombok.Data;
 

@@ -1,213 +0,0 @@
-package com.provectus.kafka.ui.rest;
-
-import com.provectus.kafka.ui.api.ApiClustersApi;
-import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
-import com.provectus.kafka.ui.cluster.service.ClusterService;
-import com.provectus.kafka.ui.cluster.service.SchemaRegistryService;
-import com.provectus.kafka.ui.model.*;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.tuple.Pair;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import javax.validation.Valid;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-
-@RestController
-@RequiredArgsConstructor
-@Log4j2
-public class MetricsRestController implements ApiClustersApi {
-
-    private final ClusterService clusterService;
-    private final SchemaRegistryService schemaRegistryService;
-
-    @Override
-    public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
-    }
-
-    @Override
-    public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
-        return clusterService.getBrokerMetrics(clusterName, id)
-            .map(ResponseEntity::ok)
-            .onErrorReturn(ResponseEntity.notFound().build());
-    }
-
-    @Override
-    public Mono<ResponseEntity<ClusterMetrics>> getClusterMetrics(String clusterName, ServerWebExchange exchange) {
-        return clusterService.getClusterMetrics(clusterName)
-            .map(ResponseEntity::ok)
-            .onErrorReturn(ResponseEntity.notFound().build());
-    }
-
-    @Override
-    public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, ServerWebExchange exchange) {
-        return clusterService.getClusterStats(clusterName)
-            .map(ResponseEntity::ok)
-            .onErrorReturn(ResponseEntity.notFound().build());
-    }
-
-    @Override
-    public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page, @Valid Integer pageSize, ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(pageSize))));
-    }
-
-    @Override
-    public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterName, String topicName, ServerWebExchange exchange) {
-        return Mono.just(
-            clusterService.getTopicDetails(clusterName, topicName)
-                .map(ResponseEntity::ok)
-                .orElse(ResponseEntity.notFound().build())
-        );
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterName, String topicName, ServerWebExchange exchange) {
-        return Mono.just(
-            clusterService.getTopicConfigs(clusterName, topicName)
-                .map(Flux::fromIterable)
-                .map(ResponseEntity::ok)
-                .orElse(ResponseEntity.notFound().build())
-        );
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, @Valid String q, ServerWebExchange exchange) {
-        return parseConsumerPosition(seekType, seekTo)
-            .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
-    }
-
-    @Override
-    public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        return clusterService.createTopic(clusterName, topicFormData)
-            .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
-            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterName, ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName)));
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String clusterName, ServerWebExchange exchange) {
-        return clusterService.getConsumerGroups(clusterName)
-            .map(Flux::fromIterable)
-            .map(ResponseEntity::ok)
-            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
-    }
-
-    @Override
-    public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
-        return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject).map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String subject, Integer version, ServerWebExchange exchange) {
-        return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version).map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
-        Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
-        return Mono.just(ResponseEntity.ok(subjects));
-    }
-
-    @Override
-    public Mono<ResponseEntity<Flux<SchemaSubject>>> getAllVersionsBySubject(String clusterName, String subjectName, ServerWebExchange exchange) {
-        Flux<SchemaSubject> schemas = schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
-        return Mono.just(ResponseEntity.ok(schemas));
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
-        return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
-        return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) {
-        return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName);
-    }
-
-    @Override
-    public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName,
-                                                               @Valid Mono<NewSchemaSubject> newSchemaSubject,
-                                                               ServerWebExchange exchange) {
-        return schemaRegistryService
-            .registerNewSchema(clusterName, newSchemaSubject)
-            .map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) {
-        return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> deleteTopic(String clusterName, String topicName, ServerWebExchange exchange) {
-        return clusterService.deleteTopic(clusterName, topicName).map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
-        return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
-            .map(ResponseEntity::ok)
-            .defaultIfEmpty(ResponseEntity.notFound().build());
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String clusterName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
-        log.info("Updating schema compatibility globally");
-        return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
-            .map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String subject,
-                                                                                     @Valid Mono<NewSchemaSubject> newSchemaSubject,
-                                                                                     ServerWebExchange exchange) {
-        return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
-            .map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String subject, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
-        log.info("Updating schema compatibility for subject: {}", subject);
-        return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
-            .map(ResponseEntity::ok);
-    }
-
-    private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
-        return Mono.justOrEmpty(seekTo)
-            .defaultIfEmpty(Collections.emptyList())
-            .flatMapIterable(Function.identity())
-            .map(p -> {
-                String[] splited = p.split("::");
-                if (splited.length != 2) {
-                    throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
-                }
-                return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
-            })
-            .collectMap(Pair::getKey, Pair::getValue)
-            .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
-    }
-}

@@ -1,13 +1,11 @@
-package com.provectus.kafka.ui.cluster.service;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.exception.NotFoundException;
-import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -206,4 +204,14 @@ public class ClusterService {
                 .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
     }
+
+    public Mono<Void> deleteTopicMessages(String clusterName, String topicName, List<Integer> partitions) {
+        var cluster = clustersStorage.getClusterByName(clusterName)
+            .orElseThrow(() -> new NotFoundException("No such cluster"));
+        if (!cluster.getTopics().containsKey(topicName)) {
+            throw new NotFoundException("No such topic");
+        }
+        return consumingService.loadOffsets(cluster, topicName, partitions)
+            .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
+    }
 }

@@ -1,7 +1,5 @@
-package com.provectus.kafka.ui.cluster;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.scheduling.annotation.Scheduled;

@@ -1,7 +1,8 @@
-package com.provectus.kafka.ui.cluster.model;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.config.ClustersProperties;
-import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import org.mapstruct.factory.Mappers;
 import org.springframework.stereotype.Component;

@@ -1,13 +1,12 @@
-package com.provectus.kafka.ui.cluster.service;
+package com.provectus.kafka.ui.service;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
-import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
-import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.deserialization.DeserializationService;
+import com.provectus.kafka.ui.deserialization.RecordDeserializer;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 import lombok.RequiredArgsConstructor;
@@ -21,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import java.time.Duration;
@@ -55,6 +55,25 @@ public class ConsumingService {
             .limitRequest(recordsLimit);
     }
 
+    public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
+        return Mono.fromSupplier(() -> {
+            try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
+                var partitions = consumer.partitionsFor(topicName).stream()
+                    .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
+                    .map(p -> new TopicPartition(topicName, p.partition()))
+                    .collect(Collectors.toList());
+                var beginningOffsets = consumer.beginningOffsets(partitions);
+                var endOffsets = consumer.endOffsets(partitions);
+                return endOffsets.entrySet().stream()
+                    .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            } catch (Exception e) {
+                log.error("Error occurred while consuming records", e);
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
     private boolean filterTopicMessage(TopicMessage message, String query) {
         if (StringUtils.isEmpty(query)) {
             return true;
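
The loadOffsets addition keeps only partitions that actually contain records: a partition whose beginning offset equals its end offset is empty and is filtered out. A minimal sketch of that filter (reusing the same types the diff imports), assuming two partitions where only the second holds data:

    // Assumed offsets: partition 0 is empty (3 == 3), partition 1 holds records at offsets 5..9.
    Map<TopicPartition, Long> begin = Map.of(new TopicPartition("t", 0), 3L, new TopicPartition("t", 1), 5L);
    Map<TopicPartition, Long> end = Map.of(new TopicPartition("t", 0), 3L, new TopicPartition("t", 1), 10L);
    Map<TopicPartition, Long> nonEmpty = end.entrySet().stream()
        .filter(e -> !begin.get(e.getKey()).equals(e.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    // nonEmpty == {t-1=10}: only partition 1 survives, mapped to its end offset.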

@@ -1,12 +1,11 @@
-package com.provectus.kafka.ui.cluster.service;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.client.KafkaConnectClients;
-import com.provectus.kafka.ui.cluster.exception.NotFoundException;
-import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
-import com.provectus.kafka.ui.cluster.mapper.KafkaConnectMapper;
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.client.KafkaConnectClients;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -38,7 +37,6 @@ public class KafkaConnectService {
         );
     }
 
-
     public Flux<String> getConnectors(String clusterName, String connectName) {
         return getConnectAddress(clusterName, connectName)
             .flatMapMany(connect ->

@@ -1,15 +1,14 @@
-package com.provectus.kafka.ui.kafka;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.model.*;
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
-import com.provectus.kafka.ui.cluster.util.JmxMetricsName;
-import com.provectus.kafka.ui.cluster.util.JmxMetricsValueName;
+import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.util.ClusterUtil;
+import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.util.JmxMetricsName;
+import com.provectus.kafka.ui.util.JmxMetricsValueName;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.Metric;
 import com.provectus.kafka.ui.model.ServerStatus;
 import com.provectus.kafka.ui.model.TopicFormData;
-import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
@@ -490,4 +489,12 @@ public class KafkaService {
             return Collections.emptyMap();
         }
     }
+
+    public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
+        var records = offsets.entrySet().stream()
+            .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
+            .map(ac -> ac.deleteRecords(records)).then();
+    }
 }
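
RecordsToDelete.beforeOffset is the standard AdminClient way to truncate a partition: records with offsets strictly below the given offset become eligible for deletion. A sketch of the resulting call, with a hypothetical offset map:

    // Hypothetical input: truncate partition 0 of topic "t" up to (not including) offset 10.
    Map<TopicPartition, RecordsToDelete> records =
        Map.of(new TopicPartition("t", 0), RecordsToDelete.beforeOffset(10L));
    adminClient.deleteRecords(records); // adminClient: org.apache.kafka.clients.admin.AdminClient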

@@ -1,7 +1,6 @@
-package com.provectus.kafka.ui.cluster.service;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.stereotype.Service;

@@ -1,15 +1,14 @@
-package com.provectus.kafka.ui.cluster.service;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.exception.DuplicateEntityException;
-import com.provectus.kafka.ui.cluster.exception.NotFoundException;
-import com.provectus.kafka.ui.cluster.exception.UnprocessableEntityException;
-import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema;
-import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse;
+import com.provectus.kafka.ui.exception.DuplicateEntityException;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
+import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
+import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
+import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;

@@ -1,13 +1,12 @@
-package com.provectus.kafka.ui.zookeeper;
+package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.I0Itec.zkclient.ZkClient;
 import org.springframework.stereotype.Service;
 
-import java.util.HashMap;
 import java.util.Map;
 
 @Service

@@ -1,7 +1,6 @@
-package com.provectus.kafka.ui.cluster.util;
+package com.provectus.kafka.ui.util;
 
-import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
-import com.provectus.kafka.ui.cluster.model.*;
+import com.provectus.kafka.ui.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.model.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.*;
@@ -22,7 +21,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
+import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
 import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
 @Slf4j

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.util;
+package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.Metric;
 import lombok.RequiredArgsConstructor;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.util;
+package com.provectus.kafka.ui.util;
 
 public enum JmxMetricsName {
     MessagesInPerSec,

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.util;
+package com.provectus.kafka.ui.util;
 
 public enum JmxMetricsValueName {
     Count,

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.cluster.util;
+package com.provectus.kafka.ui.util;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.kafka;
+package com.provectus.kafka.ui.util;
 
 import java.util.AbstractMap;
 import java.util.Map;

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui;
 
+import com.provectus.kafka.ui.container.KafkaConnectContainer;
+import com.provectus.kafka.ui.container.SchemaRegistryContainer;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.boot.test.context.SpringBootTest;

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui;
+
+import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.model.TopicMessage;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "60000")
+public class KafkaConsumerTests extends AbstractBaseTest {
+
+    @Autowired
+    private WebTestClient webTestClient;
+
+    @Test
+    public void shouldDeleteRecords() {
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.post()
+            .uri("/api/clusters/{clusterName}/topics", LOCAL)
+            .bodyValue(new TopicFormData()
+                .name(topicName)
+                .partitions(1)
+                .replicationFactor(1)
+                .configs(Map.of())
+            )
+            .exchange()
+            .expectStatus()
+            .isOk();
+
+        try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
+            Stream.of("one", "two", "three", "four")
+                .forEach(value -> producer.send(topicName, value));
+        }
+
+        webTestClient.get()
+            .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+            .exchange()
+            .expectStatus()
+            .isOk()
+            .expectBodyList(TopicMessage.class)
+            .hasSize(4);
+
+        webTestClient.delete()
+            .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+            .exchange()
+            .expectStatus()
+            .isOk();
+
+        webTestClient.get()
+            .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+            .exchange()
+            .expectStatus()
+            .isOk()
+            .expectBodyList(TopicMessage.class)
+            .hasSize(0);
+    }
+
+    @Test
+    public void shouldReturn404ForNonExistingTopic() {
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.delete()
+            .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+            .exchange()
+            .expectStatus()
+            .isNotFound();
+    }
+}

View file

@ -65,7 +65,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
.post() .post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL) .uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(schema.formatted(subject))) .body(BodyInserters.fromValue(String.format(schema, subject)))
.exchange() .exchange()
.expectStatus().isBadRequest(); .expectStatus().isBadRequest();
} }
@ -78,7 +78,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
.post() .post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL) .uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(schema.formatted(subject))) .body(BodyInserters.fromValue(String.format(schema, subject)))
.exchange() .exchange()
.expectStatus().isEqualTo(HttpStatus.OK); .expectStatus().isEqualTo(HttpStatus.OK);
@ -86,7 +86,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
.post() .post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL) .uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(schema.formatted(subject))) .body(BodyInserters.fromValue(String.format(schema, subject)))
.exchange() .exchange()
.expectStatus().isEqualTo(HttpStatus.CONFLICT); .expectStatus().isEqualTo(HttpStatus.CONFLICT);
} }
@@ -201,7 +201,12 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
.post() .post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL) .uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}".formatted(subject))) .body(BodyInserters.fromValue(
String.format(
"{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}",
subject
)
))
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
.expectBody(SchemaSubject.class) .expectBody(SchemaSubject.class)

View file

@@ -1,4 +1,4 @@
package com.provectus.kafka.ui; package com.provectus.kafka.ui.container;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;

View file

@@ -1,4 +1,4 @@
package com.provectus.kafka.ui; package com.provectus.kafka.ui.container;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;

View file

@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.cluster.deserialization; package com.provectus.kafka.ui.deserialization;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.cluster.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View file

@@ -0,0 +1,35 @@
package com.provectus.kafka.ui.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import java.util.Map;
public class KafkaTestProducer<KeyT, ValueT> implements AutoCloseable {
private final KafkaProducer<KeyT, ValueT> producer;
private KafkaTestProducer(KafkaProducer<KeyT, ValueT> producer) {
this.producer = producer;
}
public static KafkaTestProducer<String, String> forKafka(KafkaContainer kafkaContainer) {
return new KafkaTestProducer<>(new KafkaProducer<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
)));
}
public void send(String topic, ValueT value) {
producer.send(new ProducerRecord<>(topic, value));
}
@Override
public void close() {
producer.close();
}
}
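
Usage note: KafkaProducer#close blocks until previously sent records complete, so wrapping this producer in try-with-resources (as the consumer test above does) guarantees all records are flushed before assertions run. A minimal sketch, assuming a KafkaContainer field named kafka:

    try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
        producer.send("my-topic", "value-only record"); // fire-and-forget; flushed on close()
    }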

View file

@@ -17,7 +17,7 @@ paths:
/connectors: /connectors:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get all connectors from Kafka Connect service summary: get all connectors from Kafka Connect service
operationId: getConnectors operationId: getConnectors
responses: responses:
@@ -31,7 +31,7 @@ paths:
type: string type: string
post: post:
tags: tags:
- /connect - KafkaConnectClient
summary: create new connector summary: create new connector
operationId: createConnector operationId: createConnector
requestBody: requestBody:
@@ -56,7 +56,7 @@ paths:
/connectors/{connectorName}: /connectors/{connectorName}:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get information about the connector summary: get information about the connector
operationId: getConnector operationId: getConnector
parameters: parameters:
@@ -74,7 +74,7 @@ paths:
$ref: '#/components/schemas/Connector' $ref: '#/components/schemas/Connector'
delete: delete:
tags: tags:
- /connect - KafkaConnectClient
summary: delete connector summary: delete connector
operationId: deleteConnector operationId: deleteConnector
parameters: parameters:
@@ -92,7 +92,7 @@ paths:
/connectors/{connectorName}/config: /connectors/{connectorName}/config:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get connector configuration summary: get connector configuration
operationId: getConnectorConfig operationId: getConnectorConfig
parameters: parameters:
@@ -110,7 +110,7 @@ paths:
$ref: '#/components/schemas/ConnectorConfig' $ref: '#/components/schemas/ConnectorConfig'
put: put:
tags: tags:
- /connect - KafkaConnectClient
summary: update or create connector with provided config summary: update or create connector with provided config
operationId: setConnectorConfig operationId: setConnectorConfig
parameters: parameters:
@@ -142,7 +142,7 @@ paths:
/connectors/{connectorName}/status: /connectors/{connectorName}/status:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get connector status summary: get connector status
operationId: getConnectorStatus operationId: getConnectorStatus
parameters: parameters:
@@ -162,7 +162,7 @@ paths:
/connectors/{connectorName}/restart: /connectors/{connectorName}/restart:
post: post:
tags: tags:
- /connect - KafkaConnectClient
summary: restart the connector summary: restart the connector
operationId: restartConnector operationId: restartConnector
parameters: parameters:
@@ -180,7 +180,7 @@ paths:
/connectors/{connectorName}/pause: /connectors/{connectorName}/pause:
put: put:
tags: tags:
- /connect - KafkaConnectClient
summary: pause the connector summary: pause the connector
operationId: pauseConnector operationId: pauseConnector
parameters: parameters:
@@ -196,7 +196,7 @@ paths:
/connectors/{connectorName}/resume: /connectors/{connectorName}/resume:
put: put:
tags: tags:
- /connect - KafkaConnectClient
summary: resume the connector summary: resume the connector
operationId: resumeConnector operationId: resumeConnector
parameters: parameters:
@@ -212,7 +212,7 @@ paths:
/connectors/{connectorName}/tasks: /connectors/{connectorName}/tasks:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get connector tasks summary: get connector tasks
operationId: getConnectorTasks operationId: getConnectorTasks
parameters: parameters:
@@ -234,7 +234,7 @@ paths:
/connectors/{connectorName}/tasks/{taskId}/status: /connectors/{connectorName}/tasks/{taskId}/status:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get connector task status summary: get connector task status
operationId: getConnectorTaskStatus operationId: getConnectorTaskStatus
parameters: parameters:
@@ -259,7 +259,7 @@ paths:
/connectors/{connectorName}/tasks/{taskId}/restart: /connectors/{connectorName}/tasks/{taskId}/restart:
post: post:
tags: tags:
- /connect - KafkaConnectClient
summary: restart connector task summary: restart connector task
operationId: restartConnectorTask operationId: restartConnectorTask
parameters: parameters:
@@ -280,7 +280,7 @@ paths:
/connector-plugins: /connector-plugins:
get: get:
tags: tags:
- /connect - KafkaConnectClient
summary: get connector plugins summary: get connector plugins
operationId: getConnectorPlugins operationId: getConnectorPlugins
responses: responses:
@@ -296,7 +296,7 @@ paths:
/connector-plugins/{pluginName}/config/validate: /connector-plugins/{pluginName}/config/validate:
put: put:
tags: tags:
- /connect - KafkaConnectClient
summary: validate connector plugin configuration summary: validate connector plugin configuration
operationId: validateConnectorPluginConfig operationId: validateConnectorPluginConfig
parameters: parameters:
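
Note: openapi-generator derives the generated API class name from the tag, so retagging every operation from /connect to KafkaConnectClient is what renames the generated class to KafkaConnectClientApi. A hedged sketch of how such a tag-named client is typically instantiated and called; the constructor shape and reactive return type are assumptions of this example, not guaranteed by the spec above:

    KafkaConnectClientApi connectApi =
        new KafkaConnectClientApi(new ApiClient().setBasePath("http://localhost:8083"));
    connectApi.getConnectors()                 // assumed Flux<String> of connector names
        .doOnNext(name -> System.out.println("connector: " + name))
        .blockLast();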

View file

@@ -18,7 +18,7 @@ paths:
/api/clusters: /api/clusters:
get: get:
tags: tags:
- /api/clusters - Clusters
summary: getClusters summary: getClusters
operationId: getClusters operationId: getClusters
responses: responses:
@@ -34,7 +34,7 @@ paths:
/api/clusters/{clusterName}/brokers: /api/clusters/{clusterName}/brokers:
get: get:
tags: tags:
- /api/clusters - Brokers
summary: getBrokers summary: getBrokers
operationId: getBrokers operationId: getBrokers
parameters: parameters:
@@ -56,7 +56,7 @@ paths:
/api/clusters/{clusterName}/metrics: /api/clusters/{clusterName}/metrics:
get: get:
tags: tags:
- /api/clusters - Clusters
summary: getClusterMetrics summary: getClusterMetrics
operationId: getClusterMetrics operationId: getClusterMetrics
parameters: parameters:
@@ -76,7 +76,7 @@ paths:
/api/clusters/{clusterName}/stats: /api/clusters/{clusterName}/stats:
get: get:
tags: tags:
- /api/clusters - Clusters
summary: getClusterStats summary: getClusterStats
operationId: getClusterStats operationId: getClusterStats
parameters: parameters:
@@ -96,7 +96,7 @@ paths:
/api/clusters/{clusterName}/brokers/{id}/metrics: /api/clusters/{clusterName}/brokers/{id}/metrics:
get: get:
tags: tags:
- /api/clusters - Brokers
summary: getBrokersMetrics summary: getBrokersMetrics
operationId: getBrokersMetrics operationId: getBrokersMetrics
parameters: parameters:
@@ -121,7 +121,7 @@ paths:
/api/clusters/{clusterName}/topics: /api/clusters/{clusterName}/topics:
get: get:
tags: tags:
- /api/clusters - Topics
summary: getTopics summary: getTopics
operationId: getTopics operationId: getTopics
parameters: parameters:
@@ -149,7 +149,7 @@ paths:
$ref: '#/components/schemas/TopicsResponse' $ref: '#/components/schemas/TopicsResponse'
post: post:
tags: tags:
- /api/clusters - Topics
summary: createTopic summary: createTopic
operationId: createTopic operationId: createTopic
parameters: parameters:
@@ -174,7 +174,7 @@ paths:
/api/clusters/{clusterName}/topics/{topicName}: /api/clusters/{clusterName}/topics/{topicName}:
get: get:
tags: tags:
- /api/clusters - Topics
summary: getTopicDetails summary: getTopicDetails
operationId: getTopicDetails operationId: getTopicDetails
parameters: parameters:
@@ -197,7 +197,7 @@ paths:
$ref: '#/components/schemas/TopicDetails' $ref: '#/components/schemas/TopicDetails'
patch: patch:
tags: tags:
- /api/clusters - Topics
summary: updateTopic summary: updateTopic
operationId: updateTopic operationId: updateTopic
parameters: parameters:
@@ -225,7 +225,7 @@ paths:
$ref: '#/components/schemas/Topic' $ref: '#/components/schemas/Topic'
delete: delete:
tags: tags:
- /api/clusters - Topics
summary: deleteTopic summary: deleteTopic
operationId: deleteTopic operationId: deleteTopic
parameters: parameters:
@@ -248,7 +248,7 @@ paths:
/api/clusters/{clusterName}/topics/{topicName}/config: /api/clusters/{clusterName}/topics/{topicName}/config:
get: get:
tags: tags:
- /api/clusters - Topics
summary: getTopicConfigs summary: getTopicConfigs
operationId: getTopicConfigs operationId: getTopicConfigs
parameters: parameters:
@@ -275,7 +275,7 @@ paths:
/api/clusters/{clusterName}/topics/{topicName}/messages: /api/clusters/{clusterName}/topics/{topicName}/messages:
get: get:
tags: tags:
- /api/clusters - Messages
summary: getTopicMessages summary: getTopicMessages
operationId: getTopicMessages operationId: getTopicMessages
parameters: parameters:
@@ -317,11 +317,39 @@ paths:
type: array type: array
items: items:
$ref: '#/components/schemas/TopicMessage' $ref: '#/components/schemas/TopicMessage'
delete:
tags:
- Messages
summary: deleteTopicMessages
operationId: deleteTopicMessages
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
- name: partitions
in: query
required: false
schema:
type: array
items:
type: integer
responses:
200:
description: OK
404:
description: Not found
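The optional partitions query parameter lets callers truncate a subset of a topic's partitions; omitting it, as the test earlier in this diff does, targets all of them. A hypothetical WebTestClient call limited to partitions 0 and 1, following the parameter names in the spec above (the surrounding test scaffolding is assumed):

    webTestClient.delete()
        .uri(uriBuilder -> uriBuilder
            .path("/api/clusters/{clusterName}/topics/{topicName}/messages")
            .queryParam("partitions", 0, 1) // repeated values map to the integer array above
            .build(LOCAL, topicName))
        .exchange()
        .expectStatus()
        .isOk();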
/api/clusters/{clusterName}/consumer-groups/{id}: /api/clusters/{clusterName}/consumer-groups/{id}:
get: get:
tags: tags:
- /api/clusters - Consumer Groups
summary: get Consumer Group By Id summary: get Consumer Group By Id
operationId: getConsumerGroup operationId: getConsumerGroup
parameters: parameters:
@@ -346,7 +374,7 @@ paths:
/api/clusters/{clusterName}/consumerGroups: /api/clusters/{clusterName}/consumerGroups:
get: get:
tags: tags:
- /api/clusters - Consumer Groups
summary: get all ConsumerGroups summary: get all ConsumerGroups
operationId: getConsumerGroups operationId: getConsumerGroups
parameters: parameters:
@@ -368,7 +396,7 @@ paths:
/api/clusters/{clusterName}/schemas: /api/clusters/{clusterName}/schemas:
post: post:
tags: tags:
- /api/clusters - Schemas
summary: create a new subject schema summary: create a new subject schema
operationId: createNewSchema operationId: createNewSchema
parameters: parameters:
@@ -397,7 +425,7 @@ paths:
description: Invalid parameters description: Invalid parameters
get: get:
tags: tags:
- /api/clusters - Schemas
summary: get all schemas of latest version from Schema Registry service summary: get all schemas of latest version from Schema Registry service
operationId: getSchemas operationId: getSchemas
parameters: parameters:
@@ -419,7 +447,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}: /api/clusters/{clusterName}/schemas/{subject}:
delete: delete:
tags: tags:
- /api/clusters - Schemas
summary: delete schema from Schema Registry service summary: delete schema from Schema Registry service
operationId: deleteSchema operationId: deleteSchema
parameters: parameters:
@@ -442,7 +470,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}/versions: /api/clusters/{clusterName}/schemas/{subject}/versions:
get: get:
tags: tags:
- /api/clusters - Schemas
summary: get all version of subject from Schema Registry service summary: get all version of subject from Schema Registry service
operationId: getAllVersionsBySubject operationId: getAllVersionsBySubject
parameters: parameters:
@@ -469,7 +497,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}/latest: /api/clusters/{clusterName}/schemas/{subject}/latest:
get: get:
tags: tags:
- /api/clusters - Schemas
summary: get the latest schema from Schema Registry service summary: get the latest schema from Schema Registry service
operationId: getLatestSchema operationId: getLatestSchema
parameters: parameters:
@@ -492,7 +520,7 @@ paths:
$ref: '#/components/schemas/SchemaSubject' $ref: '#/components/schemas/SchemaSubject'
delete: delete:
tags: tags:
- /api/clusters - Schemas
summary: delete the latest schema from schema registry summary: delete the latest schema from schema registry
operationId: deleteLatestSchema operationId: deleteLatestSchema
parameters: parameters:
@@ -516,7 +544,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}/versions/{version}: /api/clusters/{clusterName}/schemas/{subject}/versions/{version}:
get: get:
tags: tags:
- /api/clusters - Schemas
summary: get schema by version from Schema Registry service summary: get schema by version from Schema Registry service
operationId: getSchemaByVersion operationId: getSchemaByVersion
parameters: parameters:
@@ -544,7 +572,7 @@ paths:
$ref: '#/components/schemas/SchemaSubject' $ref: '#/components/schemas/SchemaSubject'
delete: delete:
tags: tags:
- /api/clusters - Schemas
summary: delete schema by version from schema registry summary: delete schema by version from schema registry
operationId: deleteSchemaByVersion operationId: deleteSchemaByVersion
parameters: parameters:
@@ -572,7 +600,7 @@ paths:
/api/clusters/{clusterName}/schemas/compatibility: /api/clusters/{clusterName}/schemas/compatibility:
get: get:
tags: tags:
- /api/clusters - Schemas
summary: Get global schema compatibility level summary: Get global schema compatibility level
operationId: getGlobalSchemaCompatibilityLevel operationId: getGlobalSchemaCompatibilityLevel
parameters: parameters:
@@ -590,7 +618,7 @@ paths:
$ref: '#/components/schemas/CompatibilityLevel' $ref: '#/components/schemas/CompatibilityLevel'
put: put:
tags: tags:
- /api/clusters - Schemas
summary: Update compatibility level globally summary: Update compatibility level globally
operationId: updateGlobalSchemaCompatibilityLevel operationId: updateGlobalSchemaCompatibilityLevel
parameters: parameters:
@@ -613,7 +641,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}/compatibility: /api/clusters/{clusterName}/schemas/{subject}/compatibility:
put: put:
tags: tags:
- /api/clusters - Schemas
summary: Update compatibility level for specific schema. summary: Update compatibility level for specific schema.
operationId: updateSchemaCompatibilityLevel operationId: updateSchemaCompatibilityLevel
parameters: parameters:
@@ -641,7 +669,7 @@ paths:
/api/clusters/{clusterName}/schemas/{subject}/check: /api/clusters/{clusterName}/schemas/{subject}/check:
post: post:
tags: tags:
- /api/clusters - Schemas
summary: Check compatibility of the schema. summary: Check compatibility of the schema.
operationId: checkSchemaCompatibility operationId: checkSchemaCompatibility
parameters: parameters:
@@ -673,7 +701,7 @@ paths:
/api/clusters/{clusterName}/connects: /api/clusters/{clusterName}/connects:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get all kafka connect instances summary: get all kafka connect instances
operationId: getConnects operationId: getConnects
parameters: parameters:
@@ -695,7 +723,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors: /api/clusters/{clusterName}/connects/{connectName}/connectors:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get all connectors from Kafka Connect service summary: get all connectors from Kafka Connect service
operationId: getConnectors operationId: getConnectors
parameters: parameters:
@@ -720,7 +748,7 @@ paths:
type: string type: string
post: post:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: create new connector summary: create new connector
operationId: createConnector operationId: createConnector
parameters: parameters:
@@ -752,7 +780,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}: /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get information about the connector summary: get information about the connector
operationId: getConnector operationId: getConnector
parameters: parameters:
@@ -780,7 +808,7 @@ paths:
$ref: '#/components/schemas/Connector' $ref: '#/components/schemas/Connector'
delete: delete:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: delete connector summary: delete connector
operationId: deleteConnector operationId: deleteConnector
parameters: parameters:
@@ -808,7 +836,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/{action}: /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/{action}:
post: post:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: update connector state (restart, pause or resume) summary: update connector state (restart, pause or resume)
operationId: updateConnectorState operationId: updateConnectorState
parameters: parameters:
@@ -841,7 +869,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config: /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get connector configuration summary: get connector configuration
operationId: getConnectorConfig operationId: getConnectorConfig
parameters: parameters:
@@ -869,7 +897,7 @@ paths:
$ref: '#/components/schemas/ConnectorConfig' $ref: '#/components/schemas/ConnectorConfig'
put: put:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: update or create connector with provided config summary: update or create connector with provided config
operationId: setConnectorConfig operationId: setConnectorConfig
parameters: parameters:
@@ -906,7 +934,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks: /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get connector tasks summary: get connector tasks
operationId: getConnectorTasks operationId: getConnectorTasks
parameters: parameters:
@@ -938,7 +966,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks/{taskId}/action/restart: /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks/{taskId}/action/restart:
post: post:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: restart connector task summary: restart connector task
operationId: restartConnectorTask operationId: restartConnectorTask
parameters: parameters:
@@ -969,7 +997,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/plugins: /api/clusters/{clusterName}/connects/{connectName}/plugins:
get: get:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: get connector plugins summary: get connector plugins
operationId: getConnectorPlugins operationId: getConnectorPlugins
parameters: parameters:
@@ -996,7 +1024,7 @@ paths:
/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate: /api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate:
put: put:
tags: tags:
- /api/clusters/connects - Kafka Connect
summary: validate connector plugin configuration summary: validate connector plugin configuration
operationId: validateConnectorPluginConfig operationId: validateConnectorPluginConfig
parameters: parameters:
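
Note: splitting the single /api/clusters tag into per-resource tags (Clusters, Brokers, Topics, Messages, Consumer Groups, Schemas, Kafka Connect) makes the code generator emit one focused client per resource instead of a monolithic one; spaces are stripped, so the Consumer Groups tag becomes ConsumerGroupsApi. The frontend change below picks up exactly these class names. A schematic sketch of the per-tag clients; the method signature is an assumption based on the operationIds above:

    TopicsApi topics = new TopicsApi(apiClient);
    MessagesApi messages = new MessagesApi(apiClient);
    messages.deleteTopicMessages("local", topicName, List.of(0, 1)).block(); // assumed Mono<Void>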

View file

@@ -1,5 +1,10 @@
import { import {
ApiClustersApi, ClustersApi,
BrokersApi,
TopicsApi,
ConsumerGroupsApi,
SchemasApi,
MessagesApi,
Configuration, Configuration,
Cluster, Cluster,
Topic, Topic,
@@ -24,12 +29,17 @@ import { BASE_PARAMS } from 'lib/constants'; import { BASE_PARAMS } from 'lib/constants';
import * as actions from './actions'; import * as actions from './actions';
const apiClientConf = new Configuration(BASE_PARAMS); const apiClientConf = new Configuration(BASE_PARAMS);
export const apiClient = new ApiClustersApi(apiClientConf); export const clustersApiClient = new ClustersApi(apiClientConf);
export const brokersApiClient = new BrokersApi(apiClientConf);
export const topicsApiClient = new TopicsApi(apiClientConf);
export const consumerGroupsApiClient = new ConsumerGroupsApi(apiClientConf);
export const schemasApiClient = new SchemasApi(apiClientConf);
export const messagesApiClient = new MessagesApi(apiClientConf);
export const fetchClustersList = (): PromiseThunkResult => async (dispatch) => { export const fetchClustersList = (): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchClusterListAction.request()); dispatch(actions.fetchClusterListAction.request());
try { try {
const clusters: Cluster[] = await apiClient.getClusters(); const clusters: Cluster[] = await clustersApiClient.getClusters();
dispatch(actions.fetchClusterListAction.success(clusters)); dispatch(actions.fetchClusterListAction.success(clusters));
} catch (e) { } catch (e) {
dispatch(actions.fetchClusterListAction.failure()); dispatch(actions.fetchClusterListAction.failure());
@@ -41,7 +51,7 @@ export const fetchClusterStats = ( export const fetchClusterStats = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchClusterStatsAction.request()); dispatch(actions.fetchClusterStatsAction.request());
try { try {
const payload = await apiClient.getClusterStats({ clusterName }); const payload = await clustersApiClient.getClusterStats({ clusterName });
dispatch(actions.fetchClusterStatsAction.success(payload)); dispatch(actions.fetchClusterStatsAction.success(payload));
} catch (e) { } catch (e) {
dispatch(actions.fetchClusterStatsAction.failure()); dispatch(actions.fetchClusterStatsAction.failure());
@@ -53,7 +63,7 @@ export const fetchClusterMetrics = ( export const fetchClusterMetrics = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchClusterMetricsAction.request()); dispatch(actions.fetchClusterMetricsAction.request());
try { try {
const payload = await apiClient.getClusterMetrics({ clusterName }); const payload = await clustersApiClient.getClusterMetrics({ clusterName });
dispatch(actions.fetchClusterMetricsAction.success(payload)); dispatch(actions.fetchClusterMetricsAction.success(payload));
} catch (e) { } catch (e) {
dispatch(actions.fetchClusterMetricsAction.failure()); dispatch(actions.fetchClusterMetricsAction.failure());
@@ -65,7 +75,7 @@ export const fetchBrokers = ( export const fetchBrokers = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchBrokersAction.request()); dispatch(actions.fetchBrokersAction.request());
try { try {
const payload = await apiClient.getBrokers({ clusterName }); const payload = await brokersApiClient.getBrokers({ clusterName });
dispatch(actions.fetchBrokersAction.success(payload)); dispatch(actions.fetchBrokersAction.success(payload));
} catch (e) { } catch (e) {
dispatch(actions.fetchBrokersAction.failure()); dispatch(actions.fetchBrokersAction.failure());
@@ -78,7 +88,7 @@ export const fetchBrokerMetrics = ( export const fetchBrokerMetrics = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchBrokerMetricsAction.request()); dispatch(actions.fetchBrokerMetricsAction.request());
try { try {
const payload = await apiClient.getBrokersMetrics({ const payload = await brokersApiClient.getBrokersMetrics({
clusterName, clusterName,
id: brokerId, id: brokerId,
}); });
@@ -93,7 +103,7 @@ export const fetchTopicsList = ( export const fetchTopicsList = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchTopicsListAction.request()); dispatch(actions.fetchTopicsListAction.request());
try { try {
const topics = await apiClient.getTopics({ clusterName }); const topics = await topicsApiClient.getTopics({ clusterName });
dispatch(actions.fetchTopicsListAction.success(topics.topics || [])); dispatch(actions.fetchTopicsListAction.success(topics.topics || []));
} catch (e) { } catch (e) {
dispatch(actions.fetchTopicsListAction.failure()); dispatch(actions.fetchTopicsListAction.failure());
@@ -107,7 +117,7 @@ export const fetchTopicMessages = ( export const fetchTopicMessages = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchTopicMessagesAction.request()); dispatch(actions.fetchTopicMessagesAction.request());
try { try {
const messages = await apiClient.getTopicMessages({ const messages = await messagesApiClient.getTopicMessages({
clusterName, clusterName,
topicName, topicName,
...queryParams, ...queryParams,
@@ -124,7 +134,7 @@ export const fetchTopicDetails = ( export const fetchTopicDetails = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchTopicDetailsAction.request()); dispatch(actions.fetchTopicDetailsAction.request());
try { try {
const topicDetails = await apiClient.getTopicDetails({ const topicDetails = await topicsApiClient.getTopicDetails({
clusterName, clusterName,
topicName, topicName,
}); });
@ -145,7 +155,10 @@ export const fetchTopicConfig = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchTopicConfigAction.request()); dispatch(actions.fetchTopicConfigAction.request());
try { try {
const config = await apiClient.getTopicConfigs({ clusterName, topicName }); const config = await topicsApiClient.getTopicConfigs({
clusterName,
topicName,
});
dispatch(actions.fetchTopicConfigAction.success({ topicName, config })); dispatch(actions.fetchTopicConfigAction.success({ topicName, config }));
} catch (e) { } catch (e) {
dispatch(actions.fetchTopicConfigAction.failure()); dispatch(actions.fetchTopicConfigAction.failure());
@@ -194,7 +207,7 @@ export const createTopic = ( export const createTopic = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.createTopicAction.request()); dispatch(actions.createTopicAction.request());
try { try {
const topic: Topic = await apiClient.createTopic({ const topic: Topic = await topicsApiClient.createTopic({
clusterName, clusterName,
topicFormData: formatTopicFormData(form), topicFormData: formatTopicFormData(form),
}); });
@@ -210,7 +223,7 @@ export const updateTopic = ( export const updateTopic = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.updateTopicAction.request()); dispatch(actions.updateTopicAction.request());
try { try {
const topic: Topic = await apiClient.updateTopic({ const topic: Topic = await topicsApiClient.updateTopic({
clusterName, clusterName,
topicName: form.name, topicName: form.name,
topicFormData: formatTopicFormData(form), topicFormData: formatTopicFormData(form),
@ -226,7 +239,9 @@ export const fetchConsumerGroupsList = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchConsumerGroupsAction.request()); dispatch(actions.fetchConsumerGroupsAction.request());
try { try {
const consumerGroups = await apiClient.getConsumerGroups({ clusterName }); const consumerGroups = await consumerGroupsApiClient.getConsumerGroups({
clusterName,
});
dispatch(actions.fetchConsumerGroupsAction.success(consumerGroups)); dispatch(actions.fetchConsumerGroupsAction.success(consumerGroups));
} catch (e) { } catch (e) {
dispatch(actions.fetchConsumerGroupsAction.failure()); dispatch(actions.fetchConsumerGroupsAction.failure());
@@ -239,10 +254,12 @@ export const fetchConsumerGroupDetails = ( export const fetchConsumerGroupDetails = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.fetchConsumerGroupDetailsAction.request()); dispatch(actions.fetchConsumerGroupDetailsAction.request());
try { try {
const consumerGroupDetails = await apiClient.getConsumerGroup({ const consumerGroupDetails = await consumerGroupsApiClient.getConsumerGroup(
clusterName, {
id: consumerGroupID, clusterName,
}); id: consumerGroupID,
}
);
dispatch( dispatch(
actions.fetchConsumerGroupDetailsAction.success({ actions.fetchConsumerGroupDetailsAction.success({
consumerGroupID, consumerGroupID,
@@ -259,7 +276,7 @@ export const fetchSchemasByClusterName = ( export const fetchSchemasByClusterName = (
): PromiseThunkResult<void> => async (dispatch) => { ): PromiseThunkResult<void> => async (dispatch) => {
dispatch(actions.fetchSchemasByClusterNameAction.request()); dispatch(actions.fetchSchemasByClusterNameAction.request());
try { try {
const schemas = await apiClient.getSchemas({ clusterName }); const schemas = await schemasApiClient.getSchemas({ clusterName });
dispatch(actions.fetchSchemasByClusterNameAction.success(schemas)); dispatch(actions.fetchSchemasByClusterNameAction.success(schemas));
} catch (e) { } catch (e) {
dispatch(actions.fetchSchemasByClusterNameAction.failure()); dispatch(actions.fetchSchemasByClusterNameAction.failure());
@@ -273,7 +290,7 @@ export const fetchSchemaVersions = ( export const fetchSchemaVersions = (
if (!subject) return; if (!subject) return;
dispatch(actions.fetchSchemaVersionsAction.request()); dispatch(actions.fetchSchemaVersionsAction.request());
try { try {
const versions = await apiClient.getAllVersionsBySubject({ const versions = await schemasApiClient.getAllVersionsBySubject({
clusterName, clusterName,
subject, subject,
}); });
@@ -289,7 +306,7 @@ export const createSchema = ( export const createSchema = (
): PromiseThunkResult => async (dispatch) => { ): PromiseThunkResult => async (dispatch) => {
dispatch(actions.createSchemaAction.request()); dispatch(actions.createSchemaAction.request());
try { try {
const schema: SchemaSubject = await apiClient.createNewSchema({ const schema: SchemaSubject = await schemasApiClient.createNewSchema({
clusterName, clusterName,
newSchemaSubject, newSchemaSubject,
}); });