Browse Source

#707 Exposed broker log dirs (#720)

#707 Exposed broker log dirs
Timur Davletov 3 years ago
parent
commit
29a3158df6

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -3,7 +3,9 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.BrokersApi;
 import com.provectus.kafka.ui.model.Broker;
 import com.provectus.kafka.ui.model.BrokerMetrics;
+import com.provectus.kafka.ui.model.BrokersLogdirs;
 import com.provectus.kafka.ui.service.ClusterService;
+import java.util.List;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.http.ResponseEntity;
@@ -31,4 +33,12 @@ public class BrokersController implements BrokersApi {
                                                        ServerWebExchange exchange) {
     return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName)));
   }
+
+  @Override
+  public Mono<ResponseEntity<Flux<BrokersLogdirs>>> getAllBrokersLogdirs(String clusterName,
+                                                                         List<Integer> brokers,
+                                                                         ServerWebExchange exchange
+  ) {
+    return Mono.just(ResponseEntity.ok(clusterService.getAllBrokersLogdirs(clusterName, brokers)));
+  }
 }

+ 65 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java

@@ -0,0 +1,65 @@
+package com.provectus.kafka.ui.mapper;
+
+import com.provectus.kafka.ui.model.BrokerTopicLogdirs;
+import com.provectus.kafka.ui.model.BrokerTopicPartitionLogdir;
+import com.provectus.kafka.ui.model.BrokersLogdirs;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DescribeLogDirsMapper {
+
+  public List<BrokersLogdirs> toBrokerLogDirsList(
+      Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirsInfo) {
+
+    return logDirsInfo.entrySet().stream().map(
+        mapEntry -> mapEntry.getValue().entrySet().stream()
+            .map(e -> toBrokerLogDirs(mapEntry.getKey(), e.getKey(), e.getValue()))
+            .collect(Collectors.toList())
+    ).flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
+  private BrokersLogdirs toBrokerLogDirs(Integer broker, String dirName,
+                                         DescribeLogDirsResponse.LogDirInfo logDirInfo) {
+    BrokersLogdirs result = new BrokersLogdirs();
+    result.setName(dirName);
+    if (logDirInfo.error != null) {
+      result.setError(logDirInfo.error.message());
+    }
+    var topics = logDirInfo.replicaInfos.entrySet().stream()
+        .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
+        .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+    result.setTopics(topics);
+    return result;
+  }
+
+  private BrokerTopicLogdirs toTopicLogDirs(Integer broker, String name,
+                                            List<Map.Entry<TopicPartition,
+                                            DescribeLogDirsResponse.ReplicaInfo>> partitions) {
+    BrokerTopicLogdirs topic = new BrokerTopicLogdirs();
+    topic.setName(name);
+    topic.setPartitions(
+        partitions.stream().map(
+            e -> topicPartitionLogDir(
+                broker, e.getKey().partition(), e.getValue())).collect(Collectors.toList())
+    );
+    return topic;
+  }
+
+  private BrokerTopicPartitionLogdir topicPartitionLogDir(Integer broker, Integer partition,
+                                                          DescribeLogDirsResponse.ReplicaInfo
+                                                              replicaInfo) {
+    BrokerTopicPartitionLogdir logDir = new BrokerTopicPartitionLogdir();
+    logDir.setBroker(broker);
+    logDir.setPartition(partition);
+    logDir.setSize(replicaInfo.size);
+    logDir.setOffsetLag(replicaInfo.offsetLag);
+    return logDir;
+  }
+}

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -6,8 +6,10 @@ import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
 import com.provectus.kafka.ui.model.Broker;
 import com.provectus.kafka.ui.model.BrokerMetrics;
+import com.provectus.kafka.ui.model.BrokersLogdirs;
 import com.provectus.kafka.ui.model.Cluster;
 import com.provectus.kafka.ui.model.ClusterMetrics;
 import com.provectus.kafka.ui.model.ClusterStats;
@@ -62,6 +64,7 @@ public class ClusterService {
   private final KafkaService kafkaService;
   private final ConsumingService consumingService;
   private final DeserializationService deserializationService;
+  private final DescribeLogDirsMapper describeLogDirsMapper;
 
   public List<Cluster> getClusters() {
     return clustersStorage.getKafkaClusters()
@@ -361,4 +364,11 @@ public class ClusterService {
         .orElse(Mono.error(new ClusterNotFoundException(
             String.format("No cluster for name '%s'", clusterName))));
   }
+
+  public Flux<BrokersLogdirs> getAllBrokersLogdirs(String clusterName, List<Integer> brokers) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
+        .flatMap(c -> kafkaService.getClusterLogDirs(c, brokers))
+        .map(describeLogDirsMapper::toBrokerLogDirsList)
+        .flatMapMany(Flux::fromIterable);
+  }
 }

+ 21 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -41,6 +41,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
@@ -53,6 +54,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
@@ -68,6 +70,8 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -786,6 +790,23 @@ public class KafkaService {
         });
   }
 
+  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
+      KafkaCluster cluster, List<Integer> reqBrokers) {
+    return getOrCreateAdminClient(cluster)
+        .map(admin -> {
+          List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
+          if (reqBrokers != null && !reqBrokers.isEmpty()) {
+            brokers.retainAll(reqBrokers);
+          }
+          return admin.getAdminClient().describeLogDirs(brokers);
+        })
+        .flatMap(result -> ClusterUtil.toMono(result.all()))
+        .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
+          log.error("Error during fetching log dirs", e);
+          return Mono.just(new HashMap<>());
+        });
+  }
+
   private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
       KafkaCluster cluster,
       String topicName,

+ 85 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java

@@ -0,0 +1,85 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.model.BrokerTopicLogdirs;
+import com.provectus.kafka.ui.model.BrokersLogdirs;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@AutoConfigureWebTestClient(timeout = "60000")
+public class LogDirsTest extends AbstractBaseTest {
+
+  @Autowired
+  private WebTestClient webTestClient;
+
+  @Test
+  public void testAllBrokers() {
+    List<BrokersLogdirs> dirs = webTestClient.get()
+        .uri("/api/clusters/{clusterName}/brokers/logdirs", LOCAL)
+        .exchange()
+        .expectStatus().isOk()
+        .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
+        .returnResult()
+        .getResponseBody();
+
+    assertThat(dirs).hasSize(1);
+    BrokersLogdirs dir = dirs.get(0);
+    assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data");
+    assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets")))
+        .isTrue();
+
+    BrokerTopicLogdirs topic = dir.getTopics().stream()
+        .filter(t -> t.getName().equals("__consumer_offsets"))
+        .findAny().get();
+
+    assertThat(topic.getPartitions()).hasSize(1);
+    assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1);
+    assertThat(topic.getPartitions().get(0).getSize()).isPositive();
+  }
+
+  @Test
+  public void testOneBrokers() {
+    List<BrokersLogdirs> dirs = webTestClient.get()
+        .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=1", LOCAL)
+        .exchange()
+        .expectStatus().isOk()
+        .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
+        .returnResult()
+        .getResponseBody();
+
+    assertThat(dirs).hasSize(1);
+    BrokersLogdirs dir = dirs.get(0);
+    assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data");
+    assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets")))
+        .isTrue();
+
+    BrokerTopicLogdirs topic = dir.getTopics().stream()
+        .filter(t -> t.getName().equals("__consumer_offsets"))
+        .findAny().get();
+
+    assertThat(topic.getPartitions()).hasSize(1);
+    assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1);
+    assertThat(topic.getPartitions().get(0).getSize()).isPositive();
+  }
+
+  @Test
+  public void testWrongBrokers() {
+    List<BrokersLogdirs> dirs = webTestClient.get()
+        .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=2", LOCAL)
+        .exchange()
+        .expectStatus().isOk()
+        .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
+        .returnResult()
+        .getResponseBody();
+
+    assertThat(dirs).isEmpty();
+  }
+}

+ 95 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -142,6 +142,37 @@ paths:
               schema:
                 $ref: '#/components/schemas/BrokerMetrics'
 
+  /api/clusters/{clusterName}/brokers/logdirs:
+    get:
+      tags:
+        - Brokers
+      summary: getAllBrokersLogdirs
+      operationId: getAllBrokersLogdirs
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: broker
+          in: query
+          description: array of broker ids
+          required: false
+          schema:
+            type: array
+            items:
+              type: integer
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/BrokersLogdirs'
+
+
   /api/clusters/{clusterName}/topics:
     get:
       tags:
@@ -1446,6 +1477,30 @@ components:
           items:
             $ref: '#/components/schemas/Metric'
 
+    BrokerLogdirs:
+      type: object
+      properties:
+        name:
+          type: string
+        error:
+          type: string
+        topics:
+          type: array
+          items:
+            $ref: '#/components/schemas/TopicLogdirs'
+
+    BrokersLogdirs:
+      type: object
+      properties:
+        name:
+          type: string
+        error:
+          type: string
+        topics:
+          type: array
+          items:
+            $ref: '#/components/schemas/BrokerTopicLogdirs'
+
     TopicsResponse:
       type: object
       properties:
@@ -1786,6 +1841,46 @@ components:
           additionalProperties:
             type: number
 
+    TopicLogdirs:
+      type: object
+      properties:
+        name:
+          type: string
+        partitions:
+          type: array
+          items:
+            $ref: '#/components/schemas/TopicPartitionLogdir'
+
+    BrokerTopicLogdirs:
+      type: object
+      properties:
+        name:
+          type: string
+        partitions:
+          type: array
+          items:
+            $ref: '#/components/schemas/BrokerTopicPartitionLogdir'
+
+    TopicPartitionLogdir:
+      type: object
+      properties:
+        partition:
+          type: integer
+        size:
+          type: integer
+          format: int64
+        offsetLag:
+          type: integer
+          format: int64
+
+    BrokerTopicPartitionLogdir:
+      allOf:
+        - $ref: '#/components/schemas/TopicPartitionLogdir'
+        - type: object
+          properties:
+            broker:
+              type: integer
+
     SchemaSubject:
       type: object
       properties: