package com.provectus.kafka.ui.mapper;

import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.ConsumerGroupDTO;
import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

/**
 * Static converters from the internal consumer-group models to the DTOs in
 * {@code com.provectus.kafka.ui.model}.
 */
public class ConsumerGroupMapper {

  private ConsumerGroupMapper() {
  }

  /**
   * Converts an internal consumer group into its summary DTO.
   *
   * @param c internal consumer group to convert
   * @return a new {@link ConsumerGroupDTO} populated from {@code c}
   */
  public static ConsumerGroupDTO toDto(InternalConsumerGroup c) {
    return convertToConsumerGroup(c, new ConsumerGroupDTO());
  }

  /**
   * Converts a topic-scoped consumer group into a DTO.
   *
   * <p>The topic count is always reported as {@code 1}. State and coordinator are only set
   * when the source values are non-null.
   *
   * @param c topic-scoped consumer group to convert
   * @return a new DTO populated from {@code c}
   */
  public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) {
    ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO();
    // Kept for UI backward-compatibility; the UI usage still needs to be removed.
    consumerGroup.setTopics(1);
    consumerGroup.setGroupId(c.getGroupId());
    consumerGroup.setMembers(c.getMembers());
    consumerGroup.setMessagesBehind(c.getMessagesBehind());
    consumerGroup.setSimple(c.isSimple());
    consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
    // Guard against a missing state: switching on a null enum would throw a
    // NullPointerException. This mirrors the guard in convertToConsumerGroup.
    Optional.ofNullable(c.getState())
        .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
    Optional.ofNullable(c.getCoordinator())
        .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
    return consumerGroup;
  }

  /**
   * Converts an internal consumer group into a details DTO that also lists its partitions.
   *
   * <p>One partition entry is produced for every committed offset. The end offset is taken
   * from the group's end-offsets map; when that map is {@code null} or has no entry for the
   * partition, both the end offset and the messages-behind value are reported as {@code 0}.
   * Partitions that appear only in a member's assignment get an entry carrying just the topic,
   * partition, host and consumer id. The partitions come from a {@link HashMap}, so their
   * order in the result is unspecified.
   *
   * @param g internal consumer group to convert
   * @return a new {@link ConsumerGroupDetailsDTO} populated from {@code g}
   */
  public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) {
    ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO());
    Map<TopicPartition, ConsumerGroupTopicPartitionDTO> partitionMap = new HashMap<>();

    for (Map.Entry<TopicPartition, Long> entry : g.getOffsets().entrySet()) {
      ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO();
      partition.setTopic(entry.getKey().topic());
      partition.setPartition(entry.getKey().partition());
      partition.setCurrentOffset(entry.getValue());

      final Optional<Long> endOffset = Optional.ofNullable(g.getEndOffsets())
          .map(o -> o.get(entry.getKey()));
      final Long behind = endOffset.map(o -> o - entry.getValue())
          .orElse(0L);

      partition.setEndOffset(endOffset.orElse(0L));
      partition.setMessagesBehind(behind);
      partitionMap.put(entry.getKey(), partition);
    }

    for (InternalConsumerGroup.InternalMember member : g.getMembers()) {
      for (TopicPartition topicPartition : member.getAssignment()) {
        // computeIfAbsent already stores a newly created entry, so no explicit put is needed.
        final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent(
            topicPartition,
            tp -> new ConsumerGroupTopicPartitionDTO()
                .topic(tp.topic())
                .partition(tp.partition())
        );
        partition.setHost(member.getHost());
        partition.setConsumerId(member.getConsumerId());
      }
    }

    details.setPartitions(new ArrayList<>(partitionMap.values()));
    return details;
  }

  /**
   * Copies the fields shared by all consumer-group DTOs from {@code c} into
   * {@code consumerGroup}.
   *
   * <p>The topic count is the number of distinct topic names found in the committed offsets
   * and in the members' assignments. State and coordinator are only set when non-null.
   *
   * @param c             internal consumer group to read from
   * @param consumerGroup DTO instance to populate
   * @param <T>           concrete DTO type
   * @return the same {@code consumerGroup} instance, populated
   */
  private static <T extends ConsumerGroupDTO> T convertToConsumerGroup(
      InternalConsumerGroup c, T consumerGroup) {
    consumerGroup.setGroupId(c.getGroupId());
    consumerGroup.setMembers(c.getMembers().size());

    int numTopics = Stream.concat(
        c.getOffsets().keySet().stream().map(TopicPartition::topic),
        c.getMembers().stream()
            .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
    ).collect(Collectors.toSet()).size();

    Long messagesBehind = null;
    // messagesBehind should be undefined if no committed offsets found for topic
    if (!c.getOffsets().isEmpty()) {
      messagesBehind = c.getOffsets().entrySet().stream()
          .mapToLong(e ->
              Optional.ofNullable(c.getEndOffsets())
                  .map(o -> o.get(e.getKey()))
                  .map(o -> o - e.getValue())
                  .orElse(0L)
          ).sum();
    }

    consumerGroup.setMessagesBehind(messagesBehind);
    consumerGroup.setTopics(numTopics);
    consumerGroup.setSimple(c.isSimple());

    Optional.ofNullable(c.getState())
        .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
    Optional.ofNullable(c.getCoordinator())
        .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));

    consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
    return consumerGroup;
  }

  /**
   * Builds a {@link BrokerDTO} from the node's host, id and port.
   *
   * @param node coordinator node; must not be {@code null}
   * @return a new {@link BrokerDTO}
   */
  private static BrokerDTO mapCoordinator(Node node) {
    return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
  }

  /**
   * Maps a Kafka consumer-group state onto the DTO enum. Any state without an explicit case
   * maps to {@link ConsumerGroupStateDTO#UNKNOWN}.
   *
   * @param state Kafka state; must not be {@code null} (callers guard against null)
   * @return the matching DTO state
   */
  private static ConsumerGroupStateDTO mapConsumerGroupState(
      org.apache.kafka.common.ConsumerGroupState state) {
    switch (state) {
      case DEAD:
        return ConsumerGroupStateDTO.DEAD;
      case EMPTY:
        return ConsumerGroupStateDTO.EMPTY;
      case STABLE:
        return ConsumerGroupStateDTO.STABLE;
      case PREPARING_REBALANCE:
        return ConsumerGroupStateDTO.PREPARING_REBALANCE;
      case COMPLETING_REBALANCE:
        return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
      default:
        return ConsumerGroupStateDTO.UNKNOWN;
    }
  }
}