added test for delete action

Ramazan Yapparov 2021-03-11 16:30:33 +03:00
parent 1b5f5e29d0
commit 49e10c9b6e
7 changed files with 114 additions and 7 deletions

ClusterService.java

@@ -207,6 +207,6 @@ public class ClusterService {
             .map(partition -> new TopicPartition(topicName, partition))
             .collect(Collectors.toList());
     return consumingService.loadOffsets(cluster, partitions)
-        .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)).next();
+        .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets));
   }
 }
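
Note: dropping .next() lines up with the ConsumingService change below, where loadOffsets now returns a Mono, so the flatMap already yields a single result. The kafkaService.deleteTopicMessages implementation is not part of this diff; a minimal sketch of what such a method typically wraps, assuming the stock Kafka AdminClient API (class name, method signature, and error handling here are assumptions, not the project's actual code):

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Sketch only: "deleting messages" in Kafka is a truncation, performed by
// removing everything below a given offset in each partition.
public class TopicTruncationSketch {
  public static Mono<Void> deleteTopicMessages(AdminClient adminClient,
                                               Map<TopicPartition, Long> endOffsets) {
    Map<TopicPartition, RecordsToDelete> toDelete = endOffsets.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey,
            e -> RecordsToDelete.beforeOffset(e.getValue())));
    return Mono.fromRunnable(() -> {
          try {
            // all() returns a KafkaFuture<Void>; block it on a worker thread
            adminClient.deleteRecords(toDelete).all().get();
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        })
        .subscribeOn(Schedulers.boundedElastic())
        .then();
  }
}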

ConsumingService.java

@@ -21,6 +21,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
 import reactor.core.scheduler.Schedulers;

 import java.time.Duration;
@@ -55,9 +57,9 @@ public class ConsumingService {
         .limitRequest(recordsLimit);
   }

-  public Flux<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, List<TopicPartition> partitions) {
+  public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, List<TopicPartition> partitions) {
     OffsetEmitter emitter = new OffsetEmitter(kafkaService, cluster, partitions);
-    return Flux.create(emitter::emit)
+    return Mono.create(emitter::emit)
         .subscribeOn(Schedulers.boundedElastic());
   }
@@ -194,10 +196,10 @@ public class ConsumingService {
     private final KafkaCluster cluster;
     private final List<TopicPartition> partitions;

-    public void emit(FluxSink<Map<TopicPartition, Long>> sink) {
+    public void emit(MonoSink<Map<TopicPartition, Long>> sink) {
      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
        Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
-       sink.next(offsets);
+       sink.success(offsets);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
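
Note: consumer.endOffsets returns a single snapshot, so a Mono is the natural shape here. MonoSink.success emits the one value and completes in a single call, whereas FluxSink.next left the stream open. A minimal standalone sketch of that contract (demo class and value are made up):

import reactor.core.publisher.Mono;

public class MonoSinkDemo {
  public static void main(String[] args) {
    // success(value) emits exactly one value and completes; there is no
    // separate complete() step as with FluxSink. sink.error(e) is the
    // failure path, and an exception thrown from the callback (as in the
    // emitter above) also terminates the Mono with an error.
    Mono<String> single = Mono.create(sink -> sink.success("end-offsets-snapshot"));
    System.out.println(single.block()); // prints end-offsets-snapshot
  }
}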

AbstractBaseTest.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui;

+import com.provectus.kafka.ui.container.KafkaConnectContainer;
+import com.provectus.kafka.ui.container.SchemaRegistryContainer;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.boot.test.context.SpringBootTest;

KafkaConsumerTests.java

@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui;
+
+import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.model.TopicMessage;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "60000")
+public class KafkaConsumerTests extends AbstractBaseTest {
+
+  @Autowired
+  private WebTestClient webTestClient;
+
+  @Test
+  public void shouldDeleteRecords() {
+    var topicName = UUID.randomUUID().toString();
+    webTestClient.post()
+        .uri("/api/clusters/{clusterName}/topics", LOCAL)
+        .bodyValue(new TopicFormData()
+            .name(topicName)
+            .partitions(1)
+            .replicationFactor(1)
+            .configs(Map.of())
+        )
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
+      Stream.of("one", "two", "three", "four")
+          .forEach(value -> producer.send(topicName, value));
+    }
+
+    webTestClient.get()
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBodyList(TopicMessage.class)
+        .hasSize(4);
+
+    webTestClient.delete()
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    webTestClient.get()
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBodyList(TopicMessage.class)
+        .hasSize(0);
+  }
+}
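
Note: the controller behind the DELETE endpoint exercised here is not shown in this commit. A hypothetical sketch of how it could be wired to the ClusterService method changed above (controller name, mapping style, and the service method's signature are all assumptions):

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class MessagesControllerSketch {
  private final ClusterService clusterService;

  public MessagesControllerSketch(ClusterService clusterService) {
    this.clusterService = clusterService;
  }

  // Hypothetical mapping; mirrors the URI the test above calls.
  @DeleteMapping("/api/clusters/{clusterName}/topics/{topicName}/messages")
  public Mono<ResponseEntity<Void>> deleteTopicMessages(@PathVariable String clusterName,
                                                        @PathVariable String topicName) {
    return clusterService.deleteTopicMessages(clusterName, topicName)
        .thenReturn(ResponseEntity.ok().<Void>build());
  }
}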

KafkaConnectContainer.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui;
+package com.provectus.kafka.ui.container;

 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;

SchemaRegistryContainer.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui;
+package com.provectus.kafka.ui.container;

 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;

KafkaTestProducer.java

@@ -0,0 +1,35 @@
+package com.provectus.kafka.ui.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.testcontainers.containers.KafkaContainer;
+
+import java.util.Map;
+
+public class KafkaTestProducer<KeyT, ValueT> implements AutoCloseable {
+
+  private final KafkaProducer<KeyT, ValueT> producer;
+
+  private KafkaTestProducer(KafkaProducer<KeyT, ValueT> producer) {
+    this.producer = producer;
+  }
+
+  public static KafkaTestProducer<String, String> forKafka(KafkaContainer kafkaContainer) {
+    return new KafkaTestProducer<>(new KafkaProducer<>(Map.of(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
+        ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer",
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+    )));
+  }
+
+  public void send(String topic, ValueT value) {
+    producer.send(new ProducerRecord<>(topic, value));
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+  }
+}
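
Note: KafkaProducer.send is asynchronous, and close() blocks until previously sent records are delivered; the test relies on this by producing inside try-with-resources before its first GET. Usage, as in KafkaConsumerTests above (topic name made up):

try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
  producer.send("some-topic", "value"); // async; flushed when close() runs
}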