diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java
index 76f916cb10..f5d16e0b65 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java
@@ -13,12 +13,12 @@ public class InternalPartition {
   private final int inSyncReplicasCount;
   private final int replicasCount;
 
-  private final long offsetMin;
-  private final long offsetMax;
+  private final Long offsetMin;
+  private final Long offsetMax;
 
   // from log dir
-  private final long segmentSize;
-  private final long segmentCount;
+  private final Long segmentSize;
+  private final Integer segmentCount;
 
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
index ee9a9d8a6a..2de70c88ca 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
@@ -49,8 +49,8 @@ public class ConsumerGroupService {
           var tpsFromGroupOffsets = groupOffsetsMap.values().stream()
               .flatMap(v -> v.keySet().stream())
               .collect(Collectors.toSet());
-          // 2. getting end offsets for partitions with committed offsets
-          return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest())
+          // 2. getting end offsets for partitions with committed offsets
+          return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest(), false)
               .map(endOffsets ->
                   descriptions.stream()
                       .map(desc -> {
@@ -68,7 +68,7 @@ public class ConsumerGroupService {
                                                                 String topic) {
     return adminClientService.get(cluster)
         // 1. getting topic's end offsets
-        .flatMap(ac -> ac.listOffsets(topic, OffsetSpec.latest())
+        .flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
             .flatMap(endOffsets -> {
               var tps = new ArrayList<>(endOffsets.keySet());
               // 2. getting all consumer groups
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index 1f217b1e40..ecfeda0122 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -65,8 +65,8 @@ public class MessagesService {
   private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                              List<Integer> partitionsToInclude) {
     return adminClientService.get(cluster).flatMap(ac ->
-        ac.listOffsets(topicName, OffsetSpec.earliest())
-            .zipWith(ac.listOffsets(topicName, OffsetSpec.latest()),
+        ac.listTopicOffsets(topicName, OffsetSpec.earliest(), true)
+            .zipWith(ac.listTopicOffsets(topicName, OffsetSpec.latest(), true),
                 (start, end) ->
                     end.entrySet().stream()
                         .filter(e -> partitionsToInclude.isEmpty()
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
index b2675d51be..36b812473e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
@@ -47,11 +47,12 @@ public class OffsetsResetService {
                                                            @Nullable Collection<Integer> partitions,
                                                            OffsetSpec spec) {
     if (partitions == null) {
-      return client.listOffsets(topic, spec);
+      return client.listTopicOffsets(topic, spec, true);
     }
     return client.listOffsets(
         partitions.stream().map(idx -> new TopicPartition(topic, idx)).collect(toSet()),
-        spec
+        spec,
+        true
     );
   }
@@ -84,9 +85,9 @@ public class OffsetsResetService {
         .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
     return checkGroupCondition(cluster, group).flatMap(
         ac ->
-            ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest())
+            ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest(), true)
                 .flatMap(earliest ->
-                    ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest())
+                    ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest(), true)
                         .map(latest -> editOffsetsBounds(partitionOffsets, earliest, latest))
                         .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit)))
     );
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index 463e2d9b6c..2504473b17 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -8,12 +8,15 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.MapUtil;
 import com.provectus.kafka.ui.util.NumberUtil;
+import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -51,6 +55,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
@@ -400,32 +405,81 @@ public class ReactiveAdminClient implements Closeable {
         .all());
   }
 
-  public Mono<Map<TopicPartition, Long>> listOffsets(String topic,
-                                                     OffsetSpec offsetSpec) {
-    return topicPartitions(topic).flatMap(tps -> listOffsets(tps, offsetSpec));
+  /**
+   * List offsets for the topic's partitions and OffsetSpec.
+   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
+   *                            false - skip partitions with no leader
+   */
+  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
+                                                          OffsetSpec offsetSpec,
+                                                          boolean failOnUnknownLeader) {
+    return describeTopic(topic)
+        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
+        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
   }
 
+  /**
+   * List offsets for the specified partitions and OffsetSpec.
+   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
+   *                            false - skip partitions with no leader
+   */
   public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
-                                                     OffsetSpec offsetSpec) {
-    //TODO: need to split this into multiple calls if number of target partitions is big
-    return toMono(
-        client.listOffsets(partitions.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
-        .map(offsets -> offsets.entrySet()
-            .stream()
-            // filtering partitions for which offsets were not found
-            .filter(e -> e.getValue().offset() >= 0)
-            .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
+                                                     OffsetSpec offsetSpec,
+                                                     boolean failOnUnknownLeader) {
+    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
+        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
   }
 
-  private Mono<Set<TopicPartition>> topicPartitions(String topic) {
-    return toMono(client.describeTopics(List.of(topic)).all())
-        .map(r -> r.values().stream()
-            .findFirst()
-            .stream()
-            .flatMap(d -> d.partitions().stream())
-            .map(p -> new TopicPartition(topic, p.partition()))
-            .collect(Collectors.toSet())
-        );
+  private Mono<Set<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
+                                                                    boolean failOnUnknownLeader) {
+    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    return describeTopicsImpl(targetTopics)
+        .map(descriptions ->
+            filterPartitionsWithLeaderCheck(
+                descriptions.values(), partitions::contains, failOnUnknownLeader));
+  }
+
+  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
+                                                              Predicate<TopicPartition> partitionPredicate,
+                                                              boolean failOnUnknownLeader) {
+    var goodPartitions = new HashSet<TopicPartition>();
+    for (TopicDescription description : topicDescriptions) {
+      for (TopicPartitionInfo partitionInfo : description.partitions()) {
+        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
+        if (!partitionPredicate.test(topicPartition)) {
+          continue;
+        }
+        if (partitionInfo.leader() != null) {
+          goodPartitions.add(topicPartition);
+        } else if (failOnUnknownLeader) {
+          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+        }
+      }
+    }
+    return goodPartitions;
+  }
+
+  // 1. NOTE(!): should only apply for partitions with existing leader,
+  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
+  // 2. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
+  @KafkaClientInternalsDependant
+  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
+                                                           OffsetSpec offsetSpec) {
+
+    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
+        parts -> toMono(
+            client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
+            .map(offsets -> offsets.entrySet().stream()
+                // filtering partitions for which offsets were not found
+                .filter(e -> e.getValue().offset() >= 0)
+                .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
+
+    return partitionCalls(
+        partitions,
+        200,
+        call,
+        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
+    );
   }
 
   public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
index 8badcebc36..7b08d69fd6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
@@ -138,11 +138,15 @@ public class TopicsService {
                                                                  ReactiveAdminClient ac) {
     var topicPartitions = descriptions.values().stream()
         .flatMap(desc ->
-            desc.partitions().stream().map(p -> new TopicPartition(desc.name(), p.partition())))
+            desc.partitions().stream()
+                // list offsets should only be applied to partitions with existing leader
+                // (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
+                .filter(tp -> tp.leader() != null)
+                .map(p -> new TopicPartition(desc.name(), p.partition())))
         .collect(toList());
 
-    return ac.listOffsets(topicPartitions, OffsetSpec.earliest())
-        .zipWith(ac.listOffsets(topicPartitions, OffsetSpec.latest()),
+    return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
+        .zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
             (earliest, latest) ->
                 topicPartitions.stream()
                     .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
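
A minimal usage sketch (not part of the diff) of how callers are expected to choose between the new leader-checked overloads and `listOffsetsUnsafe`, mirroring the call sites changed above. Only the `ReactiveAdminClient` methods and Kafka/Reactor types come from this PR and its dependencies; the wrapper class `ListOffsetsUsageSketch` and its method names are illustrative.

```java
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;

// Illustrative wrapper; only the ReactiveAdminClient calls are taken from the diff above.
class ListOffsetsUsageSketch {

  // Read-only views (e.g. consumer group end offsets): skip partitions without a leader
  // instead of failing, hence failOnUnknownLeader = false.
  Mono<Map<TopicPartition, Long>> topicEndOffsets(ReactiveAdminClient ac, String topic) {
    return ac.listTopicOffsets(topic, OffsetSpec.latest(), false);
  }

  // Mutating operations (record deletion, offset reset): fail fast with a ValidationException
  // if any partition has no leader, hence failOnUnknownLeader = true.
  Mono<Map<TopicPartition, Long>> earliestForReset(ReactiveAdminClient ac,
                                                   Collection<TopicPartition> partitions) {
    return ac.listOffsets(partitions, OffsetSpec.earliest(), true);
  }

  // listOffsetsUnsafe skips the extra describeTopics round-trip and batches the request in
  // chunks of 200 partitions, so the caller must already have filtered out leaderless
  // partitions (see the NOTE in ReactiveAdminClient and the filter in TopicsService above).
  Mono<Map<TopicPartition, Long>> statsOffsets(ReactiveAdminClient ac,
                                               Collection<TopicPartition> partitionsWithLeader) {
    return ac.listOffsetsUnsafe(partitionsWithLeader, OffsetSpec.latest());
  }
}
```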