From 09ebd03e71190a0829a4894ee3a6feaade29c5f0 Mon Sep 17 00:00:00 2001 From: Marsel <49659004+MarselAhmetov@users.noreply.github.com> Date: Fri, 2 Jul 2021 14:38:22 +0300 Subject: [PATCH] Issue#314 partitions count increase (#579) * adding topic's partitions count increase * adding tests * making pull request fixes * making pull request fixes * adding checkstyle fix * pull request fixes * adding cluster update after increasing partitions * pull requset fix Co-authored-by: marselakhmetov --- .../kafka/ui/controller/TopicsController.java | 12 ++++ .../kafka/ui/service/ClusterService.java | 17 ++++++ .../kafka/ui/service/KafkaService.java | 41 +++++++++++++ .../kafka/ui/KafkaConsumerTests.java | 51 ++++++++++++++++ .../kafka/ui/service/ClusterServiceTest.java | 1 + .../main/resources/swagger/kafka-ui-api.yaml | 60 +++++++++++++++++-- 6 files changed, 178 insertions(+), 4 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index b7e934e7b8..d456e092d9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -1,6 +1,8 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.TopicsApi; +import com.provectus.kafka.ui.model.PartitionsIncrease; +import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; @@ -86,4 +88,14 @@ public class TopicsController implements TopicsApi { ServerWebExchange exchange) { return clusterService.updateTopic(clusterId, topicName, topicUpdate).map(ResponseEntity::ok); } + + @Override + public Mono> increaseTopicPartitions( + String clusterName, String topicName, + Mono partitionsIncrease, + ServerWebExchange exchange) { + return partitionsIncrease.flatMap( + partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions)) + .map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 1d01465b53..6168462acc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -17,6 +17,8 @@ import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.PartitionsIncrease; +import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; @@ -285,6 +287,21 @@ public class ClusterService { .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); } + public Mono increaseTopicPartitions( + String clusterName, + String topicName, + PartitionsIncrease partitionsIncrease) { + return clustersStorage.getClusterByName(clusterName).map(cluster -> + kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease) + .doOnNext(t -> updateCluster(t, cluster.getName(), cluster)) + .map(t -> new PartitionsIncreaseResponse() + .topicName(t.getName()) + .totalPartitionsCount(t.getPartitionCount()))) + .orElse(Mono.error(new ClusterNotFoundException( + String.format("No cluster for name '%s'", clusterName) + ))); + } + public Mono deleteConsumerGroupById(String clusterName, String groupId) { return clustersStorage.getClusterByName(clusterName) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index 0bc7d7ea66..c480d043f7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.service; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; @@ -12,6 +13,8 @@ import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Metric; +import com.provectus.kafka.ui.model.PartitionsIncrease; +import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; @@ -47,6 +50,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -674,4 +678,41 @@ public class KafkaService { } } + private Mono increaseTopicPartitions(AdminClient adminClient, + String topicName, + Map newPartitionsMap + ) { + return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName) + .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); + } + + public Mono increaseTopicPartitions( + KafkaCluster cluster, + String topicName, + PartitionsIncrease partitionsIncrease) { + return getOrCreateAdminClient(cluster) + .flatMap(ac -> { + Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount(); + Integer requestedCount = partitionsIncrease.getTotalPartitionsCount(); + + if (requestedCount < actualCount) { + return Mono.error( + new ValidationException(String.format( + "Topic currently has %s partitions, which is higher than the requested %s.", + actualCount, requestedCount))); + } + if (requestedCount.equals(actualCount)) { + return Mono.error( + new ValidationException( + String.format("Topic already has %s partitions.", actualCount))); + } + + Map newPartitionsMap = Collections.singletonMap( + topicName, + NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount()) + ); + return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap); + }); + } + } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index 0e46ed9e25..c1dfd6bd6b 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -1,12 +1,16 @@ package com.provectus.kafka.ui; +import com.provectus.kafka.ui.model.PartitionsIncrease; +import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.TopicCreation; +import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.producer.KafkaTestProducer; import java.util.Map; import java.util.UUID; import java.util.stream.Stream; import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; @@ -65,6 +69,53 @@ public class KafkaConsumerTests extends AbstractBaseTest { .hasSize(0); } + @Test + public void shouldIncreasePartitionsUpTo10() { + var topicName = UUID.randomUUID().toString(); + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(new TopicCreation() + .name(topicName) + .partitions(1) + .replicationFactor(1) + .configs(Map.of()) + ) + .exchange() + .expectStatus() + .isOk(); + + PartitionsIncreaseResponse response = webTestClient.patch() + .uri("/api/clusters/{clusterName}/topics/{topicName}/partitions", + LOCAL, + topicName) + .bodyValue(new PartitionsIncrease() + .totalPartitionsCount(10) + ) + .exchange() + .expectStatus() + .isOk() + .expectBody(PartitionsIncreaseResponse.class) + .returnResult() + .getResponseBody(); + + assert response != null; + Assertions.assertEquals(10, response.getTotalPartitionsCount()); + + TopicDetails topicDetails = webTestClient.get() + .uri("/api/clusters/{clusterName}/topics/{topicName}", + LOCAL, + topicName) + .exchange() + .expectStatus() + .isOk() + .expectBody(TopicDetails.class) + .returnResult() + .getResponseBody(); + + assert topicDetails != null; + Assertions.assertEquals(10, topicDetails.getPartitionCount()); + } + @Test public void shouldReturn404ForNonExistingTopic() { var topicName = UUID.randomUUID().toString(); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java index 4f27026d2d..5c8d459992 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java @@ -202,4 +202,5 @@ class ClusterServiceTest { assertThat(topics.getTopics()).hasSize(25); assertThat(topics.getTopics()).map(Topic::getPartitionCount).isSorted(); } + } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index a3208a1386..3a7a6fcd50 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1193,6 +1193,38 @@ paths: schema: $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse' + /api/clusters/{clusterName}/topics/{topicName}/partitions: + patch: + tags: + - Topics + summary: increaseTopicPartitions + operationId: increaseTopicPartitions + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PartitionsIncrease' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PartitionsIncreaseResponse' + 404: + description: Not found + components: schemas: ErrorResponse: @@ -1469,11 +1501,11 @@ components: clusterId: type: string consumerGroupId: - type: string + type: string numConsumers: - type: integer + type: integer numTopics: - type: integer + type: integer simple: type: boolean partitionAssignor: @@ -1637,7 +1669,7 @@ components: consumerGroupId: type: string simple: - type: boolean + type: boolean partitionAssignor: type: string state: @@ -1973,3 +2005,23 @@ components: - name - connect - status + + PartitionsIncrease: + type: object + properties: + totalPartitionsCount: + type: integer + minimum: 1 + required: + - totalPartitionsCount + + PartitionsIncreaseResponse: + type: object + properties: + totalPartitionsCount: + type: integer + topicName: + type: string + required: + - totalPartitionsCount + - topicName