ConsumingService.java

package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.deserialization.DeserializationService;
import com.provectus.kafka.ui.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.util.ClusterUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
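
/**
 * Reads messages and partition offsets from Kafka topics and exposes them
 * as reactive streams for the UI layer.
 */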
@Service
@Log4j2
@RequiredArgsConstructor
public class ConsumingService {

  private static final int MAX_RECORD_LIMIT = 100;
  private static final int DEFAULT_RECORD_LIMIT = 20;

  private final KafkaService kafkaService;
  private final DeserializationService deserializationService;
  private final ObjectMapper objectMapper = new ObjectMapper();
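
  /**
   * Streams messages from the given topic, starting at the requested position,
   * optionally filtered by a substring query and capped at {@code limit}
   * (never more than {@code MAX_RECORD_LIMIT}) records.
   */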
  public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic,
                                         ConsumerPosition consumerPosition, String query,
                                         Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_RECORD_LIMIT))
        .orElse(DEFAULT_RECORD_LIMIT);
    RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
    RecordDeserializer recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    return Flux.create(emitter::emit)
        .subscribeOn(Schedulers.boundedElastic())
        .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
        .filter(m -> filterTopicMessage(m, query))
        .limitRequest(recordsLimit);
  }
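
  /**
   * Returns the end offset for each requested partition of the topic,
   * skipping partitions whose beginning and end offsets are equal (i.e. empty ones).
   */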
  public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, String topicName,
                                                     List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
        var partitions = consumer.partitionsFor(topicName).stream()
            .filter(p -> partitionsToInclude.isEmpty()
                || partitionsToInclude.contains(p.partition()))
            .map(p -> new TopicPartition(topicName, p.partition()))
            .collect(Collectors.toList());
        var beginningOffsets = consumer.beginningOffsets(partitions);
        var endOffsets = consumer.endOffsets(partitions);
        return endOffsets.entrySet().stream()
            .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      } catch (Exception e) {
        log.error("Error occurred while loading offsets", e);
        throw new RuntimeException(e);
      }
    });
  }
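
  /**
   * Returns true if the message content contains the query string anywhere
   * in its JSON value tree; an empty query matches everything.
   */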
  private boolean filterTopicMessage(TopicMessage message, String query) {
    if (StringUtils.isEmpty(query)) {
      return true;
    }
    Object content = message.getContent();
    JsonNode tree = objectMapper.valueToTree(content);
    return treeContainsValue(tree, query);
  }
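
  /**
   * Breadth-first search over the JSON tree: container nodes are expanded,
   * leaf nodes are checked for the query substring.
   */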
  private boolean treeContainsValue(JsonNode tree, String query) {
    LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
    nodesForSearch.add(tree);
    while (!nodesForSearch.isEmpty()) {
      JsonNode node = nodesForSearch.removeFirst();
      if (node.isContainerNode()) {
        node.elements().forEachRemaining(nodesForSearch::add);
        continue;
      }
      String nodeValue = node.asText();
      if (nodeValue.contains(query)) {
        return true;
      }
    }
    return false;
  }
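
  /**
   * Polls raw records from Kafka and pushes them into a {@link FluxSink}
   * until the subscriber cancels or the topic stops returning data.
   */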
  @RequiredArgsConstructor
  private static class RecordEmitter {

    private static final int MAX_EMPTY_POLLS_COUNT = 3;
    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);

    private final KafkaService kafkaService;
    private final KafkaCluster cluster;
    private final String topic;
    private final ConsumerPosition consumerPosition;
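
    // Assigns and seeks the consumer according to the requested position, then polls
    // until the sink is cancelled or several consecutive polls return no records.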
    public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
        assignAndSeek(consumer);
        int emptyPollsCount = 0;
        log.info("assignment: {}", consumer.assignment());
        while (!sink.isCancelled()) {
          ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
          log.info("{} records polled", records.count());
          if (records.count() == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) {
            break;
          } else if (records.count() == 0) {
            emptyPollsCount++;
          } else {
            emptyPollsCount = 0;
          }
          records.iterator()
              .forEachRemaining(sink::next);
        }
        sink.complete();
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    }
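
    // Resolves the topic's partitions from cluster metadata, keeping only those
    // listed in the seek position (or all of them when none are listed).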
    private List<TopicPartition> getRequestedPartitions() {
      Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
      return Optional.ofNullable(cluster.getTopics().get(topic))
          .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
          .getPartitions().values().stream()
          .filter(internalPartition -> partitionPositions.isEmpty()
              || partitionPositions.containsKey(internalPartition.getPartition()))
          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
          .collect(Collectors.toList());
    }
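
    // Dispatches to the seek strategy matching the requested SeekType.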
    private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
      SeekType seekType = consumerPosition.getSeekType();
      switch (seekType) {
        case OFFSET:
          assignAndSeekForOffset(consumer);
          break;
        case TIMESTAMP:
          assignAndSeekForTimestamp(consumer);
          break;
        case BEGINNING:
          assignAndSeekFromBeginning(consumer);
          break;
        default:
          throw new IllegalArgumentException("Unknown seekType: " + seekType);
      }
    }
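
    // Seeks each requested partition to the explicit offset supplied in the position.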
    private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) {
      List<TopicPartition> partitions = getRequestedPartitions();
      consumer.assign(partitions);
      consumerPosition.getSeekTo().forEach((partition, offset) -> {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.seek(topicPartition, offset);
      });
    }
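
    // Translates per-partition timestamps into offsets via offsetsForTimes and seeks there;
    // fails if no partition has a record at or after the requested timestamps.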
    private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) {
      Map<TopicPartition, Long> timestampsToSearch =
          consumerPosition.getSeekTo().entrySet().stream()
              .collect(Collectors.toMap(
                  partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
                  Map.Entry::getValue
              ));
      Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
          .entrySet().stream()
          .filter(e -> e.getValue() != null)
          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
      if (offsetsForTimestamps.isEmpty()) {
        throw new IllegalArgumentException("No offsets were found for requested timestamps");
      }
      consumer.assign(offsetsForTimestamps.keySet());
      offsetsForTimestamps.forEach(consumer::seek);
    }
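
    // Assigns the requested partitions and rewinds them to their earliest offsets.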
    private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) {
      List<TopicPartition> partitions = getRequestedPartitions();
      consumer.assign(partitions);
      consumer.seekToBeginning(partitions);
    }
  }
}