MessagesService.java

package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Service
@RequiredArgsConstructor
@Log4j2
public class MessagesService {

  private static final int MAX_LOAD_RECORD_LIMIT = 100;
  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;

  private final AdminClientService adminClientService;
  private final DeserializationService deserializationService;
  private final ConsumerGroupService consumerGroupService;
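
  /**
   * Deletes messages from the given topic: resolves the end offset of every
   * non-empty included partition and asks the admin client to delete records
   * up to those offsets. Throws {@link TopicNotFoundException} if the topic
   * is unknown to the cluster.
   */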
  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                        List<Integer> partitionsToInclude) {
    if (!cluster.getTopics().containsKey(topicName)) {
      throw new TopicNotFoundException();
    }
    return offsetsForDeletion(cluster, topicName, partitionsToInclude)
        .flatMap(offsets ->
            adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)));
  }

  private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                             List<Integer> partitionsToInclude) {
    return Mono.fromSupplier(() -> {
      try (KafkaConsumer<Bytes, Bytes> consumer = consumerGroupService.createConsumer(cluster)) {
        return significantOffsets(consumer, topicName, partitionsToInclude);
      } catch (Exception e) {
        log.error("Error occurred while consuming records", e);
        throw new RuntimeException(e);
      }
    });
  }
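
  /**
   * Publishes a single message to the topic: validates the request, serializes
   * the key and value with the cluster's {@link RecordSerDe}, and sends the
   * record with a short-lived byte-array {@link KafkaProducer}.
   */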
  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessageDTO msg) {
    if (msg.getKey() == null && msg.getContent() == null) {
      throw new ValidationException("Invalid message: both key and value can't be null");
    }
    if (msg.getPartition() != null
        && msg.getPartition() > cluster.getTopics().get(topic).getPartitionCount() - 1) {
      throw new ValidationException("Invalid partition");
    }
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );
      producerRecord = new ProducerRecord<>(
          producerRecord.topic(),
          producerRecord.partition(),
          producerRecord.key(),
          producerRecord.value(),
          createHeaders(msg.getHeaders()));

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }

  private Iterable<Header> createHeaders(@Nullable Map<String, String> clientHeaders) {
    if (clientHeaders == null) {
      return new RecordHeaders();
    }
    RecordHeaders headers = new RecordHeaders();
    clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
    return headers;
  }
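
  /**
   * Streams up to {@code limit} messages (capped at {@link #MAX_LOAD_RECORD_LIMIT},
   * defaulting to {@link #DEFAULT_LOAD_RECORD_LIMIT}) from the topic, polling
   * forward or backward depending on the requested seek direction and filtering
   * by the optional {@code query} substring.
   */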
  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                 ConsumerPosition consumerPosition, String query,
                                                 Integer limit) {
    int recordsLimit = Optional.ofNullable(limit)
        .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
        .orElse(DEFAULT_LOAD_RECORD_LIMIT);

    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
    RecordSerDe recordDeserializer =
        deserializationService.getRecordDeserializerForCluster(cluster);
    if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)) {
      emitter = new ForwardRecordEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          new OffsetsSeekForward(topic, consumerPosition),
          recordDeserializer
      );
    } else {
      emitter = new BackwardRecordEmitter(
          (Map<String, Object> props) -> consumerGroupService.createConsumer(cluster, props),
          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit),
          recordDeserializer
      );
    }
    return Flux.create(emitter)
        .filter(m -> filterTopicMessage(m, query))
        .takeWhile(new FilterTopicMessageEvents(recordsLimit))
        .subscribeOn(Schedulers.elastic())
        .share();
  }

  /**
   * Returns end offsets for partitions where the start offset != the end offset.
   * This is useful when we need to verify that a partition is not empty.
   */
  public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
                                                             String topicName,
                                                             Collection<Integer> partitionsToInclude) {
    var partitions = consumer.partitionsFor(topicName).stream()
        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
        .map(p -> new TopicPartition(topicName, p.partition()))
        .collect(Collectors.toList());
    var beginningOffsets = consumer.beginningOffsets(partitions);
    var endOffsets = consumer.endOffsets(partitions);
    return endOffsets.entrySet().stream()
        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
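
  /**
   * Passes through all events when the query is empty, all non-MESSAGE events,
   * and messages whose key or content contains the query string.
   */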
  private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
    if (StringUtils.isEmpty(query)
        || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
      return true;
    }
    final TopicMessageDTO msg = message.getMessage();
    return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query))
        || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query));
  }
}