From 19e38fb1bfe31d369bb9cd9b9eed5ddcbcef8b92 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Thu, 13 Oct 2022 22:30:07 +0400 Subject: [PATCH] Msk serverless support (BE) (#2737) Msk serverless support: 1. ReactiveAdminClient.loadBrokersConfig returns empty map if configs retrieval not supported by kafka backend 2. ReactiveAdminClient.toMone exception unwrapping added 3. FeatureService delete topics enabled set true by default 4. TopicCreationDTO.replicationFactor made optional Co-authored-by: iliax --- .../ui/exception/TopicMetadataException.java | 4 ++ .../kafka/ui/service/BrokerService.java | 12 +--- .../kafka/ui/service/FeatureService.java | 2 +- .../kafka/ui/service/ReactiveAdminClient.java | 60 ++++++++++++------- .../kafka/ui/service/TopicsService.java | 8 +-- .../main/resources/swagger/kafka-ui-api.yaml | 1 - 6 files changed, 50 insertions(+), 37 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java index 7ccceefe61..a659f94f97 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java @@ -6,6 +6,10 @@ public class TopicMetadataException extends CustomBaseException { super(message); } + public TopicMetadataException(String message, Throwable cause) { + super(message, cause); + } + @Override public ErrorCode getErrorCode() { return ErrorCode.INVALID_ENTITY_STATE; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java index cd6ad93ba0..9c258fa8de 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java @@ -47,10 +47,7 @@ public class BrokerService { private Mono> loadBrokersConfig( KafkaCluster cluster, Integer brokerId) { return loadBrokersConfig(cluster, Collections.singletonList(brokerId)) - .map(map -> map.values().stream() - .findFirst() - .orElseThrow(() -> new NotFoundException( - String.format("Config for broker %s not found", brokerId)))); + .map(map -> map.values().stream().findFirst().orElse(List.of())); } private Flux getBrokersConfig(KafkaCluster cluster, Integer brokerId) { @@ -81,13 +78,6 @@ public class BrokerService { .flatMapMany(Flux::fromIterable); } - public Mono getController(KafkaCluster cluster) { - return adminClientService - .get(cluster) - .flatMap(ReactiveAdminClient::describeCluster) - .map(ReactiveAdminClient.ClusterDescription::getController); - } - public Mono updateBrokerLogDir(KafkaCluster cluster, Integer broker, BrokerLogdirUpdateDTO brokerLogDir) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 24a824fb16..9097f2b25e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -60,6 +60,6 @@ public class FeatureService { .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) .map(e -> Boolean.parseBoolean(e.value())) .findFirst() - .orElse(false)); + .orElse(true)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 0556b3a046..c08b839410 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -54,6 +55,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import reactor.core.publisher.Mono; @@ -81,7 +83,7 @@ public class ReactiveAdminClient implements Closeable { } public static Mono create(AdminClient adminClient) { - return getClusterVersionImpl(adminClient) + return getClusterVersion(adminClient) .map(ver -> new ReactiveAdminClient( adminClient, @@ -105,7 +107,13 @@ public class ReactiveAdminClient implements Closeable { private static Mono toMono(KafkaFuture future) { return Mono.create(sink -> future.whenComplete((res, ex) -> { if (ex != null) { - sink.error(ex); + // KafkaFuture doc is unclear about what exception wrapper will be used + // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both + if (ex instanceof CompletionException || ex instanceof ExecutionException) { + sink.error(ex.getCause()); //unwrapping exception + } else { + sink.error(ex); + } } else { sink.success(res); } @@ -166,17 +174,29 @@ public class ReactiveAdminClient implements Closeable { c -> List.copyOf(c.getValue().entries())))); } - public Mono>> loadBrokersConfig(List brokerIds) { + private static Mono>> loadBrokersConfig(AdminClient client, List brokerIds) { List resources = brokerIds.stream() .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))) .collect(toList()); return toMono(client.describeConfigs(resources).all()) + .doOnError(InvalidRequestException.class, + th -> log.trace("Error while getting broker {} configs", brokerIds, th)) + // some kafka backends (like MSK serverless) do not support broker's configs retrieval, + // in that case InvalidRequestException will be thrown + .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of())) .map(config -> config.entrySet().stream() .collect(toMap( c -> Integer.valueOf(c.getKey().name()), c -> new ArrayList<>(c.getValue().entries())))); } + /** + * Return per-broker configs or empty map if broker's configs retrieval not supported. + */ + public Mono>> loadBrokersConfig(List brokerIds) { + return loadBrokersConfig(client, brokerIds); + } + public Mono> describeTopics() { return listTopics(true).flatMap(this::describeTopics); } @@ -280,20 +300,16 @@ public class ReactiveAdminClient implements Closeable { })); } - private static Mono getClusterVersionImpl(AdminClient client) { - return toMono(client.describeCluster().controller()).flatMap(controller -> - toMono(client.describeConfigs( - List.of(new ConfigResource( - ConfigResource.Type.BROKER, String.valueOf(controller.id())))) - .all() - .thenApply(configs -> - configs.values().stream() - .map(Config::entries) - .flatMap(Collection::stream) - .filter(entry -> entry.name().contains("inter.broker.protocol.version")) - .findFirst().map(ConfigEntry::value) - .orElse("1.0-UNKNOWN") - ))); + private static Mono getClusterVersion(AdminClient client) { + return toMono(client.describeCluster().controller()) + .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id()))) + .map(configs -> configs.values().stream() + .flatMap(Collection::stream) + .filter(entry -> entry.name().contains("inter.broker.protocol.version")) + .findFirst() + .map(ConfigEntry::value) + .orElse("1.0-UNKNOWN") + ); } public Mono deleteConsumerGroups(Collection groupIds) { @@ -306,10 +322,14 @@ public class ReactiveAdminClient implements Closeable { public Mono createTopic(String name, int numPartitions, - short replicationFactor, + @Nullable Integer replicationFactor, Map configs) { - return toMono(client.createTopics( - List.of(new NewTopic(name, numPartitions, replicationFactor).configs(configs))).all()); + var newTopic = new NewTopic( + name, + Optional.of(numPartitions), + Optional.ofNullable(replicationFactor).map(Integer::shortValue) + ).configs(configs); + return toMono(client.createTopics(List.of(newTopic)).all()); } public Mono alterPartitionReassignments( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 1790a3005c..2dffe0de75 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -170,11 +170,11 @@ public class TopicsService { adminClient.createTopic( topicData.getName(), topicData.getPartitions(), - topicData.getReplicationFactor().shortValue(), + topicData.getReplicationFactor(), topicData.getConfigs() ).thenReturn(topicData) ) - .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) + .onErrorMap(t -> new TopicMetadataException(t.getMessage(), t)) .flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName())); } @@ -194,7 +194,7 @@ public class TopicsService { ac.createTopic( topic.getName(), topic.getPartitionCount(), - (short) topic.getReplicationFactor(), + topic.getReplicationFactor(), topic.getTopicConfigs() .stream() .collect(Collectors.toMap(InternalTopicConfig::getName, @@ -430,7 +430,7 @@ public class TopicsService { ac.createTopic( newTopicName, topic.getPartitionCount(), - (short) topic.getReplicationFactor(), + topic.getReplicationFactor(), topic.getTopicConfigs() .stream() .collect(Collectors diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 68b8afa65a..af6c6c3e6a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2261,7 +2261,6 @@ components: required: - name - partitions - - replicationFactor TopicUpdate: type: object