PartitionDistributionStats.java 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package com.provectus.kafka.ui.model;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import javax.annotation.Nullable;
  7. import lombok.AccessLevel;
  8. import lombok.Getter;
  9. import lombok.RequiredArgsConstructor;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.apache.kafka.clients.admin.TopicDescription;
  12. import org.apache.kafka.common.Node;
  13. import org.apache.kafka.common.TopicPartitionInfo;
  14. @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
  15. @Getter
  16. @Slf4j
  17. public class PartitionDistributionStats {
  18. // avg skew will show unuseful results on low number of partitions
  19. private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
  20. private final Map<Node, Integer> partitionLeaders;
  21. private final Map<Node, Integer> partitionsCount;
  22. private final Map<Node, Integer> inSyncPartitions;
  23. private final double avgLeadersCntPerBroker;
  24. private final double avgPartitionsPerBroker;
  25. private final boolean skewCanBeCalculated;
  26. public static PartitionDistributionStats create(Statistics stats) {
  27. return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
  28. }
  29. static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
  30. var partitionLeaders = new HashMap<Node, Integer>();
  31. var partitionsReplicated = new HashMap<Node, Integer>();
  32. var isr = new HashMap<Node, Integer>();
  33. int partitionsCnt = 0;
  34. for (TopicDescription td : stats.getTopicDescriptions().values()) {
  35. for (TopicPartitionInfo tp : td.partitions()) {
  36. partitionsCnt++;
  37. tp.replicas().forEach(r -> incr(partitionsReplicated, r));
  38. tp.isr().forEach(r -> incr(isr, r));
  39. if (tp.leader() != null) {
  40. incr(partitionLeaders, tp.leader());
  41. }
  42. }
  43. }
  44. int nodesWithPartitions = partitionsReplicated.size();
  45. int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
  46. var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
  47. int nodesWithLeaders = partitionLeaders.size();
  48. int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
  49. var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
  50. return new PartitionDistributionStats(
  51. partitionLeaders,
  52. partitionsReplicated,
  53. isr,
  54. avgLeadersCntPerBroker,
  55. avgPartitionsPerBroker,
  56. partitionsCnt >= minPartitionsForSkewCalculation
  57. );
  58. }
  59. private static void incr(Map<Node, Integer> map, Node n) {
  60. map.compute(n, (k, c) -> c == null ? 1 : ++c);
  61. }
  62. @Nullable
  63. public BigDecimal partitionsSkew(Node node) {
  64. return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
  65. }
  66. @Nullable
  67. public BigDecimal leadersSkew(Node node) {
  68. return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
  69. }
  70. // Returns difference (in percents) from average value, null if it can't be calculated
  71. @Nullable
  72. private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
  73. if (avgValue == 0 || !skewCanBeCalculated) {
  74. return null;
  75. }
  76. value = value == null ? 0 : value;
  77. return new BigDecimal((value - avgValue) / avgValue * 100.0)
  78. .setScale(1, RoundingMode.HALF_UP);
  79. }
  80. }