KafkaConsumerGroupTests.java 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package com.provectus.kafka.ui;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import com.provectus.kafka.ui.model.ConsumerGroupDTO;
  4. import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO;
  5. import java.io.Closeable;
  6. import java.time.Duration;
  7. import java.util.Comparator;
  8. import java.util.List;
  9. import java.util.Properties;
  10. import java.util.UUID;
  11. import java.util.stream.Collectors;
  12. import java.util.stream.Stream;
  13. import lombok.extern.slf4j.Slf4j;
  14. import lombok.val;
  15. import org.apache.commons.lang3.RandomStringUtils;
  16. import org.apache.kafka.clients.admin.NewTopic;
  17. import org.apache.kafka.clients.consumer.ConsumerConfig;
  18. import org.apache.kafka.clients.consumer.KafkaConsumer;
  19. import org.apache.kafka.common.serialization.BytesDeserializer;
  20. import org.apache.kafka.common.utils.Bytes;
  21. import org.junit.jupiter.api.Test;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.test.web.reactive.server.WebTestClient;
  24. @Slf4j
  25. public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
  26. @Autowired
  27. WebTestClient webTestClient;
  28. @Test
  29. void shouldNotFoundWhenNoSuchConsumerGroupId() {
  30. String groupId = "groupA";
  31. String expError = "The group id does not exist";
  32. webTestClient
  33. .delete()
  34. .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
  35. .exchange()
  36. .expectStatus()
  37. .isNotFound();
  38. }
  39. @Test
  40. void shouldOkWhenConsumerGroupIsNotActive() {
  41. String topicName = createTopicWithRandomName();
  42. //Create a consumer and subscribe to the topic
  43. String groupId = UUID.randomUUID().toString();
  44. val consumer = createTestConsumerWithGroupId(groupId);
  45. consumer.subscribe(List.of(topicName));
  46. consumer.poll(Duration.ofMillis(100));
  47. //Unsubscribe from all topics to be able to delete this consumer
  48. consumer.unsubscribe();
  49. //Delete the consumer when it's INACTIVE and check
  50. webTestClient
  51. .delete()
  52. .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
  53. .exchange()
  54. .expectStatus()
  55. .isOk();
  56. }
  57. @Test
  58. void shouldBeBadRequestWhenConsumerGroupIsActive() {
  59. String topicName = createTopicWithRandomName();
  60. //Create a consumer and subscribe to the topic
  61. String groupId = UUID.randomUUID().toString();
  62. val consumer = createTestConsumerWithGroupId(groupId);
  63. consumer.subscribe(List.of(topicName));
  64. consumer.poll(Duration.ofMillis(100));
  65. //Try to delete the consumer when it's ACTIVE
  66. String expError = "The group is not empty";
  67. webTestClient
  68. .delete()
  69. .uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
  70. .exchange()
  71. .expectStatus()
  72. .isBadRequest();
  73. }
  74. @Test
  75. void shouldReturnConsumerGroupsWithPagination() throws Exception {
  76. try (var groups1 = startConsumerGroups(3, "cgPageTest1");
  77. var groups2 = startConsumerGroups(2, "cgPageTest2")) {
  78. webTestClient
  79. .get()
  80. .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest", LOCAL)
  81. .exchange()
  82. .expectStatus()
  83. .isOk()
  84. .expectBody(ConsumerGroupsPageResponseDTO.class)
  85. .value(page -> {
  86. assertThat(page.getPageCount()).isEqualTo(2);
  87. assertThat(page.getConsumerGroups().size()).isEqualTo(3);
  88. });
  89. webTestClient
  90. .get()
  91. .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&search=cgPageTest", LOCAL)
  92. .exchange()
  93. .expectStatus()
  94. .isOk()
  95. .expectBody(ConsumerGroupsPageResponseDTO.class)
  96. .value(page -> {
  97. assertThat(page.getPageCount()).isEqualTo(1);
  98. assertThat(page.getConsumerGroups().size()).isEqualTo(5);
  99. assertThat(page.getConsumerGroups())
  100. .isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId));
  101. });
  102. webTestClient
  103. .get()
  104. .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
  105. + "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
  106. .exchange()
  107. .expectStatus()
  108. .isOk()
  109. .expectBody(ConsumerGroupsPageResponseDTO.class)
  110. .value(page -> {
  111. assertThat(page.getPageCount()).isEqualTo(1);
  112. assertThat(page.getConsumerGroups().size()).isEqualTo(5);
  113. assertThat(page.getConsumerGroups())
  114. .isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
  115. });
  116. webTestClient
  117. .get()
  118. .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
  119. + "=cgPageTest&orderBy=MEMBERS&sortOrder=DESC", LOCAL)
  120. .exchange()
  121. .expectStatus()
  122. .isOk()
  123. .expectBody(ConsumerGroupsPageResponseDTO.class)
  124. .value(page -> {
  125. assertThat(page.getPageCount()).isEqualTo(1);
  126. assertThat(page.getConsumerGroups().size()).isEqualTo(5);
  127. assertThat(page.getConsumerGroups())
  128. .isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getMembers).reversed());
  129. });
  130. }
  131. }
  132. private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
  133. String topicName = createTopicWithRandomName();
  134. var consumers =
  135. Stream.generate(() -> {
  136. String groupId = consumerGroupPrefix + RandomStringUtils.randomAlphabetic(5);
  137. val consumer = createTestConsumerWithGroupId(groupId);
  138. consumer.subscribe(List.of(topicName));
  139. consumer.poll(Duration.ofMillis(100));
  140. return consumer;
  141. })
  142. .limit(count)
  143. .collect(Collectors.toList());
  144. return () -> {
  145. consumers.forEach(KafkaConsumer::close);
  146. deleteTopic(topicName);
  147. };
  148. }
  149. private String createTopicWithRandomName() {
  150. String topicName = getClass().getSimpleName() + "-" + UUID.randomUUID();
  151. short replicationFactor = 1;
  152. int partitions = 1;
  153. createTopic(new NewTopic(topicName, partitions, replicationFactor));
  154. return topicName;
  155. }
  156. private KafkaConsumer<Bytes, Bytes> createTestConsumerWithGroupId(String groupId) {
  157. Properties props = new Properties();
  158. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  159. props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
  160. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
  161. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
  162. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
  163. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  164. return new KafkaConsumer<>(props);
  165. }
  166. }