Browse Source

Fix timeouts with huge amount of topics (#2019)

* partitioning getTopicsConfig & describeTopics calls for cases where topics count is big

* comments added

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 3 years ago
parent
commit
2fcb0d1abe

+ 46 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -4,6 +4,8 @@ import static com.google.common.util.concurrent.Uninterruptibles.getUninterrupti
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.util.MapUtil;
@@ -11,6 +13,7 @@ import com.provectus.kafka.ui.util.NumberUtil;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -18,6 +21,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -131,6 +136,16 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
+    // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
+    return partitionCalls(
+        topicNames,
+        200,
+        this::getTopicsConfigImpl,
+        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
+    );
+  }
+
+  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames) {
     List<ConfigResource> resources = topicNames.stream()
         .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
         .collect(toList());
@@ -162,6 +177,16 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
+    // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
+    return partitionCalls(
+        topics,
+        200,
+        this::describeTopicsImpl,
+        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
+    );
+  }
+
+  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
     return toMonoWithExceptionFilter(
         client.describeTopics(topics).values(),
         UnknownTopicOrPartitionException.class
@@ -402,6 +427,27 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
   }
 
+  /**
+   * Splits input collection into batches, applies each batch sequentially to function
+   * and merges output Monos into one Mono.
+   */
+  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
+                                               int partitionSize,
+                                               Function<Collection<I>, Mono<R>> call,
+                                               BiFunction<R, R, R> merger) {
+    if (items.isEmpty()) {
+      return call.apply(items);
+    }
+    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
+    Mono<R> mono = call.apply(parts.next());
+    while (parts.hasNext()) {
+      var nextPart = parts.next();
+      // calls will be executed sequentially
+      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
+    }
+    return mono;
+  }
+
   @Override
   public void close() {
     client.close();