Jelajahi Sumber

BE: Fix loading freezes in case one of the brokers is down (#3618)

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 2 tahun lalu
induk
melakukan
dbdced5bab

+ 29 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -4,6 +4,7 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Table;
@@ -498,6 +499,14 @@ public class ReactiveAdminClient implements Closeable {
         .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
   }
 
+  /**
+   * List offset for the specified topics, skipping no-leader partitions.
+   */
+  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
+                                                     OffsetSpec offsetSpec) {
+    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
+  }
+
   private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                            boolean failOnUnknownLeader) {
     var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@@ -507,34 +516,44 @@ public class ReactiveAdminClient implements Closeable {
                 descriptions.values(), partitions::contains, failOnUnknownLeader));
   }
 
-  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
+  @VisibleForTesting
+  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                               Predicate<TopicPartition> partitionPredicate,
                                                               boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
+      var goodTopicPartitions = new ArrayList<TopicPartition>();
       for (TopicPartitionInfo partitionInfo : description.partitions()) {
         TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
-        if (!partitionPredicate.test(topicPartition)) {
-          continue;
+        if (partitionInfo.leader() == null) {
+          if (failOnUnknownLeader) {
+            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+          } else {
+            // if ANY of topic partitions has no leader - we have to skip all topic partitions
+            goodTopicPartitions.clear();
+            break;
+          }
         }
-        if (partitionInfo.leader() != null) {
-          goodPartitions.add(topicPartition);
-        } else if (failOnUnknownLeader) {
-          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+        if (partitionPredicate.test(topicPartition)) {
+          goodTopicPartitions.add(topicPartition);
         }
       }
+      goodPartitions.addAll(goodTopicPartitions);
     }
     return goodPartitions;
   }
 
-  // 1. NOTE(!): should only apply for partitions with existing leader,
+  // 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
   //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
   // 2. NOTE(!): Skips partitions that were not initialized yet
   //    (UnknownTopicOrPartitionException thrown, ex. after topic creation)
   // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
   @KafkaClientInternalsDependant
-  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
-                                                           OffsetSpec offsetSpec) {
+  @VisibleForTesting
+  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
+    if (partitions.isEmpty()) {
+      return Mono.just(Map.of());
+    }
 
     Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
         parts -> {

+ 7 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.google.common.collect.Sets;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
@@ -136,22 +137,14 @@ public class TopicsService {
   }
 
   private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
-                                                                  descriptions,
+                                                                  descriptionsMap,
                                                               ReactiveAdminClient ac) {
-    var topicPartitions = descriptions.values().stream()
-        .flatMap(desc ->
-            desc.partitions().stream()
-                // list offsets should only be applied to partitions with existing leader
-                // (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
-                .filter(tp -> tp.leader() != null)
-                .map(p -> new TopicPartition(desc.name(), p.partition())))
-        .collect(toList());
-
-    return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
-        .zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
+    var descriptions = descriptionsMap.values();
+    return ac.listOffsets(descriptions, OffsetSpec.earliest())
+        .zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
             (earliest, latest) ->
-                topicPartitions.stream()
-                    .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
+                Sets.intersection(earliest.keySet(), latest.keySet())
+                    .stream()
                     .map(tp ->
                         Map.entry(tp,
                             new InternalPartitionsOffsets.Offsets(

+ 57 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -4,8 +4,11 @@ import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExcep
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -22,16 +25,20 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.assertj.core.api.ThrowableAssert;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -133,6 +140,56 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
         .verifyComplete();
   }
 
+  @Test
+  void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
+    var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("noLeaderTopic", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            // should be skipped by predicate
+            new TopicDescription("skippingByPredicate", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
+            // good topic
+            new TopicDescription("good", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
+            )),
+        p -> !p.topic().equals("skippingByPredicate"),
+        false
+    );
+
+    assertThat(filteredPartitions)
+        .containsExactlyInAnyOrder(
+            new TopicPartition("good", 0),
+            new TopicPartition("good", 1)
+        );
+  }
+
+  @Test
+  void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
+    ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("t1", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            new TopicDescription("t2", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
+            )),
+        p -> true,
+        // setting failOnNoLeader flag
+        true
+    );
+    assertThatThrownBy(call).isInstanceOf(ValidationException.class);
+  }
+
   @Test
   void testListOffsetsUnsafe() {
     String topic = UUID.randomUUID().toString();