|
@@ -7,8 +7,8 @@ import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
|
import com.provectus.kafka.ui.model.InternalBrokerConfig;
|
|
import com.provectus.kafka.ui.model.InternalBrokerConfig;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
-import java.util.ArrayList;
|
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
|
|
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
|
|
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
|
|
import org.apache.kafka.common.Node;
|
|
import org.apache.kafka.common.Node;
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
|
+import org.apache.kafka.common.errors.UnsupportedVersionException;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
@@ -39,11 +40,15 @@ public class BrokerServiceImpl implements BrokerService {
|
|
.map(ExtendedAdminClient::getAdminClient)
|
|
.map(ExtendedAdminClient::getAdminClient)
|
|
.flatMap(adminClient ->
|
|
.flatMap(adminClient ->
|
|
ClusterUtil.toMono(adminClient.describeConfigs(resources,
|
|
ClusterUtil.toMono(adminClient.describeConfigs(resources,
|
|
- new DescribeConfigsOptions().includeSynonyms(true)).all())
|
|
|
|
- .map(config -> config.entrySet().stream()
|
|
|
|
|
|
+ new DescribeConfigsOptions().includeSynonyms(true)).all())
|
|
|
|
+ .map(config -> config.entrySet()
|
|
|
|
+ .stream()
|
|
.collect(Collectors.toMap(
|
|
.collect(Collectors.toMap(
|
|
c -> Integer.valueOf(c.getKey().name()),
|
|
c -> Integer.valueOf(c.getKey().name()),
|
|
- c -> new ArrayList<>(c.getValue().entries())))));
|
|
|
|
|
|
+ c -> List.copyOf(c.getValue().entries())
|
|
|
|
+ ))
|
|
|
|
+ ))
|
|
|
|
+ .onErrorResume(UnsupportedVersionException.class, (e) -> Mono.just(new HashMap<>()));
|
|
}
|
|
}
|
|
|
|
|
|
private Mono<List<ConfigEntry>> loadBrokersConfig(
|
|
private Mono<List<ConfigEntry>> loadBrokersConfig(
|