KafkaConsumerTests.java 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package com.provectus.kafka.ui;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
  4. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  5. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  6. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  7. import com.provectus.kafka.ui.model.TopicConfigDTO;
  8. import com.provectus.kafka.ui.model.TopicCreationDTO;
  9. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  10. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  11. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.UUID;
  15. import java.util.stream.Stream;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.junit.jupiter.api.Assertions;
  18. import org.junit.jupiter.api.Test;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.test.web.reactive.server.WebTestClient;
  21. import reactor.core.publisher.Flux;
  22. import reactor.core.publisher.Mono;
  23. @Slf4j
  24. public class KafkaConsumerTests extends AbstractIntegrationTest {
  25. @Autowired
  26. private WebTestClient webTestClient;
  27. @Test
  28. public void shouldDeleteRecords() {
  29. var topicName = UUID.randomUUID().toString();
  30. webTestClient.post()
  31. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  32. .bodyValue(new TopicCreationDTO()
  33. .name(topicName)
  34. .partitions(1)
  35. .replicationFactor(1)
  36. .configs(Map.of())
  37. )
  38. .exchange()
  39. .expectStatus()
  40. .isOk();
  41. try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
  42. Flux.fromStream(
  43. Stream.of("one", "two", "three", "four")
  44. .map(value -> Mono.fromFuture(producer.send(topicName, value)))
  45. ).blockLast();
  46. } catch (Throwable e) {
  47. log.error("Error on sending", e);
  48. throw new RuntimeException(e);
  49. }
  50. long count = webTestClient.get()
  51. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  52. .accept(TEXT_EVENT_STREAM)
  53. .exchange()
  54. .expectStatus()
  55. .isOk()
  56. .expectBodyList(TopicMessageEventDTO.class)
  57. .returnResult()
  58. .getResponseBody()
  59. .stream()
  60. .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
  61. .count();
  62. assertThat(count).isEqualTo(4);
  63. webTestClient.delete()
  64. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  65. .exchange()
  66. .expectStatus()
  67. .isOk();
  68. count = webTestClient.get()
  69. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  70. .exchange()
  71. .expectStatus()
  72. .isOk()
  73. .expectBodyList(TopicMessageEventDTO.class)
  74. .returnResult()
  75. .getResponseBody()
  76. .stream()
  77. .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
  78. .count();
  79. assertThat(count).isZero();
  80. }
  81. @Test
  82. public void shouldIncreasePartitionsUpTo10() {
  83. var topicName = UUID.randomUUID().toString();
  84. webTestClient.post()
  85. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  86. .bodyValue(new TopicCreationDTO()
  87. .name(topicName)
  88. .partitions(1)
  89. .replicationFactor(1)
  90. .configs(Map.of())
  91. )
  92. .exchange()
  93. .expectStatus()
  94. .isOk();
  95. PartitionsIncreaseResponseDTO response = webTestClient.patch()
  96. .uri("/api/clusters/{clusterName}/topics/{topicName}/partitions",
  97. LOCAL,
  98. topicName)
  99. .bodyValue(new PartitionsIncreaseDTO()
  100. .totalPartitionsCount(10)
  101. )
  102. .exchange()
  103. .expectStatus()
  104. .isOk()
  105. .expectBody(PartitionsIncreaseResponseDTO.class)
  106. .returnResult()
  107. .getResponseBody();
  108. assert response != null;
  109. Assertions.assertEquals(10, response.getTotalPartitionsCount());
  110. TopicDetailsDTO topicDetails = webTestClient.get()
  111. .uri("/api/clusters/{clusterName}/topics/{topicName}",
  112. LOCAL,
  113. topicName)
  114. .exchange()
  115. .expectStatus()
  116. .isOk()
  117. .expectBody(TopicDetailsDTO.class)
  118. .returnResult()
  119. .getResponseBody();
  120. assert topicDetails != null;
  121. Assertions.assertEquals(10, topicDetails.getPartitionCount());
  122. }
  123. @Test
  124. public void shouldReturn404ForNonExistingTopic() {
  125. var topicName = UUID.randomUUID().toString();
  126. webTestClient.delete()
  127. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  128. .exchange()
  129. .expectStatus()
  130. .isNotFound();
  131. webTestClient.get()
  132. .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
  133. .exchange()
  134. .expectStatus()
  135. .isNotFound();
  136. }
  137. @Test
  138. public void shouldReturnConfigsForBroker() {
  139. var topicName = UUID.randomUUID().toString();
  140. List<BrokerConfigDTO> configs = webTestClient.get()
  141. .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
  142. LOCAL,
  143. 1)
  144. .exchange()
  145. .expectStatus()
  146. .isOk()
  147. .expectBodyList(BrokerConfigDTO.class)
  148. .returnResult()
  149. .getResponseBody();
  150. Assertions.assertNotNull(configs);
  151. assert !configs.isEmpty();
  152. Assertions.assertNotNull(configs.get(0).getName());
  153. Assertions.assertNotNull(configs.get(0).getIsReadOnly());
  154. Assertions.assertNotNull(configs.get(0).getIsSensitive());
  155. Assertions.assertNotNull(configs.get(0).getSource());
  156. Assertions.assertNotNull(configs.get(0).getSynonyms());
  157. }
  158. @Test
  159. public void shouldReturn404ForNonExistingBroker() {
  160. webTestClient.get()
  161. .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
  162. LOCAL,
  163. 0)
  164. .exchange()
  165. .expectStatus()
  166. .isNotFound();
  167. }
  168. @Test
  169. public void shouldRetrieveTopicConfig() {
  170. var topicName = UUID.randomUUID().toString();
  171. webTestClient.post()
  172. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  173. .bodyValue(new TopicCreationDTO()
  174. .name(topicName)
  175. .partitions(1)
  176. .replicationFactor(1)
  177. .configs(Map.of())
  178. )
  179. .exchange()
  180. .expectStatus()
  181. .isOk();
  182. List<TopicConfigDTO> configs = webTestClient.get()
  183. .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
  184. .exchange()
  185. .expectStatus()
  186. .isOk()
  187. .expectBodyList(TopicConfigDTO.class)
  188. .returnResult()
  189. .getResponseBody();
  190. Assertions.assertNotNull(configs);
  191. assert !configs.isEmpty();
  192. Assertions.assertNotNull(configs.get(0).getName());
  193. Assertions.assertNotNull(configs.get(0).getIsReadOnly());
  194. Assertions.assertNotNull(configs.get(0).getIsSensitive());
  195. Assertions.assertNotNull(configs.get(0).getSource());
  196. Assertions.assertNotNull(configs.get(0).getSynonyms());
  197. }
  198. }