Browse Source

#708 Added log dirs alter operation (#723)

* #708 Added log dirs alter operation
Timur Davletov 3 years ago
parent
commit
2ab1601a7f

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.BrokersApi;
 import com.provectus.kafka.ui.model.Broker;
 import com.provectus.kafka.ui.model.BrokerConfig;
+import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.BrokerMetrics;
 import com.provectus.kafka.ui.model.BrokersLogdirs;
 import com.provectus.kafka.ui.service.ClusterService;
@@ -51,4 +52,13 @@ public class BrokersController implements BrokersApi {
         .map(ResponseEntity::ok)
         .onErrorReturn(ResponseEntity.notFound().build());
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(
+      String clusterName, Integer id, Mono<BrokerLogdirUpdate> brokerLogdir,
+      ServerWebExchange exchange) {
+    return brokerLogdir
+        .flatMap(bld -> clusterService.updateBrokerLogDir(clusterName, id, bld))
+        .map(ResponseEntity::ok);
+  }
 }

+ 3 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -20,7 +20,9 @@ public enum ErrorCode {
   TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
   SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
   CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
-  KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND);
+  KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
+  DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
+  TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST);
 
   static {
     // codes uniqueness check

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/LogDirNotFoundApiException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class LogDirNotFoundApiException extends CustomBaseException {
+
+  public LogDirNotFoundApiException() {
+    super("The user-specified log directory is not found in the broker config.");
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.DIR_NOT_FOUND;
+  }
+}

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicOrPartitionNotFoundException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class TopicOrPartitionNotFoundException extends CustomBaseException {
+
+  public TopicOrPartitionNotFoundException() {
+    super("This server does not host this topic-partition.");
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.TOPIC_OR_PARTITION_NOT_FOUND;
+  }
+}

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
 import com.provectus.kafka.ui.model.Broker;
 import com.provectus.kafka.ui.model.BrokerConfig;
+import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.BrokerMetrics;
 import com.provectus.kafka.ui.model.BrokersLogdirs;
 import com.provectus.kafka.ui.model.Cluster;
@@ -379,4 +380,10 @@ public class ClusterService {
         .map(describeLogDirsMapper::toBrokerLogDirsList)
         .flatMapMany(Flux::fromIterable);
   }
+
+  public Mono<Void> updateBrokerLogDir(
+      String clusterName, Integer id, BrokerLogdirUpdate brokerLogDir) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
+        .flatMap(c -> kafkaService.updateBrokerLogDir(c, id, brokerLogDir));
+  }
 }

+ 26 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -1,9 +1,12 @@
 package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
+import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
+import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.CleanupPolicy;
 import com.provectus.kafka.ui.model.CreateTopicMessage;
 import com.provectus.kafka.ui.model.ExtendedAdminClient;
@@ -44,7 +47,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
@@ -58,7 +60,6 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
-import org.apache.kafka.clients.admin.DescribeLogDirsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
@@ -73,8 +74,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -940,6 +944,26 @@ public class KafkaService {
     return result;
   }
 
+  public Mono<Void> updateBrokerLogDir(KafkaCluster cluster, Integer broker,
+                                                           BrokerLogdirUpdate brokerLogDir) {
+    return getOrCreateAdminClient(cluster)
+        .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
+  }
 
+  private Mono<Void> updateBrokerLogDir(ExtendedAdminClient adminMono,
+                                        BrokerLogdirUpdate b,
+                                        Integer broker) {
 
+    Map<TopicPartitionReplica, String> req = Map.of(
+        new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
+        b.getLogDir());
+    return Mono.just(adminMono)
+        .map(admin -> admin.getAdminClient().alterReplicaLogDirs(req))
+        .flatMap(result -> ClusterUtil.toMono(result.all()))
+        .onErrorResume(UnknownTopicOrPartitionException.class,
+            e -> Mono.error(new TopicOrPartitionNotFoundException()))
+        .onErrorResume(LogDirNotFoundException.class,
+            e -> Mono.error(new LogDirNotFoundApiException()))
+        .doOnError(log::error);
+  }
 }

+ 42 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java

@@ -3,9 +3,14 @@ package com.provectus.kafka.ui.service;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
+import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
+import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.BrokerTopicLogdirs;
 import com.provectus.kafka.ui.model.BrokersLogdirs;
+import com.provectus.kafka.ui.model.ErrorResponse;
 import java.util.List;
+import java.util.Map;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@@ -82,4 +87,41 @@ public class LogDirsTest extends AbstractBaseTest {
 
     assertThat(dirs).isEmpty();
   }
+
+  @Test
+  public void testChangeDirToWrongDir() {
+    ErrorResponse dirs = webTestClient.patch()
+        .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1)
+        .bodyValue(Map.of(
+            "topic", "__consumer_offsets",
+            "partition", "0",
+            "logDir", "/asdf/as"
+            )
+        )
+        .exchange()
+        .expectStatus().isBadRequest()
+        .expectBody(ErrorResponse.class)
+        .returnResult()
+        .getResponseBody();
+
+    assertThat(dirs.getMessage())
+        .isEqualTo(new LogDirNotFoundApiException().getMessage());
+
+    dirs = webTestClient.patch()
+        .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1)
+        .bodyValue(Map.of(
+            "topic", "asdf",
+            "partition", "0",
+            "logDir", "/var/lib/kafka/data"
+            )
+        )
+        .exchange()
+        .expectStatus().isBadRequest()
+        .expectBody(ErrorResponse.class)
+        .returnResult()
+        .getResponseBody();
+
+    assertThat(dirs.getMessage())
+        .isEqualTo(new TopicOrPartitionNotFoundException().getMessage());
+  }
 }

+ 35 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -201,6 +201,31 @@ paths:
                 items:
                   $ref: '#/components/schemas/BrokersLogdirs'
 
+  /api/clusters/{clusterName}/brokers/{id}/logdirs:
+    patch:
+      tags:
+        - Brokers
+      summary: updateBrokerTopicPartitionLogDir
+      operationId: updateBrokerTopicPartitionLogDir
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: integer
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/BrokerLogdirUpdate'
+      responses:
+        200:
+          description: OK
 
   /api/clusters/{clusterName}/topics:
     get:
@@ -1702,6 +1727,16 @@ components:
       required:
         - id
 
+    BrokerLogdirUpdate:
+      type: object
+      properties:
+        topic:
+          type: string
+        partition:
+          type: integer
+        logDir:
+          type: string
+
     ConsumerGroupState:
       type: string
       enum: