@@ -1,26 +1,32 @@
 package com.provectus.kafka.ui.model;
 
 import java.math.BigDecimal;
+import java.math.MathContext;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 @Getter
+@Slf4j
 public class PartitionDistributionStats {
 
   // avg skew will show unuseful results on low number of partitions
   private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
 
+  private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
+
   private final Map<Node, Integer> partitionLeaders;
   private final Map<Node, Integer> partitionsCount;
   private final Map<Node, Integer> inSyncPartitions;
+  private final double avgLeadersCntPerBroker;
   private final double avgPartitionsPerBroker;
   private final boolean skewCanBeCalculated;
 
@@ -32,10 +38,8 @@ public class PartitionDistributionStats {
     var partitionLeaders = new HashMap<Node, Integer>();
     var partitionsReplicated = new HashMap<Node, Integer>();
     var isr = new HashMap<Node, Integer>();
-    int partitionsCnt = 0;
     for (TopicDescription td : stats.getTopicDescriptions().values()) {
       for (TopicPartitionInfo tp : td.partitions()) {
-        partitionsCnt++;
         tp.replicas().forEach(r -> incr(partitionsReplicated, r));
         tp.isr().forEach(r -> incr(isr, r));
         if (tp.leader() != null) {
@@ -43,14 +47,21 @@ public class PartitionDistributionStats {
         }
       }
     }
-    int nodesCount = stats.getClusterDescription().getNodes().size();
-    double avgPartitionsPerBroker = nodesCount == 0 ? 0 : ((double) partitionsCnt) / nodesCount;
+    int nodesWithPartitions = partitionsReplicated.size();
+    int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
+    var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
+
+    int nodesWithLeaders = partitionLeaders.size();
+    int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
+    var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
+
     return new PartitionDistributionStats(
         partitionLeaders,
         partitionsReplicated,
         isr,
+        avgLeadersCntPerBroker,
         avgPartitionsPerBroker,
-        partitionsCnt >= minPartitionsForSkewCalculation
+        partitionReplications >= minPartitionsForSkewCalculation
     );
   }
 
@@ -65,7 +76,7 @@ public class PartitionDistributionStats {
 
   @Nullable
   public BigDecimal leadersSkew(Node node) {
-    return calculateAvgSkew(partitionLeaders.get(node), avgPartitionsPerBroker);
+    return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
   }
 
   // Returns difference (in percents) from average value, null if it can't be calculated
@@ -75,6 +86,6 @@ public class PartitionDistributionStats {
       return null;
     }
     value = value == null ? 0 : value;
-    return new BigDecimal((value - avgValue) / avgValue * 100.0);
+    return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
   }
 }
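
A minimal, self-contained sketch (not part of the diff) of what the new ROUNDING_MATH_CTX does: skew percentages returned by calculateAvgSkew are now rounded to three significant digits. The class name and the broker numbers below are hypothetical, for illustration only.

// Illustrative sketch only: mirrors the MathContext(3) rounding applied in
// calculateAvgSkew(); all values here are made up.
import java.math.BigDecimal;
import java.math.MathContext;

class SkewRoundingSketch {
  public static void main(String[] args) {
    double value = 7;        // hypothetical leaders count on one broker
    double avgValue = 6.333; // hypothetical avgLeadersCntPerBroker
    BigDecimal skew = new BigDecimal((value - avgValue) / avgValue * 100.0)
        .round(new MathContext(3)); // same rounding as ROUNDING_MATH_CTX
    System.out.println(skew);       // prints roughly 10.5 instead of 10.532...
  }
}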