DescribeLogDirsMapper.java 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package com.provectus.kafka.ui.mapper;
  2. import com.provectus.kafka.ui.model.BrokerTopicLogdirsDTO;
  3. import com.provectus.kafka.ui.model.BrokerTopicPartitionLogdirDTO;
  4. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  5. import java.util.Collection;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.stream.Collectors;
  9. import org.apache.kafka.common.TopicPartition;
  10. import org.apache.kafka.common.protocol.Errors;
  11. import org.apache.kafka.common.requests.DescribeLogDirsResponse;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class DescribeLogDirsMapper {
  15. public List<BrokersLogdirsDTO> toBrokerLogDirsList(
  16. Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirsInfo) {
  17. return logDirsInfo.entrySet().stream().map(
  18. mapEntry -> mapEntry.getValue().entrySet().stream()
  19. .map(e -> toBrokerLogDirs(mapEntry.getKey(), e.getKey(), e.getValue()))
  20. .toList()
  21. ).flatMap(Collection::stream).collect(Collectors.toList());
  22. }
  23. private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
  24. DescribeLogDirsResponse.LogDirInfo logDirInfo) {
  25. BrokersLogdirsDTO result = new BrokersLogdirsDTO();
  26. result.setName(dirName);
  27. if (logDirInfo.error != null && logDirInfo.error != Errors.NONE) {
  28. result.setError(logDirInfo.error.message());
  29. }
  30. var topics = logDirInfo.replicaInfos.entrySet().stream()
  31. .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
  32. .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
  33. .toList();
  34. result.setTopics(topics);
  35. return result;
  36. }
  37. private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
  38. List<Map.Entry<TopicPartition,
  39. DescribeLogDirsResponse.ReplicaInfo>> partitions) {
  40. BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
  41. topic.setName(name);
  42. topic.setPartitions(
  43. partitions.stream().map(
  44. e -> topicPartitionLogDir(
  45. broker, e.getKey().partition(), e.getValue())).toList()
  46. );
  47. return topic;
  48. }
  49. private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
  50. DescribeLogDirsResponse.ReplicaInfo
  51. replicaInfo) {
  52. BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
  53. logDir.setBroker(broker);
  54. logDir.setPartition(partition);
  55. logDir.setSize(replicaInfo.size);
  56. logDir.setOffsetLag(replicaInfo.offsetLag);
  57. return logDir;
  58. }
  59. }