|
@@ -29,6 +29,7 @@ import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.admin.OffsetSpec;
|
|
|
+import org.apache.kafka.clients.admin.TopicDescription;
|
|
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
@@ -51,16 +52,20 @@ public class MessagesService {
|
|
|
private final AdminClientService adminClientService;
|
|
|
private final DeserializationService deserializationService;
|
|
|
private final ConsumerGroupService consumerGroupService;
|
|
|
- private final MetricsCache metricsCache;
|
|
|
+
|
|
|
+ private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
|
|
|
+ return adminClientService.get(cluster)
|
|
|
+ .flatMap(client -> client.describeTopic(topicName))
|
|
|
+ .switchIfEmpty(Mono.error(new TopicNotFoundException()));
|
|
|
+ }
|
|
|
|
|
|
public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
|
|
|
List<Integer> partitionsToInclude) {
|
|
|
- if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
|
|
|
- throw new TopicNotFoundException();
|
|
|
- }
|
|
|
- return offsetsForDeletion(cluster, topicName, partitionsToInclude)
|
|
|
- .flatMap(offsets ->
|
|
|
- adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)));
|
|
|
+ return withExistingTopic(cluster, topicName)
|
|
|
+ .flatMap(td ->
|
|
|
+ offsetsForDeletion(cluster, topicName, partitionsToInclude)
|
|
|
+ .flatMap(offsets ->
|
|
|
+ adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets))));
|
|
|
}
|
|
|
|
|
|
private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
|
|
@@ -80,9 +85,15 @@ public class MessagesService {
|
|
|
|
|
|
public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
|
|
|
CreateTopicMessageDTO msg) {
|
|
|
+ return withExistingTopic(cluster, topic)
|
|
|
+ .flatMap(desc -> sendMessageImpl(cluster, desc, msg));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
|
|
|
+ TopicDescription topicDescription,
|
|
|
+ CreateTopicMessageDTO msg) {
|
|
|
if (msg.getPartition() != null
|
|
|
- && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions()
|
|
|
- .get(topic).partitions().size() - 1) {
|
|
|
+ && msg.getPartition() > topicDescription.partitions().size() - 1) {
|
|
|
return Mono.error(new ValidationException("Invalid partition"));
|
|
|
}
|
|
|
RecordSerDe serde =
|
|
@@ -95,7 +106,7 @@ public class MessagesService {
|
|
|
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
|
|
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
|
|
|
ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
|
|
|
- topic,
|
|
|
+ topicDescription.name(),
|
|
|
msg.getKey().orElse(null),
|
|
|
msg.getContent().orElse(null),
|
|
|
msg.getPartition()
|
|
@@ -134,6 +145,15 @@ public class MessagesService {
|
|
|
ConsumerPosition consumerPosition, String query,
|
|
|
MessageFilterTypeDTO filterQueryType,
|
|
|
int limit) {
|
|
|
+ return withExistingTopic(cluster, topic)
|
|
|
+ .flux()
|
|
|
+ .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, filterQueryType, limit));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster, String topic,
|
|
|
+ ConsumerPosition consumerPosition, String query,
|
|
|
+ MessageFilterTypeDTO filterQueryType,
|
|
|
+ int limit) {
|
|
|
|
|
|
java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
|
|
|
RecordSerDe recordDeserializer =
|