소스 검색

Added segmentSize to clusterMetrics object

Roman Nedzvetskiy 5 년 전
부모
커밋
7d127fb2b6

+ 1 - 1
docker/kafka-clusters-only.yaml

@@ -23,7 +23,7 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29091,PLAINTEXT_HOST://localhost:9091,PLAIN://kafka0:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -22,7 +22,7 @@ public class InternalClusterMetrics {
     private final int bytesInPerSec;
     private final int bytesOutPerSec;
     //TODO: find way to fill
-    private final int segmentSize;
+    private final long segmentSize;
     private final int segmentCount;
     private final List<Integer> brokersIds;
 }

+ 25 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -43,7 +43,7 @@ public class KafkaService {
                             getTopicsData(ac).flatMap( topics ->
                                 loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
                                         .map( configs -> mergeWithConfigs(topics, configs))
-                                    .flatMap(it -> updateSegmentSize(ac, clusterMetrics, it))
+                                    .flatMap(it -> updateTopicSegmentSize(ac, clusterMetrics, it))
                             ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
                         )
         ).onErrorResume(
@@ -141,7 +141,7 @@ public class KafkaService {
                             return builder.build();
                         }
                     )
-                );
+                ).flatMap(c -> updateClusterSegmentSize(client, c));
     }
 
 
@@ -246,20 +246,38 @@ public class KafkaService {
                     .next());
     }
 
-    private Mono<Map<String, InternalTopic>> updateSegmentSize(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
+    private Mono<Map<String, InternalTopic>> updateTopicSegmentSize(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
                 return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getBrokersIds()).all())
                         .map(l -> {
-                            System.out.println(l);
                             var topicsInfo = l.values().stream()
                                 .flatMap(v -> Stream.of(v.values()))
                                 .flatMap(v -> Stream.of(v.stream().map(s -> s.replicaInfos)))
                                 .findFirst().orElseThrow().collect(Collectors.toList());
-                            var internalSegments = topicsInfo.stream().flatMap(t -> t.entrySet().stream()
-                                    .flatMap(e -> Stream.of(new InternalSegmentSize(e.getKey().topic(), e.getValue().size)))).collect(Collectors.toList());
+                            var internalSegments = topicsInfo.stream()
+                                    .flatMap(t -> t.entrySet().stream()
+                                        .flatMap(e -> Stream.of(new InternalSegmentSize(e.getKey().topic(), e.getValue().size))))
+                                    .collect(Collectors.toList());
                             return internalTopic.values().stream().flatMap(k ->
                                     Stream.of(k.toBuilder().segmentSize(internalSegments.stream()
-                                            .filter(key -> key.getReplicaName().equals(k.getName())).mapToLong(InternalSegmentSize::getSegmentSize).sum()).build()))
+                                            .filter(key -> key.getReplicaName().equals(k.getName()))
+                                            .mapToLong(InternalSegmentSize::getSegmentSize)
+                                            .sum()).build()))
                                     .collect(Collectors.toMap(InternalTopic::getName, v -> v));
                         });
     }
+
+    private Mono<InternalClusterMetrics> updateClusterSegmentSize (AdminClient client, InternalClusterMetrics clusterMetrics) {
+        return ClusterUtil.toMono(client.describeLogDirs(clusterMetrics.getBrokersIds()).all())
+                .map(l -> {
+                    var replicasInfo = l.values().stream()
+                            .flatMap(v -> Stream.of(v.values()))
+                            .flatMap(v -> Stream.of(v.stream().map(s -> s.replicaInfos)))
+                            .findFirst().orElseThrow().collect(Collectors.toList());
+
+                    var internalSegments = replicasInfo.stream()
+                            .mapToLong(t -> t.entrySet().stream()
+                                    .mapToLong(e -> e.getValue().size).sum()).sum();
+                    return clusterMetrics.toBuilder().segmentSize(internalSegments).build();
+                });
+    }
 }