|
@@ -1,6 +1,8 @@
|
|
package com.provectus.kafka.ui.service;
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
|
|
+import com.provectus.kafka.ui.exception.TopicMetadataException;
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
|
+import com.provectus.kafka.ui.model.CleanupPolicy;
|
|
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
|
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
|
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
|
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
|
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
|
|
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
|
|
@@ -159,6 +161,7 @@ public class KafkaService {
|
|
.onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
|
|
.onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
|
|
.offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
|
|
.offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
|
|
.zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
|
|
.zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
|
|
|
|
+ .version(version)
|
|
.build();
|
|
.build();
|
|
|
|
|
|
return currentCluster.toBuilder()
|
|
return currentCluster.toBuilder()
|
|
@@ -205,12 +208,18 @@ public class KafkaService {
|
|
|
|
|
|
private Map<String, InternalTopic> mergeWithConfigs(
|
|
private Map<String, InternalTopic> mergeWithConfigs(
|
|
List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
|
|
List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
|
|
- return topics.stream().map(
|
|
|
|
- t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
|
|
|
|
- ).collect(Collectors.toMap(
|
|
|
|
- InternalTopic::getName,
|
|
|
|
- e -> e
|
|
|
|
- ));
|
|
|
|
|
|
+ return topics.stream()
|
|
|
|
+ .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
|
|
|
|
+ .map(t -> t.toBuilder().cleanUpPolicy(
|
|
|
|
+ CleanupPolicy.fromString(t.getTopicConfigs().stream()
|
|
|
|
+ .filter(config -> config.getName().equals("cleanup.policy"))
|
|
|
|
+ .findFirst()
|
|
|
|
+ .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
|
|
|
|
+ .getValue())).build())
|
|
|
|
+ .collect(Collectors.toMap(
|
|
|
|
+ InternalTopic::getName,
|
|
|
|
+ e -> e
|
|
|
|
+ ));
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
@@ -223,11 +232,12 @@ public class KafkaService {
|
|
final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
|
|
final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
|
|
loadTopicsConfig(adminClient, topics);
|
|
loadTopicsConfig(adminClient, topics);
|
|
|
|
|
|
- return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map(
|
|
|
|
- m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())
|
|
|
|
- ).flatMap(internalTopics -> configsMono.map(configs ->
|
|
|
|
- mergeWithConfigs(internalTopics, configs).values()
|
|
|
|
- )).flatMapMany(Flux::fromIterable);
|
|
|
|
|
|
+ return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
|
|
|
|
+ .map(m -> m.values().stream()
|
|
|
|
+ .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
|
|
|
|
+ .flatMap(internalTopics -> configsMono
|
|
|
|
+ .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
|
|
|
|
+ .flatMapMany(Flux::fromIterable);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -260,10 +270,12 @@ public class KafkaService {
|
|
topicData.getReplicationFactor().shortValue());
|
|
topicData.getReplicationFactor().shortValue());
|
|
newTopic.configs(topicData.getConfigs());
|
|
newTopic.configs(topicData.getConfigs());
|
|
return createTopic(adminClient, newTopic).map(v -> topicData);
|
|
return createTopic(adminClient, newTopic).map(v -> topicData);
|
|
- }).flatMap(
|
|
|
|
- topicData ->
|
|
|
|
- getTopicsData(adminClient, Collections.singleton(topicData.getName()))
|
|
|
|
- .next()
|
|
|
|
|
|
+ })
|
|
|
|
+ .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
|
|
|
|
+ .flatMap(
|
|
|
|
+ topicData ->
|
|
|
|
+ getTopicsData(adminClient, Collections.singleton(topicData.getName()))
|
|
|
|
+ .next()
|
|
).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
|
|
).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
|
|
.flatMap(t ->
|
|
.flatMap(t ->
|
|
loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
|
|
loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
|