From 29a3158df68662f9ed7795edbc775446831a977d Mon Sep 17 00:00:00 2001 From: Timur Davletov Date: Tue, 27 Jul 2021 13:03:21 +0300 Subject: [PATCH] #707 Exposed broker log dirs (#720) #707 Exposed broker log dirs --- .../ui/controller/BrokersController.java | 10 ++ .../ui/mapper/DescribeLogDirsMapper.java | 65 +++++++++++++ .../kafka/ui/service/ClusterService.java | 10 ++ .../kafka/ui/service/KafkaService.java | 21 ++++ .../kafka/ui/service/LogDirsTest.java | 85 +++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 95 +++++++++++++++++++ 6 files changed, 286 insertions(+) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java index 18b5291d75..0678e414c3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java @@ -3,7 +3,9 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.BrokersApi; import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.BrokerMetrics; +import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.service.ClusterService; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.ResponseEntity; @@ -31,4 +33,12 @@ public class BrokersController implements BrokersApi { ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName))); } + + @Override + public Mono>> getAllBrokersLogdirs(String clusterName, + List brokers, + ServerWebExchange exchange + ) { + return Mono.just(ResponseEntity.ok(clusterService.getAllBrokersLogdirs(clusterName, brokers))); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java new file mode 100644 index 0000000000..2718fdab7c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java @@ -0,0 +1,65 @@ +package com.provectus.kafka.ui.mapper; + +import com.provectus.kafka.ui.model.BrokerTopicLogdirs; +import com.provectus.kafka.ui.model.BrokerTopicPartitionLogdir; +import com.provectus.kafka.ui.model.BrokersLogdirs; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.springframework.stereotype.Component; + +@Component +public class DescribeLogDirsMapper { + + public List toBrokerLogDirsList( + Map> logDirsInfo) { + + return logDirsInfo.entrySet().stream().map( + mapEntry -> mapEntry.getValue().entrySet().stream() + .map(e -> toBrokerLogDirs(mapEntry.getKey(), e.getKey(), e.getValue())) + .collect(Collectors.toList()) + ).flatMap(Collection::stream).collect(Collectors.toList()); + } + + private BrokersLogdirs toBrokerLogDirs(Integer broker, String dirName, + DescribeLogDirsResponse.LogDirInfo logDirInfo) { + BrokersLogdirs result = new BrokersLogdirs(); + result.setName(dirName); + if (logDirInfo.error != null) { + result.setError(logDirInfo.error.message()); + } + var topics = logDirInfo.replicaInfos.entrySet().stream() + .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream() + .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue())) + .collect(Collectors.toList()); + result.setTopics(topics); + return result; + } + + private BrokerTopicLogdirs toTopicLogDirs(Integer broker, String name, + List> partitions) { + BrokerTopicLogdirs topic = new BrokerTopicLogdirs(); + topic.setName(name); + topic.setPartitions( + partitions.stream().map( + e -> topicPartitionLogDir( + broker, e.getKey().partition(), e.getValue())).collect(Collectors.toList()) + ); + return topic; + } + + private BrokerTopicPartitionLogdir topicPartitionLogDir(Integer broker, Integer partition, + DescribeLogDirsResponse.ReplicaInfo + replicaInfo) { + BrokerTopicPartitionLogdir logDir = new BrokerTopicPartitionLogdir(); + logDir.setBroker(broker); + logDir.setPartition(partition); + logDir.setSize(replicaInfo.size); + logDir.setOffsetLag(replicaInfo.offsetLag); + return logDir; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 57faf2d5e4..84b13767b5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -6,8 +6,10 @@ import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper; import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.BrokerMetrics; +import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.model.Cluster; import com.provectus.kafka.ui.model.ClusterMetrics; import com.provectus.kafka.ui.model.ClusterStats; @@ -62,6 +64,7 @@ public class ClusterService { private final KafkaService kafkaService; private final ConsumingService consumingService; private final DeserializationService deserializationService; + private final DescribeLogDirsMapper describeLogDirsMapper; public List getClusters() { return clustersStorage.getKafkaClusters() @@ -361,4 +364,11 @@ public class ClusterService { .orElse(Mono.error(new ClusterNotFoundException( String.format("No cluster for name '%s'", clusterName)))); } + + public Flux getAllBrokersLogdirs(String clusterName, List brokers) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) + .flatMap(c -> kafkaService.getClusterLogDirs(c, brokers)) + .map(describeLogDirsMapper::toBrokerLogDirsList) + .flatMapMany(Flux::fromIterable); + } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index e914a16a1e..ed7cd4e246 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -41,6 +41,7 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; @@ -53,6 +54,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; @@ -68,6 +70,8 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.utils.Bytes; @@ -786,6 +790,23 @@ public class KafkaService { }); } + public Mono>> getClusterLogDirs( + KafkaCluster cluster, List reqBrokers) { + return getOrCreateAdminClient(cluster) + .map(admin -> { + List brokers = new ArrayList<>(cluster.getBrokers()); + if (reqBrokers != null && !reqBrokers.isEmpty()) { + brokers.retainAll(reqBrokers); + } + return admin.getAdminClient().describeLogDirs(brokers); + }) + .flatMap(result -> ClusterUtil.toMono(result.all())) + .onErrorResume(TimeoutException.class, (TimeoutException e) -> { + log.error("Error during fetching log dirs", e); + return Mono.just(new HashMap<>()); + }); + } + private Map> getPartitionsReassignments( KafkaCluster cluster, String topicName, diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java new file mode 100644 index 0000000000..79bcce8d82 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java @@ -0,0 +1,85 @@ +package com.provectus.kafka.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.AbstractBaseTest; +import com.provectus.kafka.ui.model.BrokerTopicLogdirs; +import com.provectus.kafka.ui.model.BrokersLogdirs; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; + +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) +@AutoConfigureWebTestClient(timeout = "60000") +public class LogDirsTest extends AbstractBaseTest { + + @Autowired + private WebTestClient webTestClient; + + @Test + public void testAllBrokers() { + List dirs = webTestClient.get() + .uri("/api/clusters/{clusterName}/brokers/logdirs", LOCAL) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() {}) + .returnResult() + .getResponseBody(); + + assertThat(dirs).hasSize(1); + BrokersLogdirs dir = dirs.get(0); + assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data"); + assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets"))) + .isTrue(); + + BrokerTopicLogdirs topic = dir.getTopics().stream() + .filter(t -> t.getName().equals("__consumer_offsets")) + .findAny().get(); + + assertThat(topic.getPartitions()).hasSize(1); + assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1); + assertThat(topic.getPartitions().get(0).getSize()).isPositive(); + } + + @Test + public void testOneBrokers() { + List dirs = webTestClient.get() + .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=1", LOCAL) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() {}) + .returnResult() + .getResponseBody(); + + assertThat(dirs).hasSize(1); + BrokersLogdirs dir = dirs.get(0); + assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data"); + assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets"))) + .isTrue(); + + BrokerTopicLogdirs topic = dir.getTopics().stream() + .filter(t -> t.getName().equals("__consumer_offsets")) + .findAny().get(); + + assertThat(topic.getPartitions()).hasSize(1); + assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1); + assertThat(topic.getPartitions().get(0).getSize()).isPositive(); + } + + @Test + public void testWrongBrokers() { + List dirs = webTestClient.get() + .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=2", LOCAL) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() {}) + .returnResult() + .getResponseBody(); + + assertThat(dirs).isEmpty(); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 6643282ffc..9eb892fa5a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -142,6 +142,37 @@ paths: schema: $ref: '#/components/schemas/BrokerMetrics' + /api/clusters/{clusterName}/brokers/logdirs: + get: + tags: + - Brokers + summary: getAllBrokersLogdirs + operationId: getAllBrokersLogdirs + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: broker + in: query + description: array of broker ids + required: false + schema: + type: array + items: + type: integer + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/BrokersLogdirs' + + /api/clusters/{clusterName}/topics: get: tags: @@ -1446,6 +1477,30 @@ components: items: $ref: '#/components/schemas/Metric' + BrokerLogdirs: + type: object + properties: + name: + type: string + error: + type: string + topics: + type: array + items: + $ref: '#/components/schemas/TopicLogdirs' + + BrokersLogdirs: + type: object + properties: + name: + type: string + error: + type: string + topics: + type: array + items: + $ref: '#/components/schemas/BrokerTopicLogdirs' + TopicsResponse: type: object properties: @@ -1786,6 +1841,46 @@ components: additionalProperties: type: number + TopicLogdirs: + type: object + properties: + name: + type: string + partitions: + type: array + items: + $ref: '#/components/schemas/TopicPartitionLogdir' + + BrokerTopicLogdirs: + type: object + properties: + name: + type: string + partitions: + type: array + items: + $ref: '#/components/schemas/BrokerTopicPartitionLogdir' + + TopicPartitionLogdir: + type: object + properties: + partition: + type: integer + size: + type: integer + format: int64 + offsetLag: + type: integer + format: int64 + + BrokerTopicPartitionLogdir: + allOf: + - $ref: '#/components/schemas/TopicPartitionLogdir' + - type: object + properties: + broker: + type: integer + SchemaSubject: type: object properties: