|
@@ -8,12 +8,15 @@ import com.google.common.collect.ImmutableMap;
|
|
|
import com.google.common.collect.Iterators;
|
|
|
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
|
|
|
import com.provectus.kafka.ui.exception.NotFoundException;
|
|
|
+import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.util.MapUtil;
|
|
|
import com.provectus.kafka.ui.util.NumberUtil;
|
|
|
+import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
|
|
|
import java.io.Closeable;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -25,6 +28,7 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.BiFunction;
|
|
|
import java.util.function.Function;
|
|
|
+import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
import javax.annotation.Nullable;
|
|
@@ -51,6 +55,7 @@ import org.apache.kafka.common.KafkaException;
|
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
|
import org.apache.kafka.common.Node;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
+import org.apache.kafka.common.TopicPartitionInfo;
|
|
|
import org.apache.kafka.common.TopicPartitionReplica;
|
|
|
import org.apache.kafka.common.acl.AclOperation;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
@@ -400,32 +405,81 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
.all());
|
|
|
}
|
|
|
|
|
|
- public Mono<Map<TopicPartition, Long>> listOffsets(String topic,
|
|
|
- OffsetSpec offsetSpec) {
|
|
|
- return topicPartitions(topic).flatMap(tps -> listOffsets(tps, offsetSpec));
|
|
|
+ /**
|
|
|
+ * List offset for the topic's partitions and OffsetSpec.
|
|
|
+ * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
|
|
|
+ * false - skip partitions with no leader
|
|
|
+ */
|
|
|
+ public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
|
|
|
+ OffsetSpec offsetSpec,
|
|
|
+ boolean failOnUnknownLeader) {
|
|
|
+ return describeTopic(topic)
|
|
|
+ .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
|
|
|
+ .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * List offset for the specified partitions and OffsetSpec.
|
|
|
+ * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
|
|
|
+ * false - skip partitions with no leader
|
|
|
+ */
|
|
|
public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
|
|
|
- OffsetSpec offsetSpec) {
|
|
|
- //TODO: need to split this into multiple calls if number of target partitions is big
|
|
|
- return toMono(
|
|
|
- client.listOffsets(partitions.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
|
|
|
- .map(offsets -> offsets.entrySet()
|
|
|
- .stream()
|
|
|
- // filtering partitions for which offsets were not found
|
|
|
- .filter(e -> e.getValue().offset() >= 0)
|
|
|
- .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
|
|
|
- }
|
|
|
-
|
|
|
- private Mono<Set<TopicPartition>> topicPartitions(String topic) {
|
|
|
- return toMono(client.describeTopics(List.of(topic)).all())
|
|
|
- .map(r -> r.values().stream()
|
|
|
- .findFirst()
|
|
|
- .stream()
|
|
|
- .flatMap(d -> d.partitions().stream())
|
|
|
- .map(p -> new TopicPartition(topic, p.partition()))
|
|
|
- .collect(Collectors.toSet())
|
|
|
- );
|
|
|
+ OffsetSpec offsetSpec,
|
|
|
+ boolean failOnUnknownLeader) {
|
|
|
+ return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
|
|
|
+ .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
|
|
|
+ boolean failOnUnknownLeader) {
|
|
|
+ var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
|
|
|
+ return describeTopicsImpl(targetTopics)
|
|
|
+ .map(descriptions ->
|
|
|
+ filterPartitionsWithLeaderCheck(
|
|
|
+ descriptions.values(), partitions::contains, failOnUnknownLeader));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
|
|
|
+ Predicate<TopicPartition> partitionPredicate,
|
|
|
+ boolean failOnUnknownLeader) {
|
|
|
+ var goodPartitions = new HashSet<TopicPartition>();
|
|
|
+ for (TopicDescription description : topicDescriptions) {
|
|
|
+ for (TopicPartitionInfo partitionInfo : description.partitions()) {
|
|
|
+ TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
|
|
|
+ if (!partitionPredicate.test(topicPartition)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (partitionInfo.leader() != null) {
|
|
|
+ goodPartitions.add(topicPartition);
|
|
|
+ } else if (failOnUnknownLeader) {
|
|
|
+ throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return goodPartitions;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 1. NOTE(!): should only apply for partitions with existing leader,
|
|
|
+ // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
|
|
|
+ // 2. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
|
|
|
+ @KafkaClientInternalsDependant
|
|
|
+ public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
|
|
|
+ OffsetSpec offsetSpec) {
|
|
|
+
|
|
|
+ Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
|
|
|
+ parts -> toMono(
|
|
|
+ client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
|
|
|
+ .map(offsets -> offsets.entrySet().stream()
|
|
|
+ // filtering partitions for which offsets were not found
|
|
|
+ .filter(e -> e.getValue().offset() >= 0)
|
|
|
+ .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
|
|
|
+
|
|
|
+ return partitionCalls(
|
|
|
+ partitions,
|
|
|
+ 200,
|
|
|
+ call,
|
|
|
+ (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
|