From 5dd3944faabe98b089958186200b290d2d6c7766 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Wed, 23 Jun 2021 20:47:32 +0300 Subject: [PATCH] Add API to delete consumer group by id (#578) * [issue-516] Add API to delete consumer groups by IDs * Add more tests to check consumer groups deletions * Refactor and fix code style * Fix codestyle * Rethrow OperationInterruptedException with 500 error code if a thread is interrupted * Use SneakyTrhrows to handle InterruptedException. Refactor * Change deletion of groups API to single group delete * Fix codestyle * Rollback changes in kafka-ui-react-app/package-lock.json --- .../controller/ConsumerGroupsController.java | 7 ++ .../kafka/ui/exception/ErrorCode.java | 2 + .../IllegalEntityStateException.java | 12 +++ .../kafka/ui/exception/NotFoundException.java | 13 +++ .../kafka/ui/service/ClusterService.java | 31 ++++++ .../src/main/resources/application-local.yml | 25 +++-- .../src/main/resources/application-sdp.yml | 19 ++-- .../kafka/ui/KakfaConsumerGroupTests.java | 99 +++++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 21 ++++ 9 files changed, 206 insertions(+), 23 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/IllegalEntityStateException.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index ba52586ee0..1a0d2f0748 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -19,6 +19,13 @@ import reactor.core.publisher.Mono; public class ConsumerGroupsController implements ConsumerGroupsApi { private final ClusterService clusterService; + @Override + public Mono> deleteConsumerGroup(String clusterName, String id, + ServerWebExchange exchange) { + return clusterService.deleteConsumerGroupById(clusterName, id) + .map(ResponseEntity::ok); + } + @Override public Mono> getConsumerGroup( String clusterName, String consumerGroupId, ServerWebExchange exchange) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 4517507764..506ed40ef8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -9,6 +9,8 @@ public enum ErrorCode { UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR), BINDING_FAIL(4001, HttpStatus.BAD_REQUEST), + NOT_FOUND(404, HttpStatus.NOT_FOUND), + INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST), VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST), READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED), REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/IllegalEntityStateException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/IllegalEntityStateException.java new file mode 100644 index 0000000000..d241cfc096 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/IllegalEntityStateException.java @@ -0,0 +1,12 @@ +package com.provectus.kafka.ui.exception; + +public class IllegalEntityStateException extends CustomBaseException { + public IllegalEntityStateException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.INVALID_ENTITY_STATE; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java new file mode 100644 index 0000000000..1f3abc870a --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class NotFoundException extends CustomBaseException { + + public NotFoundException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 2628584046..1bb8fc0195 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -1,6 +1,8 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.IllegalEntityStateException; +import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.Broker; @@ -11,6 +13,7 @@ import com.provectus.kafka.ui.model.ClusterStats; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Topic; @@ -33,8 +36,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -42,6 +50,7 @@ import reactor.util.function.Tuples; @Service @RequiredArgsConstructor +@Log4j2 public class ClusterService { private static final Integer DEFAULT_PAGE_SIZE = 25; @@ -272,5 +281,27 @@ public class ClusterService { .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); } + public Mono deleteConsumerGroupById(String clusterName, + String groupId) { + return clustersStorage.getClusterByName(clusterName) + .map(cluster -> kafkaService.getOrCreateAdminClient(cluster) + .map(ExtendedAdminClient::getAdminClient) + .map(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId))) + .map(DeleteConsumerGroupsResult::all) + .flatMap(ClusterUtil::toMono) + .onErrorResume(this::reThrowCustomException) + ) + .orElse(Mono.empty()); + } + @NotNull + private Mono reThrowCustomException(Throwable e) { + if (e instanceof GroupIdNotFoundException) { + return Mono.error(new NotFoundException("The group id does not exist")); + } else if (e instanceof GroupNotEmptyException) { + return Mono.error(new IllegalEntityStateException("The group is not empty")); + } else { + return Mono.error(e); + } + } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 5822849109..a684928f68 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -1,24 +1,23 @@ kafka: clusters: - - - name: local - bootstrapServers: localhost:9092 + - name: local + bootstrapServers: localhost:9093 zookeeper: localhost:2181 schemaRegistry: http://localhost:8081 kafkaConnect: - name: first address: http://localhost:8083 jmxPort: 9997 - - - name: secondLocal - bootstrapServers: localhost:9093 - zookeeper: localhost:2182 - schemaRegistry: http://localhost:18085 - kafkaConnect: - - name: first - address: http://localhost:8083 - jmxPort: 9998 - read-only: true + # - + # name: secondLocal + # bootstrapServers: localhost:9093 + # zookeeper: localhost:2182 + # schemaRegistry: http://localhost:18085 + # kafkaConnect: + # - name: first + # address: http://localhost:8083 + # jmxPort: 9998 + # read-only: true admin-client-timeout: 5000 zookeeper: connection-timeout: 1000 diff --git a/kafka-ui-api/src/main/resources/application-sdp.yml b/kafka-ui-api/src/main/resources/application-sdp.yml index 46a0377799..12503b7549 100644 --- a/kafka-ui-api/src/main/resources/application-sdp.yml +++ b/kafka-ui-api/src/main/resources/application-sdp.yml @@ -1,15 +1,14 @@ kafka: clusters: - - - name: local - bootstrapServers: kafka0:29092 - zookeeper: zookeeper0:2181 - schemaRegistry: http://schemaregistry0:8085 - - - name: secondLocal - zookeeper: zookeeper1:2181 - bootstrapServers: kafka1:29092 - schemaRegistry: http://schemaregistry1:8085 + - name: local + bootstrapServers: localhost:9093 + zookeeper: localhost:2181 + schemaRegistry: http://localhost:8083 + # - + # name: secondLocal + # zookeeper: zookeeper1:2181 + # bootstrapServers: kafka1:29092 + # schemaRegistry: http://schemaregistry1:8085 admin-client-timeout: 5000 zookeeper: connection-timeout: 1000 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java new file mode 100644 index 0000000000..423d441c33 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java @@ -0,0 +1,99 @@ +package com.provectus.kafka.ui; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import lombok.extern.log4j.Log4j2; +import lombok.val; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; + +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) +@Log4j2 +@AutoConfigureWebTestClient(timeout = "10000") +public class KakfaConsumerGroupTests extends AbstractBaseTest { + @Autowired + WebTestClient webTestClient; + + @Test + void shouldNotFoundWhenNoSuchConsumerGroupId() { + String groupId = "groupA"; + String expError = "The group id does not exist"; + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId) + .exchange() + .expectStatus() + .isNotFound(); + } + + @Test + void shouldOkWhenConsumerGroupIsNotActive() { + String topicName = createTopicWithRandomName(); + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + val consumer = createTestConsumerWithGroupId(groupId); + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + //Unsubscribe from all topics to be able to delete this consumer + consumer.unsubscribe(); + + //Delete the consumer when it's INACTIVE and check + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId) + .exchange() + .expectStatus() + .isOk(); + } + + @Test + void shouldBeBadRequestWhenConsumerGroupIsActive() { + String topicName = createTopicWithRandomName(); + + //Create a consumer and subscribe to the topic + String groupId = UUID.randomUUID().toString(); + val consumer = createTestConsumerWithGroupId(groupId); + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + + //Try to delete the consumer when it's ACTIVE + String expError = "The group is not empty"; + webTestClient + .delete() + .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId) + .exchange() + .expectStatus() + .isBadRequest(); + } + + private String createTopicWithRandomName() { + String topicName = UUID.randomUUID().toString(); + short replicationFactor = 1; + int partitions = 1; + createTopic(new NewTopic(topicName, partitions, replicationFactor)); + return topicName; + } + + private KafkaConsumer createTestConsumerWithGroupId(String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index f77b6b1881..e6e2ef9443 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -415,6 +415,26 @@ paths: schema: $ref: '#/components/schemas/ConsumerGroupDetails' + delete: + tags: + - Consumer Groups + summary: Delete Consumer Group by ID + operationId: deleteConsumerGroup + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + /api/clusters/{clusterName}/consumerGroups: get: tags: @@ -1857,3 +1877,4 @@ components: - name - connect - status +