From 9c48e5682a9974a619f6cf0cb51c5ab2b80a5336 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Wed, 6 Oct 2021 21:01:46 +0300 Subject: [PATCH] Fix unsupported DESCRIBE_CONFIGS (#939) --- .../kafka/ui/service/BrokerServiceImpl.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java index ba5d8932a0..236ea36d28 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java @@ -7,8 +7,8 @@ import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.util.ClusterUtil; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -18,6 +18,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.DescribeConfigsOptions; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -39,11 +40,15 @@ public class BrokerServiceImpl implements BrokerService { .map(ExtendedAdminClient::getAdminClient) .flatMap(adminClient -> ClusterUtil.toMono(adminClient.describeConfigs(resources, - new DescribeConfigsOptions().includeSynonyms(true)).all()) - .map(config -> config.entrySet().stream() + new DescribeConfigsOptions().includeSynonyms(true)).all()) + .map(config -> config.entrySet() + .stream() .collect(Collectors.toMap( c -> Integer.valueOf(c.getKey().name()), - c -> new ArrayList<>(c.getValue().entries()))))); + c -> List.copyOf(c.getValue().entries()) + )) + )) + .onErrorResume(UnsupportedVersionException.class, (e) -> Mono.just(new HashMap<>())); } private Mono> loadBrokersConfig(