123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package com.provectus.kafka.ui.mapper;
- import com.provectus.kafka.ui.model.BrokerDTO;
- import com.provectus.kafka.ui.model.ConsumerGroupDTO;
- import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
- import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
- import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
- import com.provectus.kafka.ui.model.InternalConsumerGroup;
- import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Optional;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
- import org.apache.kafka.common.Node;
- import org.apache.kafka.common.TopicPartition;
- public class ConsumerGroupMapper {
- private ConsumerGroupMapper() {
- }
- public static ConsumerGroupDTO toDto(InternalConsumerGroup c) {
- return convertToConsumerGroup(c, new ConsumerGroupDTO());
- }
- public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) {
- ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO();
- consumerGroup.setTopics(1); //for ui backward-compatibility, need to rm usage from ui
- consumerGroup.setGroupId(c.getGroupId());
- consumerGroup.setMembers(c.getMembers());
- consumerGroup.setMessagesBehind(c.getMessagesBehind());
- consumerGroup.setSimple(c.isSimple());
- consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
- consumerGroup.setState(mapConsumerGroupState(c.getState()));
- Optional.ofNullable(c.getCoordinator())
- .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
- return consumerGroup;
- }
- public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) {
- ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO());
- Map<TopicPartition, ConsumerGroupTopicPartitionDTO> partitionMap = new HashMap<>();
- for (Map.Entry<TopicPartition, Long> entry : g.getOffsets().entrySet()) {
- ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO();
- partition.setTopic(entry.getKey().topic());
- partition.setPartition(entry.getKey().partition());
- partition.setCurrentOffset(entry.getValue());
- final Optional<Long> endOffset = Optional.ofNullable(g.getEndOffsets())
- .map(o -> o.get(entry.getKey()));
- final Long behind = endOffset.map(o -> o - entry.getValue())
- .orElse(0L);
- partition.setEndOffset(endOffset.orElse(0L));
- partition.setMessagesBehind(behind);
- partitionMap.put(entry.getKey(), partition);
- }
- for (InternalConsumerGroup.InternalMember member : g.getMembers()) {
- for (TopicPartition topicPartition : member.getAssignment()) {
- final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent(
- topicPartition,
- tp -> new ConsumerGroupTopicPartitionDTO()
- .topic(tp.topic())
- .partition(tp.partition())
- );
- partition.setHost(member.getHost());
- partition.setConsumerId(member.getConsumerId());
- partitionMap.put(topicPartition, partition);
- }
- }
- details.setPartitions(new ArrayList<>(partitionMap.values()));
- return details;
- }
- private static <T extends ConsumerGroupDTO> T convertToConsumerGroup(
- InternalConsumerGroup c, T consumerGroup) {
- consumerGroup.setGroupId(c.getGroupId());
- consumerGroup.setMembers(c.getMembers().size());
- int numTopics = Stream.concat(
- c.getOffsets().keySet().stream().map(TopicPartition::topic),
- c.getMembers().stream()
- .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
- ).collect(Collectors.toSet()).size();
- Long messagesBehind = null;
- // messagesBehind should be undefined if no committed offsets found for topic
- if (!c.getOffsets().isEmpty()) {
- messagesBehind = c.getOffsets().entrySet().stream()
- .mapToLong(e ->
- Optional.ofNullable(c.getEndOffsets())
- .map(o -> o.get(e.getKey()))
- .map(o -> o - e.getValue())
- .orElse(0L)
- ).sum();
- }
- consumerGroup.setMessagesBehind(messagesBehind);
- consumerGroup.setTopics(numTopics);
- consumerGroup.setSimple(c.isSimple());
- Optional.ofNullable(c.getState())
- .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
- Optional.ofNullable(c.getCoordinator())
- .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
- consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
- return consumerGroup;
- }
- private static BrokerDTO mapCoordinator(Node node) {
- return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
- }
- private static ConsumerGroupStateDTO mapConsumerGroupState(
- org.apache.kafka.common.ConsumerGroupState state) {
- switch (state) {
- case DEAD:
- return ConsumerGroupStateDTO.DEAD;
- case EMPTY:
- return ConsumerGroupStateDTO.EMPTY;
- case STABLE:
- return ConsumerGroupStateDTO.STABLE;
- case PREPARING_REBALANCE:
- return ConsumerGroupStateDTO.PREPARING_REBALANCE;
- case COMPLETING_REBALANCE:
- return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
- default:
- return ConsumerGroupStateDTO.UNKNOWN;
- }
- }
- }
|