浏览代码

#702 Supported incremental config update for brokers (#751)

Timur Davletov 3 年之前
父节点
当前提交
fa4ef337a7

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.BrokersApi;
 import com.provectus.kafka.ui.model.Broker;
 import com.provectus.kafka.ui.model.BrokerConfig;
+import com.provectus.kafka.ui.model.BrokerConfigItem;
 import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.BrokerMetrics;
 import com.provectus.kafka.ui.model.BrokersLogdirs;
@@ -61,4 +62,16 @@ public class BrokersController implements BrokersApi {
         .flatMap(bld -> clusterService.updateBrokerLogDir(clusterName, id, bld))
         .map(ResponseEntity::ok);
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
+                                                             Integer id,
+                                                             String name,
+                                                             Mono<BrokerConfigItem> brokerConfig,
+                                                             ServerWebExchange exchange) {
+    return brokerConfig
+        .flatMap(bci -> clusterService.updateBrokerConfigByName(
+            clusterName, id, name, bci.getValue()))
+        .map(ResponseEntity::ok);
+  }
 }

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -22,7 +22,8 @@ public enum ErrorCode {
   CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
   KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
   DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
-  TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST);
+  TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
+  INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST);
 
   static {
     // codes uniqueness check

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/InvalidRequestApiException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class InvalidRequestApiException extends CustomBaseException {
+
+  public InvalidRequestApiException(String message) {
+    super(message);
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.INVALID_REQUEST;
+  }
+}

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -386,4 +386,12 @@ public class ClusterService {
     return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
         .flatMap(c -> kafkaService.updateBrokerLogDir(c, id, brokerLogDir));
   }
+
+  public Mono<Void> updateBrokerConfigByName(String clusterName,
+                                             Integer id,
+                                             String name,
+                                             String value) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
+        .flatMap(c -> kafkaService.updateBrokerConfigByName(c, id, name, value));
+  }
 }

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
+import com.provectus.kafka.ui.exception.InvalidRequestApiException;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
@@ -76,6 +77,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -966,4 +968,27 @@ public class KafkaService {
             e -> Mono.error(new LogDirNotFoundApiException()))
         .doOnError(log::error);
   }
+
+  public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
+                                             Integer broker,
+                                             String name,
+                                             String value) {
+    return getOrCreateAdminClient(cluster)
+        .flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value));
+  }
+
+  private Mono<Void> updateBrokerConfigByName(ExtendedAdminClient admin,
+                                              Integer broker,
+                                              String name,
+                                              String value) {
+    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(broker));
+    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
+
+    return Mono.just(admin)
+        .map(a -> a.getAdminClient().incrementalAlterConfigs(Map.of(cr, List.of(op))))
+        .flatMap(result -> ClusterUtil.toMono(result.all()))
+        .onErrorResume(InvalidRequestException.class,
+            e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
+        .doOnError(log::error);
+  }
 }

+ 79 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ConfigTest.java

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.model.BrokerConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@AutoConfigureWebTestClient(timeout = "60000")
+public class ConfigTest extends AbstractBaseTest {
+
+  @Autowired
+  private WebTestClient webTestClient;
+
+  @Test
+  public void testAlterConfig() throws Exception {
+    String name = "background.threads";
+
+    Optional<BrokerConfig> bc = getConfig(name);
+    assertThat(bc.isPresent()).isTrue();
+    assertThat(bc.get().getValue()).isEqualTo("10");
+
+    webTestClient.put()
+        .uri("/api/clusters/{clusterName}/brokers/{id}/configs/{name}", LOCAL, 1, name)
+        .bodyValue(Map.of(
+            "name", name,
+            "value", "5"
+            )
+        )
+        .exchange()
+        .expectStatus().isOk();
+
+    // Without sleep it returns old config so we need to wait a little bit
+    Thread.sleep(1000);
+
+    Optional<BrokerConfig> bcc = getConfig(name);
+    assertThat(bcc.isPresent()).isTrue();
+    assertThat(bcc.get().getValue()).isEqualTo("5");
+  }
+
+  @Test
+  public void testAlterReadonlyConfig() {
+    String name = "log.dirs";
+
+    webTestClient.put()
+        .uri("/api/clusters/{clusterName}/brokers/{id}/configs/{name}", LOCAL, 1, name)
+        .bodyValue(Map.of(
+            "name", name,
+            "value", "/var/lib/kafka2"
+            )
+        )
+        .exchange()
+        .expectStatus().isBadRequest();
+  }
+
+  private Optional<BrokerConfig> getConfig(String name) {
+    List<BrokerConfig> configs = webTestClient.get()
+        .uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, 1)
+        .exchange()
+        .expectStatus().isOk()
+        .expectBody(new ParameterizedTypeReference<List<BrokerConfig>>() {
+        })
+        .returnResult()
+        .getResponseBody();
+
+    return configs.stream()
+        .filter(c -> c.getName().equals(name))
+        .findAny();
+  }
+}

+ 0 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/LogDirsTest.java

@@ -5,7 +5,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import com.provectus.kafka.ui.AbstractBaseTest;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
-import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.BrokerTopicLogdirs;
 import com.provectus.kafka.ui.model.BrokersLogdirs;
 import com.provectus.kafka.ui.model.ErrorResponse;

+ 37 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -106,6 +106,37 @@ paths:
         404:
           description: Not found
 
+  /api/clusters/{clusterName}/brokers/{id}/configs/{name}:
+    put:
+      tags:
+        - Brokers
+      summary: updateBrokerConfigByName
+      operationId: updateBrokerConfigByName
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: integer
+        - name: name
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/BrokerConfigItem'
+      responses:
+        200:
+          description: OK
+
   /api/clusters/{clusterName}/metrics:
     get:
       tags:
@@ -2400,6 +2431,12 @@ components:
         - totalReplicationFactor
         - topicName
 
+    BrokerConfigItem:
+      type: object
+      properties:
+        value:
+          type: string
+
     BrokerConfig:
       type: object
       properties: