From 49e10c9b6e3134d0483190ab5c0a935ada68f2ac Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Thu, 11 Mar 2021 16:30:33 +0300 Subject: [PATCH] added test for delete action --- .../ui/cluster/service/ClusterService.java | 2 +- .../ui/cluster/service/ConsumingService.java | 10 +-- .../provectus/kafka/ui/AbstractBaseTest.java | 2 + .../kafka/ui/KafkaConsumerTests.java | 68 +++++++++++++++++++ .../KafkaConnectContainer.java | 2 +- .../SchemaRegistryContainer.java | 2 +- .../kafka/ui/producer/KafkaTestProducer.java | 35 ++++++++++ 7 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java rename kafka-ui-api/src/test/java/com/provectus/kafka/ui/{ => container}/KafkaConnectContainer.java (98%) rename kafka-ui-api/src/test/java/com/provectus/kafka/ui/{ => container}/SchemaRegistryContainer.java (96%) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index ab068eb5e6..448841e44b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -207,6 +207,6 @@ public class ClusterService { .map(partition -> new TopicPartition(topicName, partition)) .collect(Collectors.toList()); return consumingService.loadOffsets(cluster, partitions) - .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)).next(); + .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 65adc4fb55..68069af051 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.core.scheduler.Schedulers; import java.time.Duration; @@ -55,9 +57,9 @@ public class ConsumingService { .limitRequest(recordsLimit); } - public Flux> loadOffsets(KafkaCluster cluster, List partitions) { + public Mono> loadOffsets(KafkaCluster cluster, List partitions) { OffsetEmitter emitter = new OffsetEmitter(kafkaService, cluster, partitions); - return Flux.create(emitter::emit) + return Mono.create(emitter::emit) .subscribeOn(Schedulers.boundedElastic()); } @@ -194,10 +196,10 @@ public class ConsumingService { private final KafkaCluster cluster; private final List partitions; - public void emit(FluxSink> sink) { + public void emit(MonoSink> sink) { try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { Map offsets = consumer.endOffsets(partitions); - sink.next(offsets); + sink.success(offsets); } catch (Exception e) { log.error("Error occurred while consuming records", e); throw new RuntimeException(e); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java index 4065b4006d..38ff5c9810 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui; +import com.provectus.kafka.ui.container.KafkaConnectContainer; +import com.provectus.kafka.ui.container.SchemaRegistryContainer; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.test.context.SpringBootTest; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java new file mode 100644 index 0000000000..bb9200a2f0 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -0,0 +1,68 @@ +package com.provectus.kafka.ui; + +import com.provectus.kafka.ui.model.TopicFormData; +import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.producer.KafkaTestProducer; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; + +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) +@Log4j2 +@AutoConfigureWebTestClient(timeout = "60000") +public class KafkaConsumerTests extends AbstractBaseTest { + + @Autowired + private WebTestClient webTestClient; + + + @Test + public void shouldDeleteRecords() { + var topicName = UUID.randomUUID().toString(); + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(new TopicFormData() + .name(topicName) + .partitions(1) + .replicationFactor(1) + .configs(Map.of()) + ) + .exchange() + .expectStatus() + .isOk(); + + try(KafkaTestProducer producer = KafkaTestProducer.forKafka(kafka)) { + Stream.of("one", "two", "three", "four") + .forEach(value -> producer.send(topicName, value)); + } + + webTestClient.get() + .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) + .exchange() + .expectStatus() + .isOk() + .expectBodyList(TopicMessage.class) + .hasSize(4); + + webTestClient.delete() + .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) + .exchange() + .expectStatus() + .isOk(); + + webTestClient.get() + .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) + .exchange() + .expectStatus() + .isOk() + .expectBodyList(TopicMessage.class) + .hasSize(0); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/KafkaConnectContainer.java similarity index 98% rename from kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java rename to kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/KafkaConnectContainer.java index 06cdaa2a73..f7f2be49c5 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/KafkaConnectContainer.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui; +package com.provectus.kafka.ui.container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryContainer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/SchemaRegistryContainer.java similarity index 96% rename from kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryContainer.java rename to kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/SchemaRegistryContainer.java index 1c9dc51225..b4bc94dab5 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryContainer.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/SchemaRegistryContainer.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui; +package com.provectus.kafka.ui.container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java new file mode 100644 index 0000000000..374caa8129 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java @@ -0,0 +1,35 @@ +package com.provectus.kafka.ui.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.testcontainers.containers.KafkaContainer; + +import java.util.Map; + +public class KafkaTestProducer implements AutoCloseable { + private final KafkaProducer producer; + + private KafkaTestProducer(KafkaProducer producer) { + this.producer = producer; + } + + public static KafkaTestProducer forKafka(KafkaContainer kafkaContainer) { + return new KafkaTestProducer<>(new KafkaProducer<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), + ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class + ))); + } + + public void send(String topic, ValueT value) { + producer.send(new ProducerRecord<>(topic, value)); + } + + @Override + public void close() { + producer.close(); + } +}