package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Service
@RequiredArgsConstructor
@Log4j2
public class MessagesService {

  private static final int MAX_LOAD_RECORD_LIMIT = 100;
  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;

  private final AdminClientService adminClientService;
  private final DeserializationService deserializationService;
  private final ConsumerGroupService consumerGroupService;

  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                        List<Integer> partitionsToInclude) {
    if (!cluster.getTopics().containsKey(topicName)) {
      throw new TopicNotFoundException();
    }
    return offsetsForDeletion(cluster, topicName, partitionsToInclude)
        .flatMap(offsets ->
            adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)));
  }

  private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster,
                                                             String topicName,
                                                             List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = consumerGroupService.createConsumer(cluster)) {
        return significantOffsets(consumer, topicName, partitionsToInclude);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    });
  }
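
  /**
   * Sends a single message to the given topic. The key and content are serialized
   * with the SerDe configured for the cluster, and the returned Mono completes
   * with the broker's RecordMetadata once the send is acknowledged.
   */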
  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessageDTO msg) {
    if (msg.getKey() == null && msg.getContent() == null) {
      throw new ValidationException("Invalid message: both key and value can't be null");
    }
    if (msg.getPartition() != null
        && msg.getPartition() > cluster.getTopics().get(topic).getPartitionCount() - 1) {
      throw new ValidationException("Invalid partition");
    }
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );
      // re-create the record to attach client-provided headers
      producerRecord = new ProducerRecord<>(
          producerRecord.topic(),
          producerRecord.partition(),
          producerRecord.key(),
          producerRecord.value(),
          createHeaders(msg.getHeaders()));

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }

  private Iterable<Header>
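      // the Javadoc below documents loadMessages, defined after createHeaders
  /**
   * Polls messages from the topic starting at the given consumer position,
   * going forward or backward depending on the seek direction. Messages are
   * optionally filtered by a substring query against key/content, and the
   * number of returned records is capped at MAX_LOAD_RECORD_LIMIT.
   */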
  createHeaders(@Nullable Map<String, String> clientHeaders) {
    if (clientHeaders == null) {
      return new RecordHeaders();
    }
    RecordHeaders headers = new RecordHeaders();
    clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
    return headers;
  }

  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                 ConsumerPosition consumerPosition, String query,
                                                 Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
        .orElse(DEFAULT_LOAD_RECORD_LIMIT);

    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
    RecordSerDe recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)) {
      emitter = new ForwardRecordEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          new OffsetsSeekForward(topic, consumerPosition),
          recordDeserializer
      );
    } else {
      emitter = new BackwardRecordEmitter(
          (Map<String, Object> props) -> consumerGroupService.createConsumer(cluster, props),
          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit),
          recordDeserializer
      );
    }
    return Flux.create(emitter)
        .filter(m -> filterTopicMessage(m, query))
        .takeWhile(new FilterTopicMessageEvents(recordsLimit))
        .subscribeOn(Schedulers.elastic())
        .share();
  }

  /**
   * Returns end offsets for partitions where the start offset differs from the end offset,
   * i.e. partitions that are not empty.
   */
  public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
                                                             String topicName,
                                                             Collection<Integer> partitionsToInclude) {
    var partitions = consumer.partitionsFor(topicName).stream()
        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
        .map(p -> new TopicPartition(topicName, p.partition()))
        .collect(Collectors.toList());
    var beginningOffsets = consumer.beginningOffsets(partitions);
    var endOffsets = consumer.endOffsets(partitions);
    return endOffsets.entrySet().stream()
        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
    if (StringUtils.isEmpty(query)
        || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
      return true;
    }
    final TopicMessageDTO msg = message.getMessage();
    return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query))
        || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query));
  }

}