|
@@ -1,7 +1,5 @@
|
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
-import static com.provectus.kafka.ui.util.Constants.DELETE_TOPIC_ENABLE;
|
|
|
-
|
|
|
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
|
|
|
import com.provectus.kafka.ui.exception.NotFoundException;
|
|
@@ -9,36 +7,35 @@ import com.provectus.kafka.ui.exception.TopicNotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
|
|
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
|
|
|
-import com.provectus.kafka.ui.model.Broker;
|
|
|
-import com.provectus.kafka.ui.model.BrokerConfig;
|
|
|
-import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
|
|
|
-import com.provectus.kafka.ui.model.BrokerMetrics;
|
|
|
-import com.provectus.kafka.ui.model.BrokersLogdirs;
|
|
|
-import com.provectus.kafka.ui.model.Cluster;
|
|
|
-import com.provectus.kafka.ui.model.ClusterMetrics;
|
|
|
-import com.provectus.kafka.ui.model.ClusterStats;
|
|
|
-import com.provectus.kafka.ui.model.ConsumerGroup;
|
|
|
-import com.provectus.kafka.ui.model.ConsumerGroupDetails;
|
|
|
+import com.provectus.kafka.ui.model.BrokerConfigDTO;
|
|
|
+import com.provectus.kafka.ui.model.BrokerDTO;
|
|
|
+import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
|
|
|
+import com.provectus.kafka.ui.model.BrokerMetricsDTO;
|
|
|
+import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
|
|
|
+import com.provectus.kafka.ui.model.ClusterDTO;
|
|
|
+import com.provectus.kafka.ui.model.ClusterMetricsDTO;
|
|
|
+import com.provectus.kafka.ui.model.ClusterStatsDTO;
|
|
|
+import com.provectus.kafka.ui.model.ConsumerGroupDTO;
|
|
|
+import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
|
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
|
-import com.provectus.kafka.ui.model.CreateTopicMessage;
|
|
|
+import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
|
|
|
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
|
|
import com.provectus.kafka.ui.model.Feature;
|
|
|
import com.provectus.kafka.ui.model.InternalTopic;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
-import com.provectus.kafka.ui.model.PartitionsIncrease;
|
|
|
-import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
|
|
|
-import com.provectus.kafka.ui.model.ReplicationFactorChange;
|
|
|
-import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
|
|
|
-import com.provectus.kafka.ui.model.Topic;
|
|
|
-import com.provectus.kafka.ui.model.TopicColumnsToSort;
|
|
|
-import com.provectus.kafka.ui.model.TopicConfig;
|
|
|
-import com.provectus.kafka.ui.model.TopicCreation;
|
|
|
-import com.provectus.kafka.ui.model.TopicDetails;
|
|
|
-import com.provectus.kafka.ui.model.TopicMessage;
|
|
|
-import com.provectus.kafka.ui.model.TopicMessageEvent;
|
|
|
-import com.provectus.kafka.ui.model.TopicMessageSchema;
|
|
|
-import com.provectus.kafka.ui.model.TopicUpdate;
|
|
|
-import com.provectus.kafka.ui.model.TopicsResponse;
|
|
|
+import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
|
|
|
+import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
|
|
|
+import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
|
|
|
+import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicConfigDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicCreationDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicUpdateDTO;
|
|
|
+import com.provectus.kafka.ui.model.TopicsResponseDTO;
|
|
|
import com.provectus.kafka.ui.serde.DeserializationService;
|
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
|
import java.util.Collections;
|
|
@@ -51,7 +48,6 @@ import lombok.RequiredArgsConstructor;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
-import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
|
|
|
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
@@ -74,21 +70,21 @@ public class ClusterService {
|
|
|
private final DeserializationService deserializationService;
|
|
|
private final DescribeLogDirsMapper describeLogDirsMapper;
|
|
|
|
|
|
- public List<Cluster> getClusters() {
|
|
|
+ public List<ClusterDTO> getClusters() {
|
|
|
return clustersStorage.getKafkaClusters()
|
|
|
.stream()
|
|
|
.map(clusterMapper::toCluster)
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
- public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
|
|
|
+ public Mono<BrokerMetricsDTO> getBrokerMetrics(String name, Integer id) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
|
|
|
.map(c -> c.getMetrics().getInternalBrokerMetrics())
|
|
|
.map(m -> m.get(id))
|
|
|
.map(clusterMapper::toBrokerMetrics));
|
|
|
}
|
|
|
|
|
|
- public Mono<ClusterStats> getClusterStats(String name) {
|
|
|
+ public Mono<ClusterStatsDTO> getClusterStats(String name) {
|
|
|
return Mono.justOrEmpty(
|
|
|
clustersStorage.getClusterByName(name)
|
|
|
.map(KafkaCluster::getMetrics)
|
|
@@ -96,7 +92,7 @@ public class ClusterService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<ClusterMetrics> getClusterMetrics(String name) {
|
|
|
+ public Mono<ClusterMetricsDTO> getClusterMetrics(String name) {
|
|
|
return Mono.justOrEmpty(
|
|
|
clustersStorage.getClusterByName(name)
|
|
|
.map(KafkaCluster::getMetrics)
|
|
@@ -105,11 +101,11 @@ public class ClusterService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- public TopicsResponse getTopics(String name, Optional<Integer> page,
|
|
|
+ public TopicsResponseDTO getTopics(String name, Optional<Integer> page,
|
|
|
Optional<Integer> nullablePerPage,
|
|
|
Optional<Boolean> showInternal,
|
|
|
Optional<String> search,
|
|
|
- Optional<TopicColumnsToSort> sortBy) {
|
|
|
+ Optional<TopicColumnsToSortDTO> sortBy) {
|
|
|
Predicate<Integer> positiveInt = i -> i > 0;
|
|
|
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
|
|
|
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
|
|
@@ -128,7 +124,7 @@ public class ClusterService {
|
|
|
.collect(Collectors.toList());
|
|
|
var totalPages = (topics.size() / perPage)
|
|
|
+ (topics.size() % perPage == 0 ? 0 : 1);
|
|
|
- return new TopicsResponse()
|
|
|
+ return new TopicsResponseDTO()
|
|
|
.pageCount(totalPages)
|
|
|
.topics(
|
|
|
topics.stream()
|
|
@@ -145,7 +141,7 @@ public class ClusterService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
|
|
|
+ private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSortDTO> sortBy) {
|
|
|
var defaultComparator = Comparator.comparing(InternalTopic::getName);
|
|
|
if (sortBy.isEmpty()) {
|
|
|
return defaultComparator;
|
|
@@ -163,7 +159,7 @@ public class ClusterService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
|
|
|
+ public Optional<TopicDetailsDTO> getTopicDetails(String name, String topicName) {
|
|
|
return clustersStorage.getClusterByName(name)
|
|
|
.flatMap(c ->
|
|
|
Optional.ofNullable(c.getTopics()).map(l -> l.get(topicName)).map(
|
|
@@ -174,7 +170,7 @@ public class ClusterService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
|
|
|
+ public Optional<List<TopicConfigDTO>> getTopicConfigs(String name, String topicName) {
|
|
|
return clustersStorage.getClusterByName(name)
|
|
|
.map(KafkaCluster::getTopics)
|
|
|
.map(t -> t.get(topicName))
|
|
@@ -182,7 +178,7 @@ public class ClusterService {
|
|
|
.collect(Collectors.toList()));
|
|
|
}
|
|
|
|
|
|
- public Mono<Topic> createTopic(String clusterName, Mono<TopicCreation> topicCreation) {
|
|
|
+ public Mono<TopicDTO> createTopic(String clusterName, Mono<TopicCreationDTO> topicCreation) {
|
|
|
return clustersStorage.getClusterByName(clusterName).map(cluster ->
|
|
|
kafkaService.createTopic(cluster, topicCreation)
|
|
|
.doOnNext(t -> updateCluster(t, clusterName, cluster))
|
|
@@ -191,7 +187,7 @@ public class ClusterService {
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
- public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
|
|
|
+ public Mono<ConsumerGroupDetailsDTO> getConsumerGroupDetail(String clusterName,
|
|
|
String consumerGroupId) {
|
|
|
var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
|
|
|
return kafkaService.getConsumerGroups(
|
|
@@ -203,11 +199,12 @@ public class ClusterService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
|
|
|
+ public Mono<List<ConsumerGroupDTO>> getConsumerGroups(String clusterName) {
|
|
|
return getConsumerGroups(clusterName, Optional.empty());
|
|
|
}
|
|
|
|
|
|
- public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName, Optional<String> topic) {
|
|
|
+ public Mono<List<ConsumerGroupDTO>> getConsumerGroups(String clusterName,
|
|
|
+ Optional<String> topic) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
|
|
|
.flatMap(c -> kafkaService.getConsumerGroups(c, topic, Collections.emptyList()))
|
|
@@ -216,13 +213,13 @@ public class ClusterService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Flux<Broker> getBrokers(String clusterName) {
|
|
|
+ public Flux<BrokerDTO> getBrokers(String clusterName) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
|
|
|
.flatMapMany(brokerService::getBrokers);
|
|
|
}
|
|
|
|
|
|
- public Flux<BrokerConfig> getBrokerConfig(String clusterName, Integer brokerId) {
|
|
|
+ public Flux<BrokerConfigDTO> getBrokerConfig(String clusterName, Integer brokerId) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
|
|
|
.flatMapMany(c -> brokerService.getBrokersConfig(c, brokerId))
|
|
@@ -230,8 +227,8 @@ public class ClusterService {
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
- public Mono<Topic> updateTopic(String clusterName, String topicName,
|
|
|
- Mono<TopicUpdate> topicUpdate) {
|
|
|
+ public Mono<TopicDTO> updateTopic(String clusterName, String topicName,
|
|
|
+ Mono<TopicUpdateDTO> topicUpdate) {
|
|
|
return clustersStorage.getClusterByName(clusterName).map(cl ->
|
|
|
topicUpdate
|
|
|
.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
|
|
@@ -267,7 +264,7 @@ public class ClusterService {
|
|
|
return updatedCluster;
|
|
|
}
|
|
|
|
|
|
- public Mono<Cluster> updateCluster(String clusterName) {
|
|
|
+ public Mono<ClusterDTO> updateCluster(String clusterName) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> kafkaService.getUpdatedCluster(cluster)
|
|
|
.doOnNext(updatedCluster -> clustersStorage
|
|
@@ -276,7 +273,7 @@ public class ClusterService {
|
|
|
.orElse(Mono.error(new ClusterNotFoundException()));
|
|
|
}
|
|
|
|
|
|
- public Flux<TopicMessageEvent> getMessages(String clusterName, String topicName,
|
|
|
+ public Flux<TopicMessageEventDTO> getMessages(String clusterName, String topicName,
|
|
|
ConsumerPosition consumerPosition, String query,
|
|
|
Integer limit) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
@@ -295,14 +292,14 @@ public class ClusterService {
|
|
|
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
|
|
|
}
|
|
|
|
|
|
- public Mono<PartitionsIncreaseResponse> increaseTopicPartitions(
|
|
|
+ public Mono<PartitionsIncreaseResponseDTO> increaseTopicPartitions(
|
|
|
String clusterName,
|
|
|
String topicName,
|
|
|
- PartitionsIncrease partitionsIncrease) {
|
|
|
+ PartitionsIncreaseDTO partitionsIncrease) {
|
|
|
return clustersStorage.getClusterByName(clusterName).map(cluster ->
|
|
|
kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease)
|
|
|
.doOnNext(t -> updateCluster(t, cluster.getName(), cluster))
|
|
|
- .map(t -> new PartitionsIncreaseResponse()
|
|
|
+ .map(t -> new PartitionsIncreaseResponseDTO()
|
|
|
.topicName(t.getName())
|
|
|
.totalPartitionsCount(t.getPartitionCount())))
|
|
|
.orElse(Mono.error(new ClusterNotFoundException(
|
|
@@ -325,7 +322,7 @@ public class ClusterService {
|
|
|
.orElse(Mono.empty());
|
|
|
}
|
|
|
|
|
|
- public TopicMessageSchema getTopicSchema(String clusterName, String topicName) {
|
|
|
+ public TopicMessageSchemaDTO getTopicSchema(String clusterName, String topicName) {
|
|
|
var cluster = clustersStorage.getClusterByName(clusterName)
|
|
|
.orElseThrow(ClusterNotFoundException::new);
|
|
|
if (!cluster.getTopics().containsKey(topicName)) {
|
|
@@ -336,7 +333,7 @@ public class ClusterService {
|
|
|
.getTopicSchema(topicName);
|
|
|
}
|
|
|
|
|
|
- public Mono<Void> sendMessage(String clusterName, String topicName, CreateTopicMessage msg) {
|
|
|
+ public Mono<Void> sendMessage(String clusterName, String topicName, CreateTopicMessageDTO msg) {
|
|
|
var cluster = clustersStorage.getClusterByName(clusterName)
|
|
|
.orElseThrow(ClusterNotFoundException::new);
|
|
|
if (!cluster.getTopics().containsKey(topicName)) {
|
|
@@ -363,21 +360,21 @@ public class ClusterService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Mono<ReplicationFactorChangeResponse> changeReplicationFactor(
|
|
|
+ public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
|
|
|
String clusterName,
|
|
|
String topicName,
|
|
|
- ReplicationFactorChange replicationFactorChange) {
|
|
|
+ ReplicationFactorChangeDTO replicationFactorChange) {
|
|
|
return clustersStorage.getClusterByName(clusterName).map(cluster ->
|
|
|
kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange)
|
|
|
.doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster))
|
|
|
- .map(t -> new ReplicationFactorChangeResponse()
|
|
|
+ .map(t -> new ReplicationFactorChangeResponseDTO()
|
|
|
.topicName(t.getName())
|
|
|
.totalReplicationFactor(t.getReplicationFactor())))
|
|
|
.orElse(Mono.error(new ClusterNotFoundException(
|
|
|
String.format("No cluster for name '%s'", clusterName))));
|
|
|
}
|
|
|
|
|
|
- public Flux<BrokersLogdirs> getAllBrokersLogdirs(String clusterName, List<Integer> brokers) {
|
|
|
+ public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(String clusterName, List<Integer> brokers) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.flatMap(c -> kafkaService.getClusterLogDirs(c, brokers))
|
|
|
.map(describeLogDirsMapper::toBrokerLogDirsList)
|
|
@@ -385,7 +382,7 @@ public class ClusterService {
|
|
|
}
|
|
|
|
|
|
public Mono<Void> updateBrokerLogDir(
|
|
|
- String clusterName, Integer id, BrokerLogdirUpdate brokerLogDir) {
|
|
|
+ String clusterName, Integer id, BrokerLogdirUpdateDTO brokerLogDir) {
|
|
|
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.flatMap(c -> kafkaService.updateBrokerLogDir(c, id, brokerLogDir));
|
|
|
}
|