Browse Source

Fix random topic creation fails (#3004)

ReactiveAdminClient::listOffsetsUnsafe now skips non-initialized partitions. This prevents UnknownTopicOrPartitionException from being leaked to high-level logic (the leaked exception was preventing TopicService::loadTopicAfterCreation from doing retries)
Ilya Kuramshin 2 years ago
parent
commit
43ccca43c2

+ 20 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
@@ -16,6 +17,7 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +47,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
@@ -170,6 +173,7 @@ public class ReactiveAdminClient implements Closeable {
     return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
   }
 
+  //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
     var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
     // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
@@ -258,7 +262,7 @@ public class ReactiveAdminClient implements Closeable {
    * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
    * finished with <code>clazz</code> exception.
    */
-  private <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
+  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                            Class<? extends KafkaException> clazz) {
     if (values.isEmpty()) {
       return Mono.just(Map.of());
@@ -468,19 +472,26 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   // 1. NOTE(!): should only apply for partitions with existing leader,
-  // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
-  // 2. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
+  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
+  // 2. NOTE(!): Skips partitions that were not initialized yet
+  //    (UnknownTopicOrPartitionException thrown, ex. after topic creation)
+  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
   @KafkaClientInternalsDependant
   public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                            OffsetSpec offsetSpec) {
 
     Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
-        parts -> toMono(
-            client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
-            .map(offsets -> offsets.entrySet().stream()
-                // filtering partitions for which offsets were not found
-                .filter(e -> e.getValue().offset() >= 0)
-                .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
+        parts -> {
+          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
+          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
+          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));
+
+          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
+              .map(offsets -> offsets.entrySet().stream()
+                  // filtering partitions for which offsets were not found
+                  .filter(e -> e.getValue().offset() >= 0)
+                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
+        };
 
     return partitionCalls(
         partitions,

+ 56 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -1,9 +1,11 @@
 package com.provectus.kafka.ui.service;
 
+import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExceptionFilter;
 import static java.util.Objects.requireNonNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -15,7 +17,13 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -88,5 +96,53 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
     clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
   }
 
+  @Test
+  void testToMonoWithExceptionFilter() {
+    // One future completes exceptionally with the exception class being filtered
+    var failedFuture = new KafkaFutureImpl<String>();
+    failedFuture.completeExceptionally(new UnknownTopicOrPartitionException());
+
+    // The other future completes normally
+    var okFuture = new KafkaFutureImpl<String>();
+    okFuture.complete("done");
+
+    Map<String, KafkaFuture<String>> arg = Map.of("failure", failedFuture, "ok", okFuture);
+    // The entry whose future failed with the filtered exception type must be
+    // silently dropped; only the successfully completed entry remains
+    StepVerifier.create(toMonoWithExceptionFilter(arg, UnknownTopicOrPartitionException.class))
+        .assertNext(result -> assertThat(result).hasSize(1).containsEntry("ok", "done"))
+        .verifyComplete();
+  }
+
+  @Test
+  void testListOffsetsUnsafe() {
+    // Fresh topic with 2 partitions, replication factor 1
+    String topic = UUID.randomUUID().toString();
+    createTopics(new NewTopic(topic, 2, (short) 1));
+
+    // sending messages to have non-zero offsets for tp
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
+      producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
+    }
+
+    var requestedPartitions = List.of(
+        new TopicPartition(topic, 0),
+        new TopicPartition(topic, 1)
+    );
+
+    // Earliest offsets: 0 for both partitions (no records were deleted)
+    StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.earliest()))
+        .assertNext(offsets -> {
+          assertThat(offsets)
+              .hasSize(2)
+              .containsEntry(new TopicPartition(topic, 0), 0L)
+              .containsEntry(new TopicPartition(topic, 1), 0L);
+        })
+        .verifyComplete();
+
+    // Latest offsets: partition 0 is empty (0), partition 1 holds the 2 produced records (2)
+    StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.latest()))
+        .assertNext(offsets -> {
+          assertThat(offsets)
+              .hasSize(2)
+              .containsEntry(new TopicPartition(topic, 0), 0L)
+              .containsEntry(new TopicPartition(topic, 1), 2L);
+        })
+        .verifyComplete();
+  }
 
 }