MessagesService.java

package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

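/**
 * Service for working with topic messages: deleting records up to the current end
 * offsets, producing new messages, and streaming messages back to the client with
 * optional text filtering and a per-request record limit.
 */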
@Service
@RequiredArgsConstructor
@Log4j2
public class MessagesService {

  private static final int MAX_LOAD_RECORD_LIMIT = 100;
  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;

  private final AdminClientService adminClientService;
  private final DeserializationService deserializationService;
  private final ConsumerGroupService consumerGroupService;
  private final MetricsCache metricsCache;
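
  /**
   * Deletes records from the selected partitions of a topic (all partitions when the
   * list is empty) up to their current end offsets.
   *
   * @throws TopicNotFoundException if the topic is not known to the cluster's metrics cache
   */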
  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                        List<Integer> partitionsToInclude) {
    if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
      throw new TopicNotFoundException();
    }
    return offsetsForDeletion(cluster, topicName, partitionsToInclude)
        .flatMap(offsets ->
            adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)));
  }
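
  /**
   * Computes, on a short-lived consumer, the end offsets that deleteRecords should be
   * called with for the non-empty partitions of the topic.
   */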
  private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                             List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = consumerGroupService.createConsumer(cluster)) {
        return significantOffsets(consumer, topicName, partitionsToInclude);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    });
  }
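
  /**
   * Serializes the given key and content with the cluster's configured serde and produces
   * the record with a one-off {@link KafkaProducer}, completing the returned {@link Mono}
   * with the broker's {@link RecordMetadata} or with the send error.
   */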
  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessageDTO msg) {
    if (msg.getKey() == null && msg.getContent() == null) {
      throw new ValidationException("Invalid message: both key and value can't be null");
    }
    if (msg.getPartition() != null
        && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions()
        .get(topic).partitions().size() - 1) {
      throw new ValidationException("Invalid partition");
    }
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );
      producerRecord = new ProducerRecord<>(
          producerRecord.topic(),
          producerRecord.partition(),
          producerRecord.key(),
          producerRecord.value(),
          createHeaders(msg.getHeaders()));

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }
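
  /**
   * Converts the client-supplied header map into Kafka record headers; returns empty
   * headers when no map is provided.
   */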
  private Iterable<Header> createHeaders(@Nullable Map<String, String> clientHeaders) {
    if (clientHeaders == null) {
      return new RecordHeaders();
    }
    RecordHeaders headers = new RecordHeaders();
    clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
    return headers;
  }
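
  /**
   * Streams messages from a topic as {@link TopicMessageEventDTO}s, seeking forward or
   * backward depending on the consumer position, optionally filtering by a substring query,
   * and capping the result at {@code limit} (at most {@code MAX_LOAD_RECORD_LIMIT}) records.
   */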
  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                 ConsumerPosition consumerPosition, String query,
                                                 Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
        .orElse(DEFAULT_LOAD_RECORD_LIMIT);
    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
    RecordSerDe recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)) {
      emitter = new ForwardRecordEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          new OffsetsSeekForward(topic, consumerPosition),
          recordDeserializer
      );
    } else {
      emitter = new BackwardRecordEmitter(
          (Map<String, Object> props) -> consumerGroupService.createConsumer(cluster, props),
          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit),
          recordDeserializer
      );
    }
    return Flux.create(emitter)
        .filter(m -> filterTopicMessage(m, query))
        .takeWhile(new FilterTopicMessageEvents(recordsLimit))
        .subscribeOn(Schedulers.elastic())
        .share();
  }

  /**
   * Returns end offsets for the partitions where the start offset != the end offset.
   * This is useful when we need to verify that a partition is not empty.
   */
  public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
                                                             String topicName,
                                                             Collection<Integer> partitionsToInclude) {
    var partitions = consumer.partitionsFor(topicName).stream()
        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
        .map(p -> new TopicPartition(topicName, p.partition()))
        .collect(Collectors.toList());
    var beginningOffsets = consumer.beginningOffsets(partitions);
    var endOffsets = consumer.endOffsets(partitions);
    return endOffsets.entrySet().stream()
        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
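
  /**
   * Passes through all events when the query is empty, all non-message events, and
   * messages whose key or content contains the query; everything else is filtered out.
   */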
  private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
    if (StringUtils.isEmpty(query)
        || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
      return true;
    }
    final TopicMessageDTO msg = message.getMessage();
    return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query))
        || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query));
  }
}