// KafkaConsumerTests.java (6.4 KB)

  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.api.model.TopicConfig;
  3. import com.provectus.kafka.ui.model.BrokerConfig;
  4. import com.provectus.kafka.ui.model.PartitionsIncrease;
  5. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  6. import com.provectus.kafka.ui.model.TopicCreation;
  7. import com.provectus.kafka.ui.model.TopicDetails;
  8. import com.provectus.kafka.ui.model.TopicMessage;
  9. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.UUID;
  13. import java.util.stream.Stream;
  14. import lombok.extern.log4j.Log4j2;
  15. import org.junit.jupiter.api.Assertions;
  16. import org.junit.jupiter.api.Test;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  19. import org.springframework.test.context.ContextConfiguration;
  20. import org.springframework.test.web.reactive.server.WebTestClient;
  21. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  22. @Log4j2
  23. @AutoConfigureWebTestClient(timeout = "60000")
  24. public class KafkaConsumerTests extends AbstractBaseTest {
  25. @Autowired
  26. private WebTestClient webTestClient;
  27. @Test
  28. public void shouldDeleteRecords() {
  29. var topicName = UUID.randomUUID().toString();
  30. webTestClient.post()
  31. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  32. .bodyValue(new TopicCreation()
  33. .name(topicName)
  34. .partitions(1)
  35. .replicationFactor(1)
  36. .configs(Map.of())
  37. )
  38. .exchange()
  39. .expectStatus()
  40. .isOk();
  41. try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
  42. Stream.of("one", "two", "three", "four")
  43. .forEach(value -> producer.send(topicName, value));
  44. }
  45. webTestClient.get()
  46. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  47. .exchange()
  48. .expectStatus()
  49. .isOk()
  50. .expectBodyList(TopicMessage.class)
  51. .hasSize(4);
  52. webTestClient.delete()
  53. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  54. .exchange()
  55. .expectStatus()
  56. .isOk();
  57. webTestClient.get()
  58. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  59. .exchange()
  60. .expectStatus()
  61. .isOk()
  62. .expectBodyList(TopicMessage.class)
  63. .hasSize(0);
  64. }
  65. @Test
  66. public void shouldIncreasePartitionsUpTo10() {
  67. var topicName = UUID.randomUUID().toString();
  68. webTestClient.post()
  69. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  70. .bodyValue(new TopicCreation()
  71. .name(topicName)
  72. .partitions(1)
  73. .replicationFactor(1)
  74. .configs(Map.of())
  75. )
  76. .exchange()
  77. .expectStatus()
  78. .isOk();
  79. PartitionsIncreaseResponse response = webTestClient.patch()
  80. .uri("/api/clusters/{clusterName}/topics/{topicName}/partitions",
  81. LOCAL,
  82. topicName)
  83. .bodyValue(new PartitionsIncrease()
  84. .totalPartitionsCount(10)
  85. )
  86. .exchange()
  87. .expectStatus()
  88. .isOk()
  89. .expectBody(PartitionsIncreaseResponse.class)
  90. .returnResult()
  91. .getResponseBody();
  92. assert response != null;
  93. Assertions.assertEquals(10, response.getTotalPartitionsCount());
  94. TopicDetails topicDetails = webTestClient.get()
  95. .uri("/api/clusters/{clusterName}/topics/{topicName}",
  96. LOCAL,
  97. topicName)
  98. .exchange()
  99. .expectStatus()
  100. .isOk()
  101. .expectBody(TopicDetails.class)
  102. .returnResult()
  103. .getResponseBody();
  104. assert topicDetails != null;
  105. Assertions.assertEquals(10, topicDetails.getPartitionCount());
  106. }
  107. @Test
  108. public void shouldReturn404ForNonExistingTopic() {
  109. var topicName = UUID.randomUUID().toString();
  110. webTestClient.delete()
  111. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  112. .exchange()
  113. .expectStatus()
  114. .isNotFound();
  115. webTestClient.get()
  116. .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
  117. .exchange()
  118. .expectStatus()
  119. .isNotFound();
  120. }
  121. @Test
  122. public void shouldReturnConfigsForBroker() {
  123. var topicName = UUID.randomUUID().toString();
  124. List<BrokerConfig> configs = webTestClient.get()
  125. .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
  126. LOCAL,
  127. 1)
  128. .exchange()
  129. .expectStatus()
  130. .isOk()
  131. .expectBodyList(BrokerConfig.class)
  132. .returnResult()
  133. .getResponseBody();
  134. Assertions.assertNotNull(configs);
  135. assert !configs.isEmpty();
  136. Assertions.assertNotNull(configs.get(0).getName());
  137. Assertions.assertNotNull(configs.get(0).getIsReadOnly());
  138. Assertions.assertNotNull(configs.get(0).getIsSensitive());
  139. Assertions.assertNotNull(configs.get(0).getSource());
  140. Assertions.assertNotNull(configs.get(0).getSynonyms());
  141. }
  142. @Test
  143. public void shouldReturn404ForNonExistingBroker() {
  144. webTestClient.get()
  145. .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
  146. LOCAL,
  147. 0)
  148. .exchange()
  149. .expectStatus()
  150. .isNotFound();
  151. }
  152. @Test
  153. public void shouldRetrieveTopicConfig() {
  154. var topicName = UUID.randomUUID().toString();
  155. webTestClient.post()
  156. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  157. .bodyValue(new TopicCreation()
  158. .name(topicName)
  159. .partitions(1)
  160. .replicationFactor(1)
  161. .configs(Map.of())
  162. )
  163. .exchange()
  164. .expectStatus()
  165. .isOk();
  166. List<TopicConfig> configs = webTestClient.get()
  167. .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
  168. .exchange()
  169. .expectStatus()
  170. .isOk()
  171. .expectBodyList(TopicConfig.class)
  172. .returnResult()
  173. .getResponseBody();
  174. Assertions.assertNotNull(configs);
  175. assert !configs.isEmpty();
  176. Assertions.assertNotNull(configs.get(0).getName());
  177. Assertions.assertNotNull(configs.get(0).getIsReadOnly());
  178. Assertions.assertNotNull(configs.get(0).getIsSensitive());
  179. Assertions.assertNotNull(configs.get(0).getSource());
  180. Assertions.assertNotNull(configs.get(0).getSynonyms());
  181. }
  182. }