ConsumingService.java

package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.deserialization.DeserializationService;
import com.provectus.kafka.ui.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirection;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.util.ClusterUtil;
import com.provectus.kafka.ui.util.OffsetsSeek;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
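
/**
 * Reads topic messages for the UI: streams records from Kafka as a reactive {@link Flux},
 * optionally filtering them by a text query, and exposes offset helpers such as
 * {@link #offsetsForDeletion}.
 */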
@Service
@Log4j2
@RequiredArgsConstructor
public class ConsumingService {

  private static final int MAX_RECORD_LIMIT = 100;
  private static final int DEFAULT_RECORD_LIMIT = 20;

  private final KafkaService kafkaService;
  private final DeserializationService deserializationService;
  private final ObjectMapper objectMapper = new ObjectMapper();
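
  /**
   * Streams messages from {@code topic}, starting at {@code consumerPosition} and seeking
   * forward or backward depending on the requested direction. Emission is capped at
   * {@code limit} records (never more than {@code MAX_RECORD_LIMIT}, defaulting to
   * {@code DEFAULT_RECORD_LIMIT}); when {@code query} is non-empty, only messages whose
   * content contains it are emitted.
   */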
  public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic,
                                         ConsumerPosition consumerPosition, String query,
                                         Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_RECORD_LIMIT))
        .orElse(DEFAULT_RECORD_LIMIT);
    RecordEmitter emitter = new RecordEmitter(
        () -> kafkaService.createConsumer(cluster),
        consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)
            ? new OffsetsSeekForward(topic, consumerPosition)
            : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
    );
    RecordDeserializer recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    return Flux.create(emitter)
        .subscribeOn(Schedulers.boundedElastic())
        .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
        .filter(m -> filterTopicMessage(m, query))
        .limitRequest(recordsLimit);
  }
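
  /**
   * Collects the end offsets of the non-empty partitions of {@code topicName}
   * (restricted to {@code partitionsToInclude} when that list is non-empty),
   * wrapped in a {@link Mono} so the blocking consumer call is deferred until subscription.
   */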
  public Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                            List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
        return significantOffsets(consumer, topicName, partitionsToInclude);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    });
  }

  /**
   * Returns end offsets for partitions whose start offset differs from the end offset,
   * i.e. for partitions that are not empty.
   */
  public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
                                                             String topicName,
                                                             Collection<Integer> partitionsToInclude) {
    var partitions = consumer.partitionsFor(topicName).stream()
        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
        .map(p -> new TopicPartition(topicName, p.partition()))
        .collect(Collectors.toList());
    var beginningOffsets = consumer.beginningOffsets(partitions);
    var endOffsets = consumer.endOffsets(partitions);
    return endOffsets.entrySet().stream()
        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
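
  /**
   * Accepts a message when {@code query} is empty, or when the message content
   * (converted to a JSON tree) contains {@code query} in one of its values.
   */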
  private boolean filterTopicMessage(TopicMessage message, String query) {
    if (StringUtils.isEmpty(query)) {
      return true;
    }

    Object content = message.getContent();
    JsonNode tree = objectMapper.valueToTree(content);
    return treeContainsValue(tree, query);
  }
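
  /**
   * Breadth-first search over the JSON tree: container nodes are expanded into the queue,
   * and each leaf node's text value is checked for a substring match against {@code query}.
   */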
  private boolean treeContainsValue(JsonNode tree, String query) {
    LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
    nodesForSearch.add(tree);

    while (!nodesForSearch.isEmpty()) {
      JsonNode node = nodesForSearch.removeFirst();

      if (node.isContainerNode()) {
        node.elements().forEachRemaining(nodesForSearch::add);
        continue;
      }

      String nodeValue = node.asText();
      if (nodeValue.contains(query)) {
        return true;
      }
    }

    return false;
  }
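
  /**
   * Bridges the blocking {@link KafkaConsumer} poll loop to a reactive {@link FluxSink}.
   * For forward seeks records are emitted in poll order; for backward seeks each polled
   * batch is re-sorted by partition and offset in reverse order before being emitted.
   */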
  @RequiredArgsConstructor
  static class RecordEmitter
      implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {

    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);

    private static final Comparator<ConsumerRecord<?, ?>> PARTITION_COMPARING =
        Comparator.comparing(
            ConsumerRecord::partition,
            Comparator.nullsFirst(Comparator.naturalOrder())
        );
    private static final Comparator<ConsumerRecord<?, ?>> REVERED_COMPARING =
        PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed();

    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
    private final OffsetsSeek offsetsSeek;
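
    /**
     * Polls until the sink is cancelled or every target offset has been reached, pushing
     * each record downstream and marking it as polled in {@code waitingOffsets}.
     */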
    @Override
    public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
      try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
        var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
        while (!sink.isCancelled() && !waitingOffsets.endReached()) {
          ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
          log.info("{} records polled", records.count());
          final Iterable<ConsumerRecord<Bytes, Bytes>> iterable;
          if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) {
            iterable = records;
          } else {
            iterable = StreamSupport.stream(records.spliterator(), false)
                .sorted(REVERED_COMPARING).collect(Collectors.toList());
          }
          for (ConsumerRecord<Bytes, Bytes> record : iterable) {
            if (!sink.isCancelled() && !waitingOffsets.endReached()) {
              sink.next(record);
              waitingOffsets.markPolled(record);
            } else {
              break;
            }
          }
        }
        sink.complete();
        log.info("Polling finished");
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    }
  }
}