diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java index 7ccceefe61..a659f94f97 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java @@ -6,6 +6,10 @@ public class TopicMetadataException extends CustomBaseException { super(message); } + public TopicMetadataException(String message, Throwable cause) { + super(message, cause); + } + @Override public ErrorCode getErrorCode() { return ErrorCode.INVALID_ENTITY_STATE; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java index cd6ad93ba0..9c258fa8de 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java @@ -47,10 +47,7 @@ public class BrokerService { private Mono> loadBrokersConfig( KafkaCluster cluster, Integer brokerId) { return loadBrokersConfig(cluster, Collections.singletonList(brokerId)) - .map(map -> map.values().stream() - .findFirst() - .orElseThrow(() -> new NotFoundException( - String.format("Config for broker %s not found", brokerId)))); + .map(map -> map.values().stream().findFirst().orElse(List.of())); } private Flux getBrokersConfig(KafkaCluster cluster, Integer brokerId) { @@ -81,13 +78,6 @@ public class BrokerService { .flatMapMany(Flux::fromIterable); } - public Mono getController(KafkaCluster cluster) { - return adminClientService - .get(cluster) - .flatMap(ReactiveAdminClient::describeCluster) - .map(ReactiveAdminClient.ClusterDescription::getController); - } - public Mono updateBrokerLogDir(KafkaCluster cluster, Integer broker, BrokerLogdirUpdateDTO brokerLogDir) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 24a824fb16..9097f2b25e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -60,6 +60,6 @@ public class FeatureService { .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) .map(e -> Boolean.parseBoolean(e.value())) .findFirst() - .orElse(false)); + .orElse(true)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 0556b3a046..c08b839410 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -54,6 +55,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import reactor.core.publisher.Mono; @@ -81,7 +83,7 @@ public class ReactiveAdminClient implements Closeable { } public static Mono create(AdminClient adminClient) { - return getClusterVersionImpl(adminClient) + return getClusterVersion(adminClient) .map(ver -> new ReactiveAdminClient( adminClient, @@ -105,7 +107,13 @@ public class ReactiveAdminClient implements Closeable { private static Mono toMono(KafkaFuture future) { return Mono.create(sink -> future.whenComplete((res, ex) -> { if (ex != null) { - sink.error(ex); + // KafkaFuture doc is unclear about what exception wrapper will be used + // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both + if (ex instanceof CompletionException || ex instanceof ExecutionException) { + sink.error(ex.getCause()); //unwrapping exception + } else { + sink.error(ex); + } } else { sink.success(res); } @@ -166,17 +174,29 @@ public class ReactiveAdminClient implements Closeable { c -> List.copyOf(c.getValue().entries())))); } - public Mono>> loadBrokersConfig(List brokerIds) { + private static Mono>> loadBrokersConfig(AdminClient client, List brokerIds) { List resources = brokerIds.stream() .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))) .collect(toList()); return toMono(client.describeConfigs(resources).all()) + .doOnError(InvalidRequestException.class, + th -> log.trace("Error while getting broker {} configs", brokerIds, th)) + // some kafka backends (like MSK serverless) do not support broker's configs retrieval, + // in that case InvalidRequestException will be thrown + .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of())) .map(config -> config.entrySet().stream() .collect(toMap( c -> Integer.valueOf(c.getKey().name()), c -> new ArrayList<>(c.getValue().entries())))); } + /** + * Return per-broker configs or empty map if broker's configs retrieval not supported. + */ + public Mono>> loadBrokersConfig(List brokerIds) { + return loadBrokersConfig(client, brokerIds); + } + public Mono> describeTopics() { return listTopics(true).flatMap(this::describeTopics); } @@ -280,20 +300,16 @@ public class ReactiveAdminClient implements Closeable { })); } - private static Mono getClusterVersionImpl(AdminClient client) { - return toMono(client.describeCluster().controller()).flatMap(controller -> - toMono(client.describeConfigs( - List.of(new ConfigResource( - ConfigResource.Type.BROKER, String.valueOf(controller.id())))) - .all() - .thenApply(configs -> - configs.values().stream() - .map(Config::entries) - .flatMap(Collection::stream) - .filter(entry -> entry.name().contains("inter.broker.protocol.version")) - .findFirst().map(ConfigEntry::value) - .orElse("1.0-UNKNOWN") - ))); + private static Mono getClusterVersion(AdminClient client) { + return toMono(client.describeCluster().controller()) + .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id()))) + .map(configs -> configs.values().stream() + .flatMap(Collection::stream) + .filter(entry -> entry.name().contains("inter.broker.protocol.version")) + .findFirst() + .map(ConfigEntry::value) + .orElse("1.0-UNKNOWN") + ); } public Mono deleteConsumerGroups(Collection groupIds) { @@ -306,10 +322,14 @@ public class ReactiveAdminClient implements Closeable { public Mono createTopic(String name, int numPartitions, - short replicationFactor, + @Nullable Integer replicationFactor, Map configs) { - return toMono(client.createTopics( - List.of(new NewTopic(name, numPartitions, replicationFactor).configs(configs))).all()); + var newTopic = new NewTopic( + name, + Optional.of(numPartitions), + Optional.ofNullable(replicationFactor).map(Integer::shortValue) + ).configs(configs); + return toMono(client.createTopics(List.of(newTopic)).all()); } public Mono alterPartitionReassignments( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 1790a3005c..2dffe0de75 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -170,11 +170,11 @@ public class TopicsService { adminClient.createTopic( topicData.getName(), topicData.getPartitions(), - topicData.getReplicationFactor().shortValue(), + topicData.getReplicationFactor(), topicData.getConfigs() ).thenReturn(topicData) ) - .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) + .onErrorMap(t -> new TopicMetadataException(t.getMessage(), t)) .flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName())); } @@ -194,7 +194,7 @@ public class TopicsService { ac.createTopic( topic.getName(), topic.getPartitionCount(), - (short) topic.getReplicationFactor(), + topic.getReplicationFactor(), topic.getTopicConfigs() .stream() .collect(Collectors.toMap(InternalTopicConfig::getName, @@ -430,7 +430,7 @@ public class TopicsService { ac.createTopic( newTopicName, topic.getPartitionCount(), - (short) topic.getReplicationFactor(), + topic.getReplicationFactor(), topic.getTopicConfigs() .stream() .collect(Collectors diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 68b8afa65a..af6c6c3e6a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2261,7 +2261,6 @@ components: required: - name - partitions - - replicationFactor TopicUpdate: type: object