PartitionDistributionStats.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package com.provectus.kafka.ui.model;
  2. import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
  3. import java.math.BigDecimal;
  4. import java.math.RoundingMode;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import javax.annotation.Nullable;
  9. import lombok.AccessLevel;
  10. import lombok.Getter;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.commons.lang3.mutable.MutableInt;
  14. import org.apache.kafka.clients.admin.TopicDescription;
  15. import org.apache.kafka.common.Node;
  16. import org.apache.kafka.common.TopicPartitionInfo;
  17. @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
  18. @Getter
  19. @Slf4j
  20. public class PartitionDistributionStats {
  21. // avg skew will show unuseful results on low number of partitions
  22. private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
  23. private final Map<Node, Integer> partitionLeaders;
  24. private final Map<Node, Integer> partitionsCount;
  25. private final Map<Node, Integer> inSyncPartitions;
  26. private final double avgLeadersCntPerBroker;
  27. private final double avgPartitionsPerBroker;
  28. private final boolean skewCanBeCalculated;
  29. public static PartitionDistributionStats create(Statistics stats) {
  30. return create(
  31. stats.topicDescriptions().toList(),
  32. MIN_PARTITIONS_FOR_SKEW_CALCULATION
  33. );
  34. }
  35. static PartitionDistributionStats create(List<TopicDescription> topicDescriptions,
  36. int minPartitionsForSkewCalculation) {
  37. var partitionLeaders = new HashMap<Node, Integer>();
  38. var partitionsReplicated = new HashMap<Node, Integer>();
  39. var isr = new HashMap<Node, Integer>();
  40. int partitionsCnt = 0;
  41. for (TopicDescription td : topicDescriptions) {
  42. for (TopicPartitionInfo tp : td.partitions()) {
  43. partitionsCnt++;
  44. tp.replicas().forEach(r -> incr(partitionsReplicated, r));
  45. tp.isr().forEach(r -> incr(isr, r));
  46. if (tp.leader() != null) {
  47. incr(partitionLeaders, tp.leader());
  48. }
  49. }
  50. }
  51. int nodesWithPartitions = partitionsReplicated.size();
  52. int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
  53. var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
  54. int nodesWithLeaders = partitionLeaders.size();
  55. int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
  56. var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
  57. return new PartitionDistributionStats(
  58. partitionLeaders,
  59. partitionsReplicated,
  60. isr,
  61. avgLeadersCntPerBroker,
  62. avgPartitionsPerBroker,
  63. partitionsCnt >= minPartitionsForSkewCalculation
  64. );
  65. }
  66. private static void incr(Map<Node, Integer> map, Node n) {
  67. map.compute(n, (k, c) -> c == null ? 1 : ++c);
  68. }
  69. @Nullable
  70. public BigDecimal partitionsSkew(Node node) {
  71. return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
  72. }
  73. @Nullable
  74. public BigDecimal leadersSkew(Node node) {
  75. return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
  76. }
  77. // Returns difference (in percents) from average value, null if it can't be calculated
  78. @Nullable
  79. private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
  80. if (avgValue == 0 || !skewCanBeCalculated) {
  81. return null;
  82. }
  83. value = value == null ? 0 : value;
  84. return new BigDecimal((value - avgValue) / avgValue * 100.0)
  85. .setScale(1, RoundingMode.HALF_UP);
  86. }
  87. }