|
@@ -212,17 +212,24 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
.map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
|
|
|
.collect(toList());
|
|
|
return toMono(client.describeConfigs(resources).all())
|
|
|
- // some kafka backends (like MSK serverless) do not support broker's configs retrieval,
|
|
|
- // in that case InvalidRequestException will be thrown
|
|
|
- .onErrorResume(InvalidRequestException.class, th -> {
|
|
|
- log.trace("Error while getting broker {} configs", brokerIds, th);
|
|
|
- return Mono.just(Map.of());
|
|
|
- })
|
|
|
+ // some kafka backends don't support broker's configs retrieval,
|
|
|
+ // and throw various exceptions on describeConfigs() call
|
|
|
+ .onErrorResume(th -> th instanceof InvalidRequestException // MSK Serverless
|
|
|
+ || th instanceof UnknownTopicOrPartitionException, // Azure event hub
|
|
|
+ th -> {
|
|
|
+ log.trace("Error while getting configs for brokers {}", brokerIds, th);
|
|
|
+ return Mono.just(Map.of());
|
|
|
+ })
|
|
|
// there are situations when kafka-ui user has no DESCRIBE_CONFIGS permission on cluster
|
|
|
.onErrorResume(ClusterAuthorizationException.class, th -> {
|
|
|
log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
|
|
|
return Mono.just(Map.of());
|
|
|
})
|
|
|
+ // catching all remaining exceptions, but logging on WARN level
|
|
|
+ .onErrorResume(th -> true, th -> {
|
|
|
+ log.warn("Unexpected error while getting configs for brokers {}", brokerIds, th);
|
|
|
+ return Mono.just(Map.of());
|
|
|
+ })
|
|
|
.map(config -> config.entrySet().stream()
|
|
|
.collect(toMap(
|
|
|
c -> Integer.valueOf(c.getKey().name()),
|