
Add API to delete consumer group by id (#578)

* [issue-516] Add API to delete consumer groups by IDs

* Add more tests to check consumer groups deletions

* Refactor and fix code style

* Fix codestyle

* Rethrow OperationInterruptedException with 500 error code if a thread is interrupted

* Use SneakyThrows to handle InterruptedException. Refactor

* Change deletion of groups API to single group delete

* Fix codestyle

* Rollback changes in kafka-ui-react-app/package-lock.json
Ildar Almakaev 4 years ago
parent
commit
5dd3944faa

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -19,6 +19,13 @@ import reactor.core.publisher.Mono;
 public class ConsumerGroupsController implements ConsumerGroupsApi {
   private final ClusterService clusterService;

+  @Override
+  public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName, String id,
+                                                        ServerWebExchange exchange) {
+    return clusterService.deleteConsumerGroupById(clusterName, id)
+        .map(ResponseEntity::ok);
+  }
+
   @Override
   public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(
       String clusterName, String consumerGroupId, ServerWebExchange exchange) {
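For reference, a minimal client sketch for the new endpoint, assuming a locally running instance at http://localhost:8080 and a cluster named "local" (both assumptions); the URI template matches the one exercised by the tests added in this commit:

import org.springframework.web.reactive.function.client.WebClient;

public class DeleteConsumerGroupExample {
  public static void main(String[] args) {
    // Assumed base URL of a locally running kafka-ui instance
    WebClient client = WebClient.create("http://localhost:8080");
    client.delete()
        // Path used by the tests below; cluster name and group id are placeholders
        .uri("/api/clusters/{clusterName}/consumer-groups/{id}", "local", "my-group")
        .retrieve()
        .toBodilessEntity()  // 200 OK on success; a 4xx raises WebClientResponseException
        .block();
  }
}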

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

@@ -9,6 +9,8 @@ public enum ErrorCode {

   UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
   BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
+  NOT_FOUND(404, HttpStatus.NOT_FOUND),
+  INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST),
   VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
   READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
   REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT),

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/IllegalEntityStateException.java

@@ -0,0 +1,12 @@
+package com.provectus.kafka.ui.exception;
+
+public class IllegalEntityStateException extends CustomBaseException {
+  public IllegalEntityStateException(String message) {
+    super(message);
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.INVALID_ENTITY_STATE;
+  }
+}

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class NotFoundException extends CustomBaseException {
+
+  public NotFoundException(String message) {
+    super(message);
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.NOT_FOUND;
+  }
+}

+ 31 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -1,6 +1,8 @@
 package com.provectus.kafka.ui.service;

 import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.IllegalEntityStateException;
+import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.Broker;
@@ -11,6 +13,7 @@ import com.provectus.kafka.ui.model.ClusterStats;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ConsumerGroupDetails;
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Topic;
@@ -33,8 +36,13 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -42,6 +50,7 @@ import reactor.util.function.Tuples;

 @Service
 @RequiredArgsConstructor
+@Log4j2
 public class ClusterService {
   private static final Integer DEFAULT_PAGE_SIZE = 25;

@@ -272,5 +281,27 @@ public class ClusterService {
         .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
   }

+  public Mono<Void> deleteConsumerGroupById(String clusterName,
+                                            String groupId) {
+    return clustersStorage.getClusterByName(clusterName)
+        .map(cluster -> kafkaService.getOrCreateAdminClient(cluster)
+            .map(ExtendedAdminClient::getAdminClient)
+            .map(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)))
+            .map(DeleteConsumerGroupsResult::all)
+            .flatMap(ClusterUtil::toMono)
+            .onErrorResume(this::reThrowCustomException)
+        )
+        .orElse(Mono.empty());
+  }

+  @NotNull
+  private Mono<Void> reThrowCustomException(Throwable e) {
+    if (e instanceof GroupIdNotFoundException) {
+      return Mono.error(new NotFoundException("The group id does not exist"));
+    } else if (e instanceof GroupNotEmptyException) {
+      return Mono.error(new IllegalEntityStateException("The group is not empty"));
+    } else {
+      return Mono.error(e);
+    }
+  }
 }
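DeleteConsumerGroupsResult.all() returns a KafkaFuture<Void>, which ClusterUtil::toMono adapts into a Mono before the error remapping above. The project's actual helper is not part of this diff; a minimal sketch of such an adapter, assuming it simply bridges KafkaFuture completion into a MonoSink:

import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;

final class KafkaFutureAdapter {
  // Hypothetical stand-in for ClusterUtil::toMono: completes the Mono when the
  // KafkaFuture completes, or propagates the failure (e.g. GroupIdNotFoundException,
  // GroupNotEmptyException) so onErrorResume can remap it.
  static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.create(sink -> future.whenComplete((result, error) -> {
      if (error != null) {
        sink.error(error);
      } else if (result == null) {
        sink.success();           // KafkaFuture<Void> completes empty
      } else {
        sink.success(result);
      }
    }));
  }
}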

+ 12 - 13
kafka-ui-api/src/main/resources/application-local.yml

@@ -1,24 +1,23 @@
 kafka:
   clusters:
-    -
-      name: local
-      bootstrapServers: localhost:9092
+    - name: local
+      bootstrapServers: localhost:9093
       zookeeper: localhost:2181
       schemaRegistry: http://localhost:8081
       kafkaConnect:
         - name: first
           address: http://localhost:8083
       jmxPort: 9997
-    -
-      name: secondLocal
-      bootstrapServers: localhost:9093
-      zookeeper: localhost:2182
-      schemaRegistry: http://localhost:18085
-      kafkaConnect:
-        - name: first
-          address: http://localhost:8083
-      jmxPort: 9998
-      read-only: true
+  #    -
+  #      name: secondLocal
+  #      bootstrapServers: localhost:9093
+  #      zookeeper: localhost:2182
+  #      schemaRegistry: http://localhost:18085
+  #      kafkaConnect:
+  #        - name: first
+  #          address: http://localhost:8083
+  #      jmxPort: 9998
+  #      read-only: true
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 9 - 10
kafka-ui-api/src/main/resources/application-sdp.yml

@@ -1,15 +1,14 @@
 kafka:
   clusters:
-    -
-      name: local
-      bootstrapServers: kafka0:29092
-      zookeeper: zookeeper0:2181
-      schemaRegistry: http://schemaregistry0:8085
-    -
-      name: secondLocal
-      zookeeper: zookeeper1:2181
-      bootstrapServers: kafka1:29092
-      schemaRegistry: http://schemaregistry1:8085
+    - name: local
+      bootstrapServers: localhost:9093
+      zookeeper: localhost:2181
+      schemaRegistry: http://localhost:8083
+  #    -
+  #      name: secondLocal
+  #      zookeeper: zookeeper1:2181
+  #      bootstrapServers: kafka1:29092
+  #      schemaRegistry: http://schemaregistry1:8085
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 99 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java

@@ -0,0 +1,99 @@
+package com.provectus.kafka.ui;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import lombok.extern.log4j.Log4j2;
+import lombok.val;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "10000")
+public class KakfaConsumerGroupTests extends AbstractBaseTest {
+  @Autowired
+  WebTestClient webTestClient;
+
+  @Test
+  void shouldNotFoundWhenNoSuchConsumerGroupId() {
+    String groupId = "groupA";
+    String expError = "The group id does not exist";
+    webTestClient
+        .delete()
+        .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
+        .exchange()
+        .expectStatus()
+        .isNotFound();
+  }
+
+  @Test
+  void shouldOkWhenConsumerGroupIsNotActive() {
+    String topicName = createTopicWithRandomName();
+
+    //Create a consumer and subscribe to the topic
+    String groupId = UUID.randomUUID().toString();
+    val consumer = createTestConsumerWithGroupId(groupId);
+    consumer.subscribe(List.of(topicName));
+    consumer.poll(Duration.ofMillis(100));
+
+    //Unsubscribe from all topics to be able to delete this consumer
+    consumer.unsubscribe();
+
+    //Delete the consumer when it's INACTIVE and check
+    webTestClient
+        .delete()
+        .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
+        .exchange()
+        .expectStatus()
+        .isOk();
+  }
+
+  @Test
+  void shouldBeBadRequestWhenConsumerGroupIsActive() {
+    String topicName = createTopicWithRandomName();
+
+    //Create a consumer and subscribe to the topic
+    String groupId = UUID.randomUUID().toString();
+    val consumer = createTestConsumerWithGroupId(groupId);
+    consumer.subscribe(List.of(topicName));
+    consumer.poll(Duration.ofMillis(100));
+
+    //Try to delete the consumer when it's ACTIVE
+    String expError = "The group is not empty";
+    webTestClient
+        .delete()
+        .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
+        .exchange()
+        .expectStatus()
+        .isBadRequest();
+  }
+
+  private String createTopicWithRandomName() {
+    String topicName = UUID.randomUUID().toString();
+    short replicationFactor = 1;
+    int partitions = 1;
+    createTopic(new NewTopic(topicName, partitions, replicationFactor));
+    return topicName;
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createTestConsumerWithGroupId(String groupId) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    return new KafkaConsumer<>(props);
+  }
+}

+ 21 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -415,6 +415,26 @@ paths:
               schema:
                 $ref: '#/components/schemas/ConsumerGroupDetails'

+    delete:
+      tags:
+        - Consumer Groups
+      summary: Delete Consumer Group by ID
+      operationId: deleteConsumerGroup
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+
   /api/clusters/{clusterName}/consumerGroups:
     get:
       tags:
@@ -1857,3 +1877,4 @@ components:
         - name
         - connect
         - status
+