|
@@ -7,13 +7,16 @@ import com.provectus.kafka.ui.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
|
|
+import java.util.Set;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
+import org.apache.kafka.clients.admin.OffsetSpec;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
@@ -62,18 +65,25 @@ public class ConsumerGroupService {
|
|
consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
|
|
consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
|
|
}
|
|
}
|
|
|
|
|
|
- return consumerGroups.map(c ->
|
|
|
|
- c.stream()
|
|
|
|
- .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
|
|
|
|
- .filter(Optional::isPresent)
|
|
|
|
- .map(Optional::get)
|
|
|
|
- .map(g ->
|
|
|
|
- g.toBuilder().endOffsets(
|
|
|
|
- topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
|
|
|
|
- ).build()
|
|
|
|
- )
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- );
|
|
|
|
|
|
+ return consumerGroups.flatMap(c -> {
|
|
|
|
+ final List<InternalConsumerGroup> groups = c.stream()
|
|
|
|
+ .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
|
|
|
|
+ .filter(Optional::isPresent)
|
|
|
|
+ .map(Optional::get)
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ final Set<TopicPartition> topicPartitions =
|
|
|
|
+ groups.stream().flatMap(g -> g.getOffsets().keySet().stream())
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
+
|
|
|
|
+ return topicPartitionsEndOffsets(cluster, topicPartitions).map(offsets ->
|
|
|
|
+ groups.stream().map(g -> {
|
|
|
|
+ Map<TopicPartition, Long> offsetsCopy = new HashMap<>(offsets);
|
|
|
|
+ offsetsCopy.keySet().retainAll(g.getOffsets().keySet());
|
|
|
|
+ return g.toBuilder().endOffsets(offsetsCopy).build();
|
|
|
|
+ }).collect(Collectors.toList())
|
|
|
|
+ );
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<List<ConsumerGroupDTO>> getConsumerGroups(KafkaCluster cluster) {
|
|
public Mono<List<ConsumerGroupDTO>> getConsumerGroups(KafkaCluster cluster) {
|
|
@@ -88,11 +98,12 @@ public class ConsumerGroupService {
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
- private Map<TopicPartition, Long> topicPartitionsEndOffsets(
|
|
|
|
|
|
+ private Mono<Map<TopicPartition, Long>> topicPartitionsEndOffsets(
|
|
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
|
|
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
|
|
- try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
|
|
|
|
- return consumer.endOffsets(topicPartitions);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ return adminClientService.get(cluster).flatMap(ac ->
|
|
|
|
+ ac.listOffsets(topicPartitions, OffsetSpec.latest())
|
|
|
|
+ );
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<ConsumerGroupDetailsDTO> getConsumerGroupDetail(KafkaCluster cluster,
|
|
public Mono<ConsumerGroupDetailsDTO> getConsumerGroupDetail(KafkaCluster cluster,
|