KafkaConsumerTests.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.model.PartitionsIncrease;
  3. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  4. import com.provectus.kafka.ui.model.TopicCreation;
  5. import com.provectus.kafka.ui.model.TopicDetails;
  6. import com.provectus.kafka.ui.model.TopicMessage;
  7. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  8. import java.util.Map;
  9. import java.util.UUID;
  10. import java.util.stream.Stream;
  11. import lombok.extern.log4j.Log4j2;
  12. import org.junit.jupiter.api.Assertions;
  13. import org.junit.jupiter.api.Test;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  16. import org.springframework.test.context.ContextConfiguration;
  17. import org.springframework.test.web.reactive.server.WebTestClient;
  18. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  19. @Log4j2
  20. @AutoConfigureWebTestClient(timeout = "60000")
  21. public class KafkaConsumerTests extends AbstractBaseTest {
  22. @Autowired
  23. private WebTestClient webTestClient;
  24. @Test
  25. public void shouldDeleteRecords() {
  26. var topicName = UUID.randomUUID().toString();
  27. webTestClient.post()
  28. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  29. .bodyValue(new TopicCreation()
  30. .name(topicName)
  31. .partitions(1)
  32. .replicationFactor(1)
  33. .configs(Map.of())
  34. )
  35. .exchange()
  36. .expectStatus()
  37. .isOk();
  38. try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
  39. Stream.of("one", "two", "three", "four")
  40. .forEach(value -> producer.send(topicName, value));
  41. }
  42. webTestClient.get()
  43. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  44. .exchange()
  45. .expectStatus()
  46. .isOk()
  47. .expectBodyList(TopicMessage.class)
  48. .hasSize(4);
  49. webTestClient.delete()
  50. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  51. .exchange()
  52. .expectStatus()
  53. .isOk();
  54. webTestClient.get()
  55. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  56. .exchange()
  57. .expectStatus()
  58. .isOk()
  59. .expectBodyList(TopicMessage.class)
  60. .hasSize(0);
  61. }
  62. @Test
  63. public void shouldIncreasePartitionsUpTo10() {
  64. var topicName = UUID.randomUUID().toString();
  65. webTestClient.post()
  66. .uri("/api/clusters/{clusterName}/topics", LOCAL)
  67. .bodyValue(new TopicCreation()
  68. .name(topicName)
  69. .partitions(1)
  70. .replicationFactor(1)
  71. .configs(Map.of())
  72. )
  73. .exchange()
  74. .expectStatus()
  75. .isOk();
  76. PartitionsIncreaseResponse response = webTestClient.patch()
  77. .uri("/api/clusters/{clusterName}/topics/{topicName}/partitions",
  78. LOCAL,
  79. topicName)
  80. .bodyValue(new PartitionsIncrease()
  81. .totalPartitionsCount(10)
  82. )
  83. .exchange()
  84. .expectStatus()
  85. .isOk()
  86. .expectBody(PartitionsIncreaseResponse.class)
  87. .returnResult()
  88. .getResponseBody();
  89. assert response != null;
  90. Assertions.assertEquals(10, response.getTotalPartitionsCount());
  91. TopicDetails topicDetails = webTestClient.get()
  92. .uri("/api/clusters/{clusterName}/topics/{topicName}",
  93. LOCAL,
  94. topicName)
  95. .exchange()
  96. .expectStatus()
  97. .isOk()
  98. .expectBody(TopicDetails.class)
  99. .returnResult()
  100. .getResponseBody();
  101. assert topicDetails != null;
  102. Assertions.assertEquals(10, topicDetails.getPartitionCount());
  103. }
  104. @Test
  105. public void shouldReturn404ForNonExistingTopic() {
  106. var topicName = UUID.randomUUID().toString();
  107. webTestClient.delete()
  108. .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
  109. .exchange()
  110. .expectStatus()
  111. .isNotFound();
  112. }
  113. }