浏览代码

broker metrics endpoint

Zhenya Taran 5 年之前
父节点
当前提交
475245c3d5

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -13,7 +13,6 @@ import java.util.stream.Collectors;
 @FieldDefaults(level = AccessLevel.PRIVATE)
 public class KafkaCluster {
 
-    String id;
     String name;
     String jmxHost;
     String jmxPort;

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricsConstants.java

@@ -2,11 +2,19 @@ package com.provectus.kafka.ui.cluster.model;
 
 public final class MetricsConstants {
 
+
     private MetricsConstants() {}
 
+    public static final String CLUSTER_ID = "ClusterId";
     public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
     public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
     public static final String BROKERS_COUNT = "BrokersCount";
     public static final String TOPIC_COUNT = "TopicCount";
     public static final String PARTITIONS_COUNT = "PartitionsCount";
+    public static final String ZOOKEEPER_STATUS = "ZookeeperStatus";
+    public static final String ACTIVE_CONTROLLER_COUNT = "ActiveControllerCount";
+    public static final String ONLINE_PARTITION_COUNT = "OnlinePartitionCount";
+    public static final String OFFLINE_PARTITION_COUNT = "OfflinePartitionCount";
+    public static final String UNDER_REPLICATED_PARTITIONS = "UnderReplicatedPartitions";
+
 }

+ 23 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.cluster.config.ClustersProperties;
 import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.MetricsConstants;
+import com.provectus.kafka.ui.model.BrokerMetrics;
 import com.provectus.kafka.ui.model.Cluster;
 import lombok.RequiredArgsConstructor;
 import org.mapstruct.factory.Mappers;
@@ -17,6 +18,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static com.provectus.kafka.ui.cluster.model.MetricsConstants.CLUSTER_ID;
+
 @Service
 @RequiredArgsConstructor
 public class ClusterService {
@@ -39,6 +42,7 @@ public class ClusterService {
                 .stream()
                 .map(kafkaCluster -> {
                     Cluster cluster = clusterMapper.toOpenApiCluster(kafkaCluster);
+                    cluster.setId(kafkaCluster.getMetric(CLUSTER_ID));
                     cluster.setBrokerCount(intValueOfOrNull(kafkaCluster.getMetric(MetricsConstants.BROKERS_COUNT)));
                     cluster.setTopicCount(intValueOfOrNull(kafkaCluster.getMetric(MetricsConstants.TOPIC_COUNT)));
                     cluster.setBytesInPerSec(intValueOfOrNull(kafkaCluster.getMetric(MetricsConstants.BYTES_IN_PER_SEC)));
@@ -51,6 +55,25 @@ public class ClusterService {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusters)));
     }
 
+    public Mono<ResponseEntity<BrokerMetrics>> getBrokerMetrics(String clusterId) {
+        KafkaCluster cluster = kafkaClusters.stream()
+                .filter(cltr -> cltr.getMetricsMap().get(CLUSTER_ID).equals(clusterId))
+                .findFirst()
+                .orElseThrow();
+
+        BrokerMetrics brokerMetrics = new BrokerMetrics();
+        brokerMetrics.setClusterId(cluster.getMetricsMap().get(CLUSTER_ID));
+        brokerMetrics.setBrokerCount(intValueOfOrNull(cluster.getMetric(MetricsConstants.BROKERS_COUNT)));
+        brokerMetrics.setBytesInPerSec(intValueOfOrNull(cluster.getMetric(MetricsConstants.BYTES_IN_PER_SEC)));
+        brokerMetrics.setZookeeperStatus(intValueOfOrNull(cluster.getMetric(MetricsConstants.ZOOKEEPER_STATUS)));
+        brokerMetrics.setActiveControllers(intValueOfOrNull(cluster.getMetric(MetricsConstants.ACTIVE_CONTROLLER_COUNT)));
+        brokerMetrics.setOnlinePartitionCount(intValueOfOrNull(cluster.getMetric(MetricsConstants.ONLINE_PARTITION_COUNT)));
+        brokerMetrics.setOfflinePartitionCount(intValueOfOrNull(cluster.getMetric(MetricsConstants.OFFLINE_PARTITION_COUNT)));
+        brokerMetrics.setUnderReplicatedPartitionCount(intValueOfOrNull(cluster.getMetric(MetricsConstants.UNDER_REPLICATED_PARTITIONS)));
+
+        return Mono.just(ResponseEntity.ok(brokerMetrics));
+    }
+
     public List<KafkaCluster> getKafkaClusters() {
         return kafkaClusters;
     }
@@ -62,6 +85,4 @@ public class ClusterService {
             return null;
         }
     }
-
-
 }

+ 6 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/jmx/JmxConstants.java

@@ -11,8 +11,13 @@ public final class JmxConstants {
     private JmxConstants() {}
 
     public static final Map<MBeanInfo, String> mbeanToAttributeMap = Map.ofEntries(
+            entry(MBeanInfo.of("kafka.server:type=KafkaServer,name=ClusterId", "Value"), MetricsConstants.CLUSTER_ID),
             entry(MBeanInfo.of("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec", "Count"), MetricsConstants.BYTES_IN_PER_SEC),
-            entry(MBeanInfo.of("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec", "Count"), MetricsConstants.BYTES_OUT_PER_SEC)
+            entry(MBeanInfo.of("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec", "Count"), MetricsConstants.BYTES_OUT_PER_SEC),
+            entry(MBeanInfo.of("kafka.controller:type=KafkaController,name=ActiveControllerCount", "Value"), MetricsConstants.ACTIVE_CONTROLLER_COUNT),
+            entry(MBeanInfo.of("kafka.controller:type=KafkaController,name=GlobalPartitionCount", "Value"), MetricsConstants.ONLINE_PARTITION_COUNT),
+            entry(MBeanInfo.of("kafka.controller:type=KafkaController,name=OfflinePartitionsCount", "Value"), MetricsConstants.OFFLINE_PARTITION_COUNT),
+            entry(MBeanInfo.of("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions", "Value"), MetricsConstants.UNDER_REPLICATED_PARTITIONS)
     );
 
 }

+ 10 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -11,6 +11,8 @@ import org.springframework.stereotype.Service;
 import java.util.Properties;
 import java.util.Set;
 
+import static com.provectus.kafka.ui.cluster.model.MetricsConstants.ZOOKEEPER_STATUS;
+
 @Service
 @RequiredArgsConstructor
 public class KafkaService {
@@ -43,7 +45,13 @@ public class KafkaService {
     public static void isZookeeperRunning(KafkaCluster kafkaCluster){
         //Because kafka connector waits for 2 minutes with retries before telling that there is no connection
         //ZKClient is used to not wait 2 minutes for response. If there is no connection, exception will be thrown
-        ZkClient zkClient = new ZkClient(kafkaCluster.getZookeeper(), 1000);
-        zkClient.close();
+        try {
+            ZkClient zkClient = new ZkClient(kafkaCluster.getZookeeper(), 1000);
+            kafkaCluster.putMetric(ZOOKEEPER_STATUS, "1");
+            zkClient.close();
+        } catch (Exception e) {
+            kafkaCluster.putMetric(ZOOKEEPER_STATUS, "0");
+            throw e;
+        }
     }
 }

+ 7 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.rest;
 
 import com.provectus.kafka.ui.api.ClustersApi;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
+import com.provectus.kafka.ui.model.BrokerMetrics;
 import com.provectus.kafka.ui.model.Cluster;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.ResponseEntity;
@@ -22,11 +23,6 @@ public class MetricsRestController implements ClustersApi {
 
     }
 
-    @GetMapping("/{clusterId}/metrics/broker")
-    public void getBrokerMetrics(@PathVariable("clusterId") String clusterId) {
-
-    }
-
     @GetMapping("/{clusterId}/topics")
     public void getTopics(@PathVariable("clusterId") String clusterId) {
 
@@ -47,6 +43,12 @@ public class MetricsRestController implements ClustersApi {
                          @PathVariable("topicId") String topicId) {
     }
 
+    @Override
+    @GetMapping("/{clusterId}/metrics/broker")
+    public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(@PathVariable String clusterId, ServerWebExchange exchange) {
+        return clusterService.getBrokerMetrics(clusterId);
+    }
+
     @Override
     @GetMapping
     public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {

+ 0 - 2
kafka-ui-api/src/main/resources/application-local.yml

@@ -1,14 +1,12 @@
 kafka:
   clusters:
     -
-      id: wrYGf-csNgiGdK7B_ADF7Z
       name: local
       bootstrapServers: localhost:29091
       jmxHost: localhost
       jmxPort: 9997
       zookeeper: localhost:2181
     -
-      id: dMMQx-WRh77BKYas_g2ZTz
       name: secondLocal
       bootstrapServers: localhost:29092
       jmxHost: localhost

+ 61 - 5
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -29,14 +29,30 @@ paths:
                 type: array
                 items:
                   $ref: '#/components/schemas/Cluster'
-        401:
-          description: Unauthorized
-        403:
-          description: Forbidden
+
+  /clusters/{clusterId}/metrics/broker:
+    get:
+      tags:
+        - /clusters
+      summary: getBrokersMetrics
+      operationId: getBrokersMetrics
+      parameters:
+        - name: clusterId
+          in: path
+          description: clusterId
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/BrokerMetrics'
 
 components:
   schemas:
-
     Cluster:
       type: object
       properties:
@@ -68,3 +84,43 @@ components:
       enum:
         - online
         - offline
+
+    BrokerMetrics:
+      type: object
+      properties:
+        clusterId:
+          type: string
+        bytesInPerSec:
+          type: integer
+        brokerCount:
+          type: integer
+        zookeeperStatus:
+          type: integer
+        activeControllers:
+          type: integer
+        uncleanLeaderElectionCount:
+          type: integer
+        networkPoolUsage:
+          type: number
+        requestPoolUsage:
+          type: number
+        onlinePartitionCount:
+          type: integer
+        underReplicatedPartitionCount:
+          type: integer
+        offlinePartitionCount:
+          type: integer
+        diskUsage:
+          $ref: '#/components/schemas/DiskUsage'
+        diskUsageDistribution:
+          type: string
+      required:
+        - id
+
+    DiskUsage:
+      type: object
+      properties:
+        brokerId:
+          type: number
+        segmentSize:
+          type: number