Sfoglia il codice sorgente

Broker partitions skew added to API (#3566)

Ilya Kuramshin 2 anni fa
parent
commit
8ecb719e9b

+ 16 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.model;
 
 import java.math.BigDecimal;
+import javax.annotation.Nullable;
 import lombok.Data;
 import org.apache.kafka.common.Node;
 
@@ -10,15 +11,27 @@ public class InternalBroker {
   private final Integer id;
   private final String host;
   private final Integer port;
-  private final BigDecimal bytesInPerSec;
-  private final BigDecimal bytesOutPerSec;
+  private final @Nullable BigDecimal bytesInPerSec;
+  private final @Nullable BigDecimal bytesOutPerSec;
+  private final @Nullable Integer partitionsLeader;
+  private final @Nullable Integer partitions;
+  private final @Nullable Integer inSyncPartitions;
+  private final @Nullable BigDecimal leadersSkew;
+  private final @Nullable BigDecimal partitionsSkew;
 
-  public InternalBroker(Node node, Statistics statistics) {
+  public InternalBroker(Node node,
+                        PartitionDistributionStats partitionDistribution,
+                        Statistics statistics) {
     this.id = node.id();
     this.host = node.host();
     this.port = node.port();
     this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
     this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
+    this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
+    this.partitions = partitionDistribution.getPartitionsCount().get(node);
+    this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
+    this.leadersSkew = partitionDistribution.leadersSkew(node);
+    this.partitionsSkew = partitionDistribution.partitionsSkew(node);
   }
 
 }

+ 93 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java

@@ -0,0 +1,93 @@
+package com.provectus.kafka.ui.model;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@Slf4j
+public class PartitionDistributionStats {
+
+  // avg skew will show unuseful results on low number of partitions
+  private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
+
+  private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
+
+  private final Map<Node, Integer> partitionLeaders;
+  private final Map<Node, Integer> partitionsCount;
+  private final Map<Node, Integer> inSyncPartitions;
+  private final double avgLeadersCntPerBroker;
+  private final double avgPartitionsPerBroker;
+  private final boolean skewCanBeCalculated;
+
+  public static PartitionDistributionStats create(Statistics stats) {
+    return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
+  }
+
+  static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
+    var partitionLeaders = new HashMap<Node, Integer>();
+    var partitionsReplicated = new HashMap<Node, Integer>();
+    var isr = new HashMap<Node, Integer>();
+    int partitionsCnt = 0;
+    for (TopicDescription td : stats.getTopicDescriptions().values()) {
+      for (TopicPartitionInfo tp : td.partitions()) {
+        partitionsCnt++;
+        tp.replicas().forEach(r -> incr(partitionsReplicated, r));
+        tp.isr().forEach(r -> incr(isr, r));
+        if (tp.leader() != null) {
+          incr(partitionLeaders, tp.leader());
+        }
+      }
+    }
+    int nodesWithPartitions = partitionsReplicated.size();
+    int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
+    var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
+
+    int nodesWithLeaders = partitionLeaders.size();
+    int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
+    var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
+
+    return new PartitionDistributionStats(
+        partitionLeaders,
+        partitionsReplicated,
+        isr,
+        avgLeadersCntPerBroker,
+        avgPartitionsPerBroker,
+        partitionsCnt >= minPartitionsForSkewCalculation
+    );
+  }
+
+  private static void incr(Map<Node, Integer> map, Node n) {
+    map.compute(n, (k, c) -> c == null ? 1 : ++c);
+  }
+
+  @Nullable
+  public BigDecimal partitionsSkew(Node node) {
+    return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
+  }
+
+  @Nullable
+  public BigDecimal leadersSkew(Node node) {
+    return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
+  }
+
+  // Returns difference (in percents) from average value, null if it can't be calculated
+  @Nullable
+  private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
+    if (avgValue == 0 || !skewCanBeCalculated) {
+      return null;
+    }
+    value = value == null ? 0 : value;
+    return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
+  }
+}

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.PartitionDistributionStats;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,11 +65,13 @@ public class BrokerService {
   }
 
   public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
+    var stats = statisticsCache.get(cluster);
+    var partitionsDistribution = PartitionDistributionStats.create(stats);
     return adminClientService
         .get(cluster)
         .flatMap(ReactiveAdminClient::describeCluster)
         .map(description -> description.getNodes().stream()
-            .map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
+            .map(node -> new InternalBroker(node, partitionsDistribution, stats))
             .collect(Collectors.toList()))
         .flatMapMany(Flux::fromIterable);
   }

+ 83 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java

@@ -0,0 +1,83 @@
+package com.provectus.kafka.ui.model;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.assertj.core.data.Percentage;
+import org.junit.jupiter.api.Test;
+
+class PartitionDistributionStatsTest {
+
+  @Test
+  void skewCalculatedBasedOnPartitionsCounts() {
+    Node n1 = new Node(1, "n1", 9092);
+    Node n2 = new Node(2, "n2", 9092);
+    Node n3 = new Node(3, "n3", 9092);
+    Node n4 = new Node(4, "n4", 9092);
+
+    var stats = PartitionDistributionStats.create(
+        Statistics.builder()
+            .clusterDescription(
+                new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
+            .topicDescriptions(
+                Map.of(
+                    "t1", new TopicDescription(
+                        "t1", false,
+                        List.of(
+                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                            new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
+                        )
+                    ),
+                    "t2", new TopicDescription(
+                        "t2", false,
+                        List.of(
+                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                            new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
+                        )
+                    )
+                )
+            )
+            .build(), 4
+    );
+
+    assertThat(stats.getPartitionLeaders())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 2, n2, 1));
+    assertThat(stats.getPartitionsCount())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 4, n3, 1));
+    assertThat(stats.getInSyncPartitions())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 3, n3, 1));
+
+    // Node(partitions): n1(3), n2(4), n3(1), n4(0)
+    // average partitions cnt = (3+4+1) / 3 = 2.666 (counting only nodes with partitions!)
+    assertThat(stats.getAvgPartitionsPerBroker())
+        .isCloseTo(2.666, Percentage.withPercentage(1));
+
+    assertThat(stats.partitionsSkew(n1))
+        .isCloseTo(BigDecimal.valueOf(12.5), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n2))
+        .isCloseTo(BigDecimal.valueOf(50), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n3))
+        .isCloseTo(BigDecimal.valueOf(-62.5), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n4))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+
+    //  Node(leaders): n1(2), n2(1), n3(0), n4(0)
+    //  average leaders cnt = (2+1) / 2 = 1.5 (counting only nodes with leaders!)
+    assertThat(stats.leadersSkew(n1))
+        .isCloseTo(BigDecimal.valueOf(33.33), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n2))
+        .isCloseTo(BigDecimal.valueOf(-33.33), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n3))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n4))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+  }
+
+}

+ 10 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2375,6 +2375,16 @@ components:
           type: number
         bytesOutPerSec:
           type: number
+        partitionsLeader:
+          type: integer
+        partitions:
+          type: integer
+        inSyncPartitions:
+          type: integer
+        partitionsSkew:
+          type: number
+        leadersSkew:
+          type: number
       required:
         - id