123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- package com.provectus.kafka.ui;
- import static org.assertj.core.api.Assertions.assertThat;
- import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
- import com.provectus.kafka.ui.model.BrokerConfigDTO;
- import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
- import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
- import com.provectus.kafka.ui.model.TopicConfigDTO;
- import com.provectus.kafka.ui.model.TopicCreationDTO;
- import com.provectus.kafka.ui.model.TopicDetailsDTO;
- import com.provectus.kafka.ui.model.TopicMessageEventDTO;
- import com.provectus.kafka.ui.producer.KafkaTestProducer;
- import java.util.List;
- import java.util.Map;
- import java.util.UUID;
- import java.util.stream.Stream;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.Assertions;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.test.web.reactive.server.WebTestClient;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- @Slf4j
- public class KafkaConsumerTests extends AbstractIntegrationTest {
- @Autowired
- private WebTestClient webTestClient;
- @Test
- public void shouldDeleteRecords() {
- var topicName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/topics", LOCAL)
- .bodyValue(new TopicCreationDTO()
- .name(topicName)
- .partitions(1)
- .replicationFactor(1)
- .configs(Map.of())
- )
- .exchange()
- .expectStatus()
- .isOk();
- try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
- Flux.fromStream(
- Stream.of("one", "two", "three", "four")
- .map(value -> Mono.fromFuture(producer.send(topicName, value)))
- ).blockLast();
- } catch (Throwable e) {
- log.error("Error on sending", e);
- throw new RuntimeException(e);
- }
- long count = webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
- .accept(TEXT_EVENT_STREAM)
- .exchange()
- .expectStatus()
- .isOk()
- .expectBodyList(TopicMessageEventDTO.class)
- .returnResult()
- .getResponseBody()
- .stream()
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
- .count();
- assertThat(count).isEqualTo(4);
- webTestClient.delete()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
- .exchange()
- .expectStatus()
- .isOk();
- count = webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
- .exchange()
- .expectStatus()
- .isOk()
- .expectBodyList(TopicMessageEventDTO.class)
- .returnResult()
- .getResponseBody()
- .stream()
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
- .count();
- assertThat(count).isZero();
- }
- @Test
- public void shouldIncreasePartitionsUpTo10() {
- var topicName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/topics", LOCAL)
- .bodyValue(new TopicCreationDTO()
- .name(topicName)
- .partitions(1)
- .replicationFactor(1)
- .configs(Map.of())
- )
- .exchange()
- .expectStatus()
- .isOk();
- PartitionsIncreaseResponseDTO response = webTestClient.patch()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/partitions",
- LOCAL,
- topicName)
- .bodyValue(new PartitionsIncreaseDTO()
- .totalPartitionsCount(10)
- )
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(PartitionsIncreaseResponseDTO.class)
- .returnResult()
- .getResponseBody();
- assert response != null;
- Assertions.assertEquals(10, response.getTotalPartitionsCount());
- TopicDetailsDTO topicDetails = webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}",
- LOCAL,
- topicName)
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(TopicDetailsDTO.class)
- .returnResult()
- .getResponseBody();
- assert topicDetails != null;
- Assertions.assertEquals(10, topicDetails.getPartitionCount());
- }
- @Test
- public void shouldReturn404ForNonExistingTopic() {
- var topicName = UUID.randomUUID().toString();
- webTestClient.delete()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
- .exchange()
- .expectStatus()
- .isNotFound();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
- .exchange()
- .expectStatus()
- .isNotFound();
- }
- @Test
- public void shouldReturnConfigsForBroker() {
- var topicName = UUID.randomUUID().toString();
- List<BrokerConfigDTO> configs = webTestClient.get()
- .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
- LOCAL,
- 1)
- .exchange()
- .expectStatus()
- .isOk()
- .expectBodyList(BrokerConfigDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(configs);
- assert !configs.isEmpty();
- Assertions.assertNotNull(configs.get(0).getName());
- Assertions.assertNotNull(configs.get(0).getIsReadOnly());
- Assertions.assertNotNull(configs.get(0).getIsSensitive());
- Assertions.assertNotNull(configs.get(0).getSource());
- Assertions.assertNotNull(configs.get(0).getSynonyms());
- }
- @Test
- public void shouldReturn404ForNonExistingBroker() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/brokers/{id}/configs",
- LOCAL,
- 0)
- .exchange()
- .expectStatus()
- .isNotFound();
- }
- @Test
- public void shouldRetrieveTopicConfig() {
- var topicName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/topics", LOCAL)
- .bodyValue(new TopicCreationDTO()
- .name(topicName)
- .partitions(1)
- .replicationFactor(1)
- .configs(Map.of())
- )
- .exchange()
- .expectStatus()
- .isOk();
- List<TopicConfigDTO> configs = webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
- .exchange()
- .expectStatus()
- .isOk()
- .expectBodyList(TopicConfigDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(configs);
- assert !configs.isEmpty();
- Assertions.assertNotNull(configs.get(0).getName());
- Assertions.assertNotNull(configs.get(0).getIsReadOnly());
- Assertions.assertNotNull(configs.get(0).getIsSensitive());
- Assertions.assertNotNull(configs.get(0).getSource());
- Assertions.assertNotNull(configs.get(0).getSynonyms());
- }
- }
|