Prechádzať zdrojové kódy

[BE] Supress kafka authorization errors (#3376)

* wip

* wip

* typo fix, minor impr

* test fixes

* wip

---------

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 2 rokov pred
rodič
commit
741bbc1be1

+ 25 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -62,9 +62,11 @@ import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
@@ -176,6 +178,7 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
+  //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
     var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
     // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
@@ -196,7 +199,8 @@ public class ReactiveAdminClient implements Closeable {
         client.describeConfigs(
             resources,
             new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
-        UnknownTopicOrPartitionException.class
+        UnknownTopicOrPartitionException.class,
+        TopicAuthorizationException.class
     ).map(config -> config.entrySet().stream()
         .collect(toMap(
             c -> c.getKey().name(),
@@ -208,11 +212,17 @@ public class ReactiveAdminClient implements Closeable {
         .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
         .collect(toList());
     return toMono(client.describeConfigs(resources).all())
-        .doOnError(InvalidRequestException.class,
-            th -> log.trace("Error while getting broker {} configs", brokerIds, th))
         // some kafka backends (like MSK serverless) do not support broker's configs retrieval,
         // in that case InvalidRequestException will be thrown
-        .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
+        .onErrorResume(InvalidRequestException.class, th -> {
+          log.trace("Error while getting broker {} configs", brokerIds, th);
+          return Mono.just(Map.of());
+        })
+        // there are situations when kafka-ui user has no DESCRIBE_CONFIGS permission on cluster
+        .onErrorResume(ClusterAuthorizationException.class, th -> {
+          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
+          return Mono.just(Map.of());
+        })
         .map(config -> config.entrySet().stream()
             .collect(toMap(
                 c -> Integer.valueOf(c.getKey().name()),
@@ -242,13 +252,16 @@ public class ReactiveAdminClient implements Closeable {
 
   private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
     return toMonoWithExceptionFilter(
-        client.describeTopics(topics).values(),
-        UnknownTopicOrPartitionException.class
+        client.describeTopics(topics).topicNameValues(),
+        UnknownTopicOrPartitionException.class,
+        // we only describe topics that we see from listTopics() API, so we should have permission to do it,
+        // but also adding this exception here for rare case when access restricted after we called listTopics()
+        TopicAuthorizationException.class
     );
   }
 
   /**
-   * Returns TopicDescription mono, or Empty Mono if topic not found.
+   * Returns TopicDescription mono, or Empty Mono if topic not visible.
    */
   public Mono<TopicDescription> describeTopic(String topic) {
     return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
@@ -262,10 +275,11 @@ public class ReactiveAdminClient implements Closeable {
    * such topics in resulting map.
    * <p/>
    * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
-   * finished with <code>clazz</code> exception and empty Monos.
+   * finished with <code>classes</code> exceptions and empty Monos.
    */
+  @SafeVarargs
   static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
-                                                          Class<? extends KafkaException> clazz) {
+                                                          Class<? extends KafkaException>... classes) {
     if (values.isEmpty()) {
       return Mono.just(Map.of());
     }
@@ -277,7 +291,7 @@ public class ReactiveAdminClient implements Closeable {
                 .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
                 .onErrorResume(
                     // tracking Monos with suppressible error
-                    th -> th.getClass().isAssignableFrom(clazz),
+                    th -> Stream.of(classes).anyMatch(clazz -> th.getClass().isAssignableFrom(clazz)),
                     th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
         .toList();
 
@@ -300,6 +314,7 @@ public class ReactiveAdminClient implements Closeable {
       Collection<Integer> brokerIds) {
     return toMono(client.describeLogDirs(brokerIds).all())
         .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
+        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
         .onErrorResume(th -> true, th -> {
           log.warn("Error while calling describeLogDirs", th);
           return Mono.just(Map.of());

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -162,9 +162,14 @@ public class TopicsService {
   }
 
   public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
+    // there 2 case that we cover here:
+    // 1. topic not found/visible - describeTopic() will be empty and we will throw TopicNotFoundException
+    // 2. topic is visible, but we don't have DESCRIBE_CONFIG permission - we should return empty list
     return adminClientService.get(cluster)
-        .flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true))
-        .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
+        .flatMap(ac -> ac.describeTopic(topicName)
+            .switchIfEmpty(Mono.error(new TopicNotFoundException()))
+            .then(ac.getTopicsConfig(List.of(topicName), true))
+            .map(m -> m.values().stream().findFirst().orElse(List.of())));
   }
 
   private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,