diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index b991884548..f79eaf8ba5 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -29,6 +29,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -51,16 +52,20 @@ public class MessagesService {
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
-  private final MetricsCache metricsCache;
+
+  private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
+    return adminClientService.get(cluster)
+        .flatMap(client -> client.describeTopic(topicName))
+        .switchIfEmpty(Mono.error(new TopicNotFoundException()));
+  }
 
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
-    if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
-      throw new TopicNotFoundException();
-    }
-    return offsetsForDeletion(cluster, topicName, partitionsToInclude)
-        .flatMap(offsets ->
-            adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)));
+    return withExistingTopic(cluster, topicName)
+        .flatMap(td ->
+            offsetsForDeletion(cluster, topicName, partitionsToInclude)
+                .flatMap(offsets ->
+                    adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets))));
   }
 
   private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
@@ -80,9 +85,15 @@ public class MessagesService {
 
   public Mono sendMessage(KafkaCluster cluster, String topic,
                           CreateTopicMessageDTO msg) {
+    return withExistingTopic(cluster, topic)
+        .flatMap(desc -> sendMessageImpl(cluster, desc, msg));
+  }
+
+  private Mono sendMessageImpl(KafkaCluster cluster,
+                               TopicDescription topicDescription,
+                               CreateTopicMessageDTO msg) {
     if (msg.getPartition() != null
-        && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions()
-            .get(topic).partitions().size() - 1) {
+        && msg.getPartition() > topicDescription.partitions().size() - 1) {
       return Mono.error(new ValidationException("Invalid partition"));
     }
     RecordSerDe serde =
@@ -95,7 +106,7 @@ public class MessagesService {
     properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
     try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
       ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
-          topic,
+          topicDescription.name(),
           msg.getKey().orElse(null),
           msg.getContent().orElse(null),
           msg.getPartition()
@@ -134,6 +145,15 @@ public class MessagesService {
                                                  ConsumerPosition consumerPosition, String query,
                                                  MessageFilterTypeDTO filterQueryType,
                                                  int limit) {
+    return withExistingTopic(cluster, topic)
+        .flux()
+        .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, filterQueryType, limit));
+  }
+
+  private Flux loadMessagesImpl(KafkaCluster cluster, String topic,
+                                ConsumerPosition consumerPosition, String query,
+                                MessageFilterTypeDTO filterQueryType,
+                                int limit) {
     java.util.function.Consumer emitter;
     RecordSerDe recordDeserializer =
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index b333d3ecba..1bc18ca96c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -161,6 +161,13 @@ public class ReactiveAdminClient implements Closeable {
     );
   }
 
+  /**
+   * Returns TopicDescription mono, or an empty Mono if the topic is not found.
+   */
+  public Mono<TopicDescription> describeTopic(String topic) {
+    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
+  }
+
   /**
    * Kafka API often returns Map responses with KafkaFuture values. If we do allOf()
    * logic resulting Mono will be failing if any of Futures finished with error.
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
index 755c6895b5..b12d3ab903 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
@@ -10,6 +10,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.function.ThrowingConsumer;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.ApplicationContextInitializer;
 import org.springframework.context.ConfigurableApplicationContext;
@@ -88,4 +89,8 @@ public abstract class AbstractBaseTest {
       }
     }
   }
+
+  @Autowired
+  protected ConfigurableApplicationContext applicationContext;
+
 }
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java
new file mode 100644
index 0000000000..b999a23730
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java
@@ -0,0 +1,54 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.exception.TopicNotFoundException;
+import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.List;
+import java.util.UUID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import reactor.test.StepVerifier;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+class MessagesServiceTest extends AbstractBaseTest {
+
+  private static final String NON_EXISTING_TOPIC = UUID.randomUUID().toString();
+
+  @Autowired
+  MessagesService messagesService;
+
+  KafkaCluster cluster;
+
+  @BeforeEach
+  void init() {
+    cluster = applicationContext
+        .getBean(ClustersStorage.class)
+        .getClusterByName(LOCAL)
+        .get();
+  }
+
+  @Test
+  void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
+    StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
+        .expectError(TopicNotFoundException.class)
+        .verify();
+  }
+
+  @Test
+  void sendMessageReturnsExceptionWhenTopicNotFound() {
+    StepVerifier.create(messagesService.sendMessage(cluster, NON_EXISTING_TOPIC, new CreateTopicMessageDTO()))
+        .expectError(TopicNotFoundException.class)
+        .verify();
+  }
+
+  @Test
+  void loadMessagesReturnsExceptionWhenTopicNotFound() {
+    StepVerifier.create(messagesService.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1))
+        .expectError(TopicNotFoundException.class)
+        .verify();
+  }
+
+}
\ No newline at end of file
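
For readers skimming the diff, here is a minimal, self-contained sketch (not part of this change) of the gating pattern that withExistingTopic and describeTopic introduce: an empty Mono from the topic lookup is turned into a TopicNotFoundException via switchIfEmpty(Mono.error(...)), and each public entry point composes with that check before doing real work. The TopicLookupSketch class, its in-memory map, and its main method are hypothetical stand-ins, not code from the repository.

```java
// Illustrative sketch only: models the withExistingTopic()/describeTopic() pattern
// from the diff using plain reactor-core. TopicLookupSketch and PARTITIONS_BY_TOPIC
// are hypothetical names that do not exist in kafka-ui.
import java.util.Map;
import reactor.core.publisher.Mono;

public class TopicLookupSketch {

  static class TopicNotFoundException extends RuntimeException {
  }

  // Stand-in for cluster metadata; a missing key plays the role of
  // describeTopic() completing empty for an unknown topic.
  private static final Map<String, Integer> PARTITIONS_BY_TOPIC = Map.of("orders", 3);

  // Same shape as withExistingTopic(): resolve the topic, or fail the pipeline
  // with TopicNotFoundException instead of throwing eagerly.
  static Mono<Integer> withExistingTopic(String topic) {
    return Mono.justOrEmpty(PARTITIONS_BY_TOPIC.get(topic))
        .switchIfEmpty(Mono.error(new TopicNotFoundException()));
  }

  public static void main(String[] args) {
    withExistingTopic("orders")
        .subscribe(partitions -> System.out.println("partitions: " + partitions));
    withExistingTopic("does-not-exist")
        .subscribe(
            partitions -> System.out.println("partitions: " + partitions),
            error -> System.out.println("failed: " + error.getClass().getSimpleName()));
  }
}
```

The same idea is what the new tests exercise: StepVerifier can assert expectError(TopicNotFoundException.class) because the error is signalled through the reactive pipeline rather than thrown directly from the method body, as the old deleteTopicMessages code did.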