package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.deserialization.DeserializationService;
import com.provectus.kafka.ui.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirection;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.util.ClusterUtil;
import com.provectus.kafka.ui.util.OffsetsSeek;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
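
/**
 * Reads messages from Kafka topics as a reactive stream and computes the
 * offsets used when deleting records from a topic.
 */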
@Service
@Log4j2
@RequiredArgsConstructor
public class ConsumingService {

  private static final int MAX_RECORD_LIMIT = 100;
  private static final int DEFAULT_RECORD_LIMIT = 20;

  private final KafkaService kafkaService;
  private final DeserializationService deserializationService;
  private final ObjectMapper objectMapper = new ObjectMapper();
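
  /**
   * Streams up to {@code limit} messages (capped at {@link #MAX_RECORD_LIMIT},
   * defaulting to {@link #DEFAULT_RECORD_LIMIT}) from the given topic, seeking forward
   * or backward according to {@code consumerPosition} and filtering by {@code query} if present.
   *
   * <p>A minimal usage sketch; the topic name, query, and subscriber below are
   * illustrative placeholders, not part of this class:
   * <pre>{@code
   * consumingService
   *     .loadMessages(cluster, "orders", position, "order-42", 50)
   *     .subscribe(message -> log.info("{}", message));
   * }</pre>
   */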
  public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic,
                                         ConsumerPosition consumerPosition, String query,
                                         Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_RECORD_LIMIT))
        .orElse(DEFAULT_RECORD_LIMIT);
    RecordEmitter emitter = new RecordEmitter(
        () -> kafkaService.createConsumer(cluster),
        consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)
            ? new OffsetsSeekForward(topic, consumerPosition)
            : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
    );
    RecordDeserializer recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    return Flux.create(emitter)
        .subscribeOn(Schedulers.boundedElastic())
        .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
        .filter(m -> filterTopicMessage(m, query))
        .limitRequest(recordsLimit);
  }
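
  /**
   * Computes, for each selected partition, the end offset up to which records
   * can be deleted. Partitions that are already empty are left out of the result.
   */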
  public Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                            List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
        return significantOffsets(consumer, topicName, partitionsToInclude);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    });
  }

  /**
   * Returns end offsets for partitions whose start offset differs from the end offset,
   * i.e. partitions that are not empty. An empty {@code partitionsToInclude} selects
   * all partitions of the topic.
   */
  public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
                                                             String topicName,
                                                             Collection<Integer> partitionsToInclude) {
    var partitions = consumer.partitionsFor(topicName).stream()
        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
        .map(p -> new TopicPartition(topicName, p.partition()))
        .collect(Collectors.toList());
    var beginningOffsets = consumer.beginningOffsets(partitions);
    var endOffsets = consumer.endOffsets(partitions);
    return endOffsets.entrySet().stream()
        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
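
  /**
   * Matches a message against the search query by serializing its content to a JSON
   * tree and scanning every leaf value. An empty query matches everything.
   */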
  private boolean filterTopicMessage(TopicMessage message, String query) {
    if (StringUtils.isEmpty(query)) {
      return true;
    }
    Object content = message.getContent();
    JsonNode tree = objectMapper.valueToTree(content);
    return treeContainsValue(tree, query);
  }
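
  /**
   * Breadth-first search over the JSON tree: container nodes are expanded into the
   * queue, leaf nodes are compared against the query as text.
   */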
  private boolean treeContainsValue(JsonNode tree, String query) {
    LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
    nodesForSearch.add(tree);

    while (!nodesForSearch.isEmpty()) {
      JsonNode node = nodesForSearch.removeFirst();

      if (node.isContainerNode()) {
        node.elements().forEachRemaining(nodesForSearch::add);
        continue;
      }

      String nodeValue = node.asText();
      if (nodeValue.contains(query)) {
        return true;
      }
    }

    return false;
  }
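
  /**
   * Bridges a blocking {@link KafkaConsumer} poll loop into a {@link FluxSink}.
   * A fresh consumer is obtained from the supplier per subscription and closed
   * when polling completes or the subscriber cancels.
   */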
  @RequiredArgsConstructor
  static class RecordEmitter
      implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {

    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);
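
    // Used for backward reads: sort by (partition, offset), then reverse,
    // so the latest offsets are emitted first.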
    private static final Comparator<ConsumerRecord<?, ?>> PARTITION_COMPARING =
        Comparator.comparing(
            ConsumerRecord::partition,
            Comparator.nullsFirst(Comparator.naturalOrder())
        );
    private static final Comparator<ConsumerRecord<?, ?>> REVERSED_COMPARING =
        PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed();

    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
    private final OffsetsSeek offsetsSeek;
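
    /**
     * Polls until the subscriber cancels or every assigned partition reaches its
     * target offset, pushing each polled record into the sink.
     */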
    @Override
    public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
      try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
        var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
        while (!sink.isCancelled() && !waitingOffsets.endReached()) {
          ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT);
          log.info("{} records polled", records.count());
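          // Forward reads emit records in poll order; backward reads re-sort the
          // batch so that records are delivered newest-first.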
          final Iterable<ConsumerRecord<Bytes, Bytes>> iterable;
          if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) {
            iterable = records;
          } else {
            iterable = StreamSupport.stream(records.spliterator(), false)
                .sorted(REVERSED_COMPARING).collect(Collectors.toList());
          }
          for (ConsumerRecord<Bytes, Bytes> record : iterable) {
            if (!sink.isCancelled() && !waitingOffsets.endReached()) {
              sink.next(record);
              waitingOffsets.markPolled(record);
            } else {
              break;
            }
          }
        }
        sink.complete();
        log.info("Polling finished");
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    }
  }
}