InternalTopic.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package com.provectus.kafka.ui.model;
  2. import java.math.BigDecimal;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.stream.Collectors;
  6. import lombok.Builder;
  7. import lombok.Data;
  8. import org.apache.kafka.clients.admin.ConfigEntry;
  9. import org.apache.kafka.clients.admin.TopicDescription;
  10. import org.apache.kafka.common.TopicPartition;
  11. @Data
  12. @Builder(toBuilder = true)
  13. public class InternalTopic {
  14. // from TopicDescription
  15. private final String name;
  16. private final boolean internal;
  17. private final int replicas;
  18. private final int partitionCount;
  19. private final int inSyncReplicas;
  20. private final int replicationFactor;
  21. private final int underReplicatedPartitions;
  22. private final Map<Integer, InternalPartition> partitions;
  23. // topic configs
  24. private final List<InternalTopicConfig> topicConfigs;
  25. private final CleanupPolicy cleanUpPolicy;
  26. // rates from metrics
  27. private final BigDecimal bytesInPerSec;
  28. private final BigDecimal bytesOutPerSec;
  29. // from log dir data
  30. private final long segmentSize;
  31. private final long segmentCount;
  32. public static InternalTopic from(TopicDescription topicDescription,
  33. List<ConfigEntry> configs,
  34. InternalPartitionsOffsets partitionsOffsets,
  35. Metrics metrics,
  36. InternalLogDirStats logDirInfo) {
  37. var topic = InternalTopic.builder();
  38. topic.internal(topicDescription.isInternal());
  39. topic.name(topicDescription.name());
  40. List<InternalPartition> partitions = topicDescription.partitions().stream()
  41. .map(partition -> {
  42. var partitionDto = InternalPartition.builder();
  43. partitionDto.leader(partition.leader() != null ? partition.leader().id() : null);
  44. partitionDto.partition(partition.partition());
  45. partitionDto.inSyncReplicasCount(partition.isr().size());
  46. partitionDto.replicasCount(partition.replicas().size());
  47. List<InternalReplica> replicas = partition.replicas().stream()
  48. .map(r ->
  49. InternalReplica.builder()
  50. .broker(r.id())
  51. .inSync(partition.isr().contains(r))
  52. .leader(partition.leader() != null && partition.leader().id() == r.id())
  53. .build())
  54. .collect(Collectors.toList());
  55. partitionDto.replicas(replicas);
  56. partitionsOffsets.get(topicDescription.name(), partition.partition())
  57. .ifPresent(offsets -> {
  58. partitionDto.offsetMin(offsets.getEarliest());
  59. partitionDto.offsetMax(offsets.getLatest());
  60. });
  61. var segmentStats =
  62. logDirInfo.getPartitionsStats().get(
  63. new TopicPartition(topicDescription.name(), partition.partition()));
  64. if (segmentStats != null) {
  65. partitionDto.segmentCount(segmentStats.getSegmentsCount());
  66. partitionDto.segmentSize(segmentStats.getSegmentSize());
  67. }
  68. return partitionDto.build();
  69. })
  70. .collect(Collectors.toList());
  71. topic.partitions(partitions.stream().collect(
  72. Collectors.toMap(InternalPartition::getPartition, t -> t)));
  73. var partitionsStats = new PartitionsStats(topicDescription);
  74. topic.replicas(partitionsStats.getReplicasCount());
  75. topic.partitionCount(partitionsStats.getPartitionsCount());
  76. topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount());
  77. topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount());
  78. topic.replicationFactor(
  79. topicDescription.partitions().isEmpty()
  80. ? 0
  81. : topicDescription.partitions().get(0).replicas().size()
  82. );
  83. var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
  84. if (segmentStats != null) {
  85. topic.segmentCount(segmentStats.getSegmentsCount());
  86. topic.segmentSize(segmentStats.getSegmentSize());
  87. }
  88. topic.bytesInPerSec(metrics.getBytesInPerSec().get(topicDescription.name()));
  89. topic.bytesOutPerSec(metrics.getBytesOutPerSec().get(topicDescription.name()));
  90. topic.topicConfigs(
  91. configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
  92. topic.cleanUpPolicy(
  93. configs.stream()
  94. .filter(config -> config.name().equals("cleanup.policy"))
  95. .findFirst()
  96. .map(ConfigEntry::value)
  97. .map(CleanupPolicy::fromString)
  98. .orElse(CleanupPolicy.UNKNOWN)
  99. );
  100. return topic.build();
  101. }
  102. }