Browse Source

Changed to calculate brokers and clusters metrics in one place

Roman Nedzvetskiy 5 years ago
parent
commit
40e21c4575

+ 18 - 18
docker/kafka-clusters-only.yaml

@@ -23,26 +23,26 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
-  kafka01:
-    image: confluentinc/cp-kafka:5.1.0
-    depends_on:
-      - zookeeper0
-    ports:
-      - 29093:29093
-      - 9999:9999
-    environment:
-      KAFKA_BROKER_ID: 2
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
-      JMX_PORT: 9997
-      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
+  # kafka01:
+  #   image: confluentinc/cp-kafka:5.1.0
+  #   depends_on:
+  #     - zookeeper0
+  #   ports:
+  #     - 29093:29093
+  #     - 9999:9999
+  #   environment:
+  #     KAFKA_BROKER_ID: 2
+  #     KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+  #     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
+  #     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+  #     KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+  #     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+  #     JMX_PORT: 9997
+  #     KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
   kafka-init-topics0:
     image: confluentinc/cp-kafka:5.1.0
@@ -102,7 +102,7 @@ services:
     depends_on:
       - zookeeper0
       - kafka0
-      - kafka01
+      # - kafka01
     ports:
       - 8085:8085
     environment:

+ 4 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java

@@ -1,10 +1,14 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.Builder;
 import lombok.Data;
 
+import java.util.List;
+
 @Data
 @Builder(toBuilder = true)
 public class InternalBrokerMetrics {
     private final Long segmentSize;
+    private final List<JmxMetric> jmxMetrics;
 }

+ 7 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -31,7 +31,6 @@ public class ClusterService {
     private final ClusterMapper clusterMapper;
     private final KafkaService kafkaService;
     private final ConsumingService consumingService;
-    private final JmxClusterUtil jmxClusterUtil;
 
     public List<Cluster> getClusters() {
         return clustersStorage.getKafkaClusters()
@@ -41,11 +40,14 @@ public class ClusterService {
     }
 
     public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
-        return clustersStorage.getClusterByName(name)
+        return Mono.just(clustersStorage.getClusterByName(name)
                 .map(KafkaCluster::getMetrics)
-                .map(s -> kafkaService.getJmxMetric(name, id)
-                        .map(j -> s.toBuilder().jmxMetrics(j).build()))
-                .map(s -> s.map(clusterMapper::toBrokerMetrics)).orElseThrow();
+                .map(s -> {
+                    var brokerMetrics = clusterMapper.toBrokerMetrics(s);
+                    brokerMetrics.setJmxMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
+                    brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
+                    return brokerMetrics;
+                }).orElseThrow());
     }
 
     public List<Topic> getTopics(String name) {

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -65,7 +65,10 @@ public class JmxClusterUtil {
             ObjectName name = new ObjectName(canonicalName);
             var attrNames = msc.getMBeanInfo(name).getAttributes();
             for (MBeanAttributeInfo attrName : attrNames) {
-                resultAttr.put(attrName.getName(), msc.getAttribute(name, attrName.getName()));
+                var value = msc.getAttribute(name, attrName.getName());
+                if (value instanceof Number) {
+                    resultAttr.put(attrName.getName(), value);
+                }
             }
             pool.returnObject(jmxUrl, srv);
         } catch (MalformedURLException url) {

+ 39 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -19,6 +19,7 @@ import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
@@ -46,9 +47,10 @@ public class KafkaService {
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
-        return getOrCreateAdminClient(cluster).flatMap(
+        return getOrCreateAdminClient(cluster)
+                .flatMap(
                 ac -> getClusterMetrics(cluster, ac.getAdminClient())
-
+                        .flatMap(i -> fillJmxMetrics(i, cluster.getName()))
                         .flatMap( clusterMetrics ->
                             getTopicsData(ac.getAdminClient()).flatMap( topics ->
                                 loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
@@ -159,18 +161,16 @@ public class KafkaService {
     private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
         return ClusterUtil.toMono(client.describeCluster().nodes())
                 .flatMap(brokers ->
-                    ClusterUtil.toMono(client.describeCluster().controller()).flatMap(
-                        c ->
-                            getClusterJmxMetric(cluster.getName()).map(jmxMetric -> {
+                    ClusterUtil.toMono(client.describeCluster().controller()).map(
+                        c -> {
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                             metricsBuilder
-                                    .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
-                                    .jmxMetrics(jmxMetric);
+                                    .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
                             return metricsBuilder.build();
                         }
                     )
-                ));
+                );
     }
 
 
@@ -331,7 +331,7 @@ public class KafkaService {
                                 var brokerSegmentSize = log.get(e.getKey()).values().stream()
                                         .mapToLong(v -> v.replicaInfos.values().stream()
                                                 .mapToLong(r -> r.size).sum()).sum();
-                                InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
+                                InternalBrokerMetrics tempBrokerMetrics = e.getValue().toBuilder().segmentSize(brokerSegmentSize).build();
                                 return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
                             });
 
@@ -374,4 +374,34 @@ public class KafkaService {
                                 .map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
                         .map(host ->  jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))).orElseThrow();
     }
+
+    private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName) {
+        return Flux.fromIterable(internalClusterMetrics.getInternalBrokerMetrics().keySet())
+                .flatMap(id -> getJmxMetric(clusterName, id)
+                    .map(jmx -> {
+                        var jmxMetric = internalClusterMetrics.getInternalBrokerMetrics().get(id).toBuilder().jmxMetrics(jmx).build();
+                        var tempBrokerMetrics = internalClusterMetrics.getInternalBrokerMetrics();
+                        tempBrokerMetrics.put(id, jmxMetric);
+                        return internalClusterMetrics.toBuilder().internalBrokerMetrics(tempBrokerMetrics).build();
+                    })).collectList()
+                .map(s -> s.stream().reduce((s1, s2) -> {
+                    s1.getInternalBrokerMetrics().putAll(s2.getInternalBrokerMetrics().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+                    return s1;
+                }).orElseThrow())
+                .map(i -> {
+                    var tempMetrics = new HashMap<>(i.getInternalBrokerMetrics());
+                    tempMetrics.values().stream().flatMap(s -> Stream.of(s.getJmxMetrics())).reduce((s1, s2) -> {
+                        s1.forEach(j1 -> {
+                            s2.forEach(j2 -> {
+                                if (j1.getCanonicalName().equals(j2.getCanonicalName())) {
+                                    j1.getValue().keySet().forEach(k -> j2.getValue().compute(k, (k1, v1) ->
+                                            JmxClusterUtil.metricValueReduce(j1, j2.getValue().get(k1))));
+                                }
+                            });
+                        });
+                        return s1;
+                    });
+                    return i.toBuilder().jmxMetrics(tempMetrics.values().stream().findFirst().orElseThrow().getJmxMetrics()).build();
+                });
+    }
 }

+ 0 - 55
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -273,61 +273,6 @@ paths:
               schema:
                 $ref: '#/components/schemas/ConsumerGroupDetails'
 
-#  /api/clusters/{clusterName}/brokers/{brokerId}/metrics/{canonicalName}:
-#    get:
-#      tags:
-#        - /api/clusters
-#      summary: get specific JmxMetric for broker
-#      operationId: getBrokerJmxMetric
-#      parameters:
-#        - name: clusterName
-#          in: path
-#          required: true
-#          schema:
-#            type: string
-#        - name: brokerId
-#          in: path
-#          required: true
-#          schema:
-#            type: integer
-#        - name: canonicalName
-#          in: path
-#          required: true
-#          schema:
-#            type: string
-#      responses:
-#        200:
-#          description: OK
-#          content:
-#            application/json:
-#              schema:
-#                $ref: '#/components/schemas/JmxMetric'
-
-#  /api/clusters/{clusterName}/metrics:
-#    get:
-#      tags:
-#        - /api/clusters
-#      summary: get specific JmxMetric for cluster
-#      operationId: getClusterJmxMetric
-#      parameters:
-#        - name: clusterName
-#          in: path
-#          required: true
-#          schema:
-#            type: string
-#        - name: canonicalName
-#          in: path
-#          required: true
-#          schema:
-#            type: string
-#      responses:
-#        200:
-#          description: OK
-#          content:
-#            application/json:
-#              schema:
-#                $ref: '#/components/schemas/JmxMetric'
-
   /api/clusters/{clusterName}/consumerGroups:
     get:
       tags: