// InternalTopic.java (4.8 KB)

  1. package com.provectus.kafka.ui.model;
  2. import java.math.BigDecimal;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.stream.Collectors;
  6. import javax.annotation.Nullable;
  7. import lombok.Builder;
  8. import lombok.Data;
  9. import org.apache.kafka.clients.admin.ConfigEntry;
  10. import org.apache.kafka.clients.admin.TopicDescription;
  11. import org.apache.kafka.common.TopicPartition;
  12. @Data
  13. @Builder(toBuilder = true)
  14. public class InternalTopic {
  15. // from TopicDescription
  16. private final String name;
  17. private final boolean internal;
  18. private final int replicas;
  19. private final int partitionCount;
  20. private final int inSyncReplicas;
  21. private final int replicationFactor;
  22. private final int underReplicatedPartitions;
  23. private final Map<Integer, InternalPartition> partitions;
  24. // topic configs
  25. private final List<InternalTopicConfig> topicConfigs;
  26. private final CleanupPolicy cleanUpPolicy;
  27. // rates from metrics
  28. private final BigDecimal bytesInPerSec;
  29. private final BigDecimal bytesOutPerSec;
  30. // from log dir data
  31. private final long segmentSize;
  32. private final long segmentCount;
  33. public static InternalTopic from(TopicDescription topicDescription,
  34. List<ConfigEntry> configs,
  35. InternalPartitionsOffsets partitionsOffsets,
  36. Metrics metrics,
  37. InternalLogDirStats logDirInfo,
  38. @Nullable String internalTopicPrefix) {
  39. var topic = InternalTopic.builder();
  40. internalTopicPrefix = internalTopicPrefix == null || internalTopicPrefix.isEmpty()
  41. ? "_"
  42. : internalTopicPrefix;
  43. topic.internal(
  44. topicDescription.isInternal() || topicDescription.name().startsWith(internalTopicPrefix)
  45. );
  46. topic.name(topicDescription.name());
  47. List<InternalPartition> partitions = topicDescription.partitions().stream()
  48. .map(partition -> {
  49. var partitionDto = InternalPartition.builder();
  50. partitionDto.leader(partition.leader() != null ? partition.leader().id() : null);
  51. partitionDto.partition(partition.partition());
  52. partitionDto.inSyncReplicasCount(partition.isr().size());
  53. partitionDto.replicasCount(partition.replicas().size());
  54. List<InternalReplica> replicas = partition.replicas().stream()
  55. .map(r ->
  56. InternalReplica.builder()
  57. .broker(r.id())
  58. .inSync(partition.isr().contains(r))
  59. .leader(partition.leader() != null && partition.leader().id() == r.id())
  60. .build())
  61. .collect(Collectors.toList());
  62. partitionDto.replicas(replicas);
  63. partitionsOffsets.get(topicDescription.name(), partition.partition())
  64. .ifPresent(offsets -> {
  65. partitionDto.offsetMin(offsets.getEarliest());
  66. partitionDto.offsetMax(offsets.getLatest());
  67. });
  68. var segmentStats =
  69. logDirInfo.getPartitionsStats().get(
  70. new TopicPartition(topicDescription.name(), partition.partition()));
  71. if (segmentStats != null) {
  72. partitionDto.segmentCount(segmentStats.getSegmentsCount());
  73. partitionDto.segmentSize(segmentStats.getSegmentSize());
  74. }
  75. return partitionDto.build();
  76. })
  77. .toList();
  78. topic.partitions(partitions.stream().collect(
  79. Collectors.toMap(InternalPartition::getPartition, t -> t)));
  80. var partitionsStats = new PartitionsStats(topicDescription);
  81. topic.replicas(partitionsStats.getReplicasCount());
  82. topic.partitionCount(partitionsStats.getPartitionsCount());
  83. topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount());
  84. topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount());
  85. topic.replicationFactor(
  86. topicDescription.partitions().isEmpty()
  87. ? 0
  88. : topicDescription.partitions().get(0).replicas().size()
  89. );
  90. var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
  91. if (segmentStats != null) {
  92. topic.segmentCount(segmentStats.getSegmentsCount());
  93. topic.segmentSize(segmentStats.getSegmentSize());
  94. }
  95. // topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
  96. // topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
  97. topic.topicConfigs(
  98. configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
  99. topic.cleanUpPolicy(
  100. configs.stream()
  101. .filter(config -> config.name().equals("cleanup.policy"))
  102. .findFirst()
  103. .map(ConfigEntry::value)
  104. .map(CleanupPolicy::fromString)
  105. .orElse(CleanupPolicy.UNKNOWN)
  106. );
  107. return topic.build();
  108. }
  109. }