diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java index d1e90a545e..0b34efdbad 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java @@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.BrokersApi; import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.BrokerConfig; +import com.provectus.kafka.ui.model.BrokerLogdirUpdate; import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.service.ClusterService; @@ -51,4 +52,13 @@ public class BrokersController implements BrokersApi { .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); } + + @Override + public Mono> updateBrokerTopicPartitionLogDir( + String clusterName, Integer id, Mono brokerLogdir, + ServerWebExchange exchange) { + return brokerLogdir + .flatMap(bld -> clusterService.updateBrokerLogDir(clusterName, id, bld)) + .map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 7c66fb98df..f87fccbaaf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -20,7 +20,9 @@ public enum ErrorCode { TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND), SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND), CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND), - KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND); + KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND), + DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST), + TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/LogDirNotFoundApiException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/LogDirNotFoundApiException.java new file mode 100644 index 0000000000..ab1666180d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/LogDirNotFoundApiException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class LogDirNotFoundApiException extends CustomBaseException { + + public LogDirNotFoundApiException() { + super("The user-specified log directory is not found in the broker config."); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.DIR_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicOrPartitionNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicOrPartitionNotFoundException.java new file mode 100644 index 0000000000..0f67b4ff69 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicOrPartitionNotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class TopicOrPartitionNotFoundException extends CustomBaseException { + + public TopicOrPartitionNotFoundException() { + super("This server does not host this topic-partition."); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.TOPIC_OR_PARTITION_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 4bfbea56fa..b75c7a6255 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -9,6 +9,7 @@ import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper; import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.BrokerConfig; +import com.provectus.kafka.ui.model.BrokerLogdirUpdate; import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.model.Cluster; @@ -379,4 +380,10 @@ public class ClusterService { .map(describeLogDirsMapper::toBrokerLogDirsList) .flatMapMany(Flux::fromIterable); } + + public Mono updateBrokerLogDir( + String clusterName, Integer id, BrokerLogdirUpdate brokerLogDir) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) + .flatMap(c -> kafkaService.updateBrokerLogDir(c, id, brokerLogDir)); + } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index bfe0e7a829..8dc5c3aed3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,9 +1,12 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.exception.IllegalEntityStateException; +import com.provectus.kafka.ui.exception.LogDirNotFoundApiException; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.TopicMetadataException; +import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.BrokerLogdirUpdate; import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; @@ -44,7 +47,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; @@ -58,7 +60,6 @@ import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.DescribeConfigsOptions; -import org.apache.kafka.clients.admin.DescribeLogDirsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; @@ -73,8 +74,11 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.LogDirNotFoundException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesDeserializer; @@ -940,6 +944,26 @@ public class KafkaService { return result; } + public Mono updateBrokerLogDir(KafkaCluster cluster, Integer broker, + BrokerLogdirUpdate brokerLogDir) { + return getOrCreateAdminClient(cluster) + .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker)); + } + private Mono updateBrokerLogDir(ExtendedAdminClient adminMono, + BrokerLogdirUpdate b, + Integer broker) { + Map req = Map.of( + new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker), + b.getLogDir()); + return Mono.just(adminMono) + .map(admin -> admin.getAdminClient().alterReplicaLogDirs(req)) + .flatMap(result -> ClusterUtil.toMono(result.all())) + .onErrorResume(UnknownTopicOrPartitionException.class, + e -> Mono.error(new TopicOrPartitionNotFoundException())) + .onErrorResume(LogDirNotFoundException.class, + e -> Mono.error(new LogDirNotFoundApiException())) + .doOnError(log::error); + } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java index 79bcce8d82..5412086a57 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java @@ -3,9 +3,14 @@ package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractBaseTest; +import com.provectus.kafka.ui.exception.LogDirNotFoundApiException; +import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException; +import com.provectus.kafka.ui.model.BrokerLogdirUpdate; import com.provectus.kafka.ui.model.BrokerTopicLogdirs; import com.provectus.kafka.ui.model.BrokersLogdirs; +import com.provectus.kafka.ui.model.ErrorResponse; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; @@ -82,4 +87,41 @@ public class LogDirsTest extends AbstractBaseTest { assertThat(dirs).isEmpty(); } + + @Test + public void testChangeDirToWrongDir() { + ErrorResponse dirs = webTestClient.patch() + .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1) + .bodyValue(Map.of( + "topic", "__consumer_offsets", + "partition", "0", + "logDir", "/asdf/as" + ) + ) + .exchange() + .expectStatus().isBadRequest() + .expectBody(ErrorResponse.class) + .returnResult() + .getResponseBody(); + + assertThat(dirs.getMessage()) + .isEqualTo(new LogDirNotFoundApiException().getMessage()); + + dirs = webTestClient.patch() + .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1) + .bodyValue(Map.of( + "topic", "asdf", + "partition", "0", + "logDir", "/var/lib/kafka/data" + ) + ) + .exchange() + .expectStatus().isBadRequest() + .expectBody(ErrorResponse.class) + .returnResult() + .getResponseBody(); + + assertThat(dirs.getMessage()) + .isEqualTo(new TopicOrPartitionNotFoundException().getMessage()); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index bc849e6350..7fc50d5358 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -201,6 +201,31 @@ paths: items: $ref: '#/components/schemas/BrokersLogdirs' + /api/clusters/{clusterName}/brokers/{id}/logdirs: + patch: + tags: + - Brokers + summary: updateBrokerTopicPartitionLogDir + operationId: updateBrokerTopicPartitionLogDir + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/BrokerLogdirUpdate' + responses: + 200: + description: OK /api/clusters/{clusterName}/topics: get: @@ -1702,6 +1727,16 @@ components: required: - id + BrokerLogdirUpdate: + type: object + properties: + topic: + type: string + partition: + type: integer + logDir: + type: string + ConsumerGroupState: type: string enum: