Issue#314 partitions count increase (#579)

* adding topic's partitions count increase

* adding tests

* making pull request fixes

* making pull request fixes

* adding checkstyle fix

* pull request fixes

* adding cluster update after increasing partitions

* pull requset fix

Co-authored-by: marselakhmetov <makhmetov@provectus.com>
This commit is contained in:
Marsel 2021-07-02 14:38:22 +03:00 committed by GitHub
parent 64a5985e81
commit 09ebd03e71
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 178 additions and 4 deletions

View file

@ -1,6 +1,8 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.TopicsApi;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig;
@ -86,4 +88,14 @@ public class TopicsController implements TopicsApi {
ServerWebExchange exchange) {
return clusterService.updateTopic(clusterId, topicName, topicUpdate).map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<PartitionsIncreaseResponse>> increaseTopicPartitions(
String clusterName, String topicName,
Mono<PartitionsIncrease> partitionsIncrease,
ServerWebExchange exchange) {
return partitionsIncrease.flatMap(
partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions))
.map(ResponseEntity::ok);
}
}

View file

@ -17,6 +17,8 @@ import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig;
@ -285,6 +287,21 @@ public class ClusterService {
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
}
public Mono<PartitionsIncreaseResponse> increaseTopicPartitions(
String clusterName,
String topicName,
PartitionsIncrease partitionsIncrease) {
return clustersStorage.getClusterByName(clusterName).map(cluster ->
kafkaService.increaseTopicPartitions(cluster, topicName, partitionsIncrease)
.doOnNext(t -> updateCluster(t, cluster.getName(), cluster))
.map(t -> new PartitionsIncreaseResponse()
.topicName(t.getName())
.totalPartitionsCount(t.getPartitionCount())))
.orElse(Mono.error(new ClusterNotFoundException(
String.format("No cluster for name '%s'", clusterName)
)));
}
public Mono<Void> deleteConsumerGroupById(String clusterName,
String groupId) {
return clustersStorage.getClusterByName(clusterName)

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
@ -12,6 +13,8 @@ import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation;
@ -47,6 +50,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -674,4 +678,41 @@ public class KafkaService {
}
}
private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
String topicName,
Map<String, NewPartitions> newPartitionsMap
) {
return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
.flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
}
public Mono<InternalTopic> increaseTopicPartitions(
KafkaCluster cluster,
String topicName,
PartitionsIncrease partitionsIncrease) {
return getOrCreateAdminClient(cluster)
.flatMap(ac -> {
Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
if (requestedCount < actualCount) {
return Mono.error(
new ValidationException(String.format(
"Topic currently has %s partitions, which is higher than the requested %s.",
actualCount, requestedCount)));
}
if (requestedCount.equals(actualCount)) {
return Mono.error(
new ValidationException(
String.format("Topic already has %s partitions.", actualCount)));
}
Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
topicName,
NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
);
return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
});
}
}

View file

@ -1,12 +1,16 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicDetails;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@ -65,6 +69,53 @@ public class KafkaConsumerTests extends AbstractBaseTest {
.hasSize(0);
}
@Test
public void shouldIncreasePartitionsUpTo10() {
var topicName = UUID.randomUUID().toString();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", LOCAL)
.bodyValue(new TopicCreation()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isOk();
PartitionsIncreaseResponse response = webTestClient.patch()
.uri("/api/clusters/{clusterName}/topics/{topicName}/partitions",
LOCAL,
topicName)
.bodyValue(new PartitionsIncrease()
.totalPartitionsCount(10)
)
.exchange()
.expectStatus()
.isOk()
.expectBody(PartitionsIncreaseResponse.class)
.returnResult()
.getResponseBody();
assert response != null;
Assertions.assertEquals(10, response.getTotalPartitionsCount());
TopicDetails topicDetails = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}",
LOCAL,
topicName)
.exchange()
.expectStatus()
.isOk()
.expectBody(TopicDetails.class)
.returnResult()
.getResponseBody();
assert topicDetails != null;
Assertions.assertEquals(10, topicDetails.getPartitionCount());
}
@Test
public void shouldReturn404ForNonExistingTopic() {
var topicName = UUID.randomUUID().toString();

View file

@ -202,4 +202,5 @@ class ClusterServiceTest {
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).map(Topic::getPartitionCount).isSorted();
}
}

View file

@ -1193,6 +1193,38 @@ paths:
schema:
$ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
/api/clusters/{clusterName}/topics/{topicName}/partitions:
patch:
tags:
- Topics
summary: increaseTopicPartitions
operationId: increaseTopicPartitions
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/PartitionsIncrease'
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/PartitionsIncreaseResponse'
404:
description: Not found
components:
schemas:
ErrorResponse:
@ -1469,11 +1501,11 @@ components:
clusterId:
type: string
consumerGroupId:
type: string
type: string
numConsumers:
type: integer
type: integer
numTopics:
type: integer
type: integer
simple:
type: boolean
partitionAssignor:
@ -1637,7 +1669,7 @@ components:
consumerGroupId:
type: string
simple:
type: boolean
type: boolean
partitionAssignor:
type: string
state:
@ -1973,3 +2005,23 @@ components:
- name
- connect
- status
PartitionsIncrease:
type: object
properties:
totalPartitionsCount:
type: integer
minimum: 1
required:
- totalPartitionsCount
PartitionsIncreaseResponse:
type: object
properties:
totalPartitionsCount:
type: integer
topicName:
type: string
required:
- totalPartitionsCount
- topicName