MessagesService.java

package com.provectus.kafka.ui.service;

import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.Cursor;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.TailingEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Service
@Slf4j
public class MessagesService {

  private static final long SALT_FOR_HASHING = ThreadLocalRandom.current().nextLong();

  private static final int DEFAULT_MAX_PAGE_SIZE = 500;
  private static final int DEFAULT_PAGE_SIZE = 100;

  // limiting UI message rate to 20/sec in tailing mode
  private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;

  private final AdminClientService adminClientService;
  private final DeserializationService deserializationService;
  private final ConsumerGroupService consumerGroupService;
  private final int maxPageSize;
  private final int defaultPageSize;

  private final Cache<String, Predicate<TopicMessageDTO>> registeredFilters = CacheBuilder.newBuilder()
      .maximumSize(PollingCursorsStorage.MAX_SIZE)
      .build();

  private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();

  public MessagesService(AdminClientService adminClientService,
                         DeserializationService deserializationService,
                         ConsumerGroupService consumerGroupService,
                         ClustersProperties properties) {
    this.adminClientService = adminClientService;
    this.deserializationService = deserializationService;
    this.consumerGroupService = consumerGroupService;

    var pollingProps = Optional.ofNullable(properties.getPolling())
        .orElseGet(ClustersProperties.PollingProperties::new);
    this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
        .orElse(DEFAULT_MAX_PAGE_SIZE);
    this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
        .orElse(DEFAULT_PAGE_SIZE);
  }
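
  // withExistingTopic: resolves the topic's metadata through the cluster AdminClient and fails the
  // pipeline with TopicNotFoundException when the topic is absent. Every topic-scoped operation
  // below (delete, send, load) goes through this guard first.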

  private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
    return adminClientService.get(cluster)
        .flatMap(client -> client.describeTopic(topicName))
        .switchIfEmpty(Mono.error(new TopicNotFoundException()));
  }
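
  // execSmartFilterTest: compiles the submitted Groovy snippet and evaluates it against a single
  // synthetic TopicMessageDTO built from the request, returning either the boolean result or a
  // compilation/execution error. A minimal usage sketch (fluent setter names are an assumption,
  // mirroring the getters used below on the generated DTO):
  //   var exec = new SmartFilterTestExecutionDTO().filterCode("msg.key != null").key("k").value("v");
  //   SmartFilterTestExecutionResultDTO res = MessagesService.execSmartFilterTest(exec);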

  public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
    Predicate<TopicMessageDTO> predicate;
    try {
      predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode());
    } catch (Exception e) {
      log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
      return new SmartFilterTestExecutionResultDTO()
          .error("Compilation error : " + e.getMessage());
    }
    try {
      var result = predicate.test(
          new TopicMessageDTO()
              .key(execData.getKey())
              .content(execData.getValue())
              .headers(execData.getHeaders())
              .offset(execData.getOffset())
              .partition(execData.getPartition())
              .timestamp(
                  Optional.ofNullable(execData.getTimestampMs())
                      .map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
                      .orElse(null))
      );
      return new SmartFilterTestExecutionResultDTO()
          .result(result);
    } catch (Exception e) {
      log.info("Smart filter {} execution error", execData, e);
      return new SmartFilterTestExecutionResultDTO()
          .error("Execution error : " + e.getMessage());
    }
  }
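
  // deleteTopicMessages: verifies the topic exists, computes the per-partition offsets to truncate
  // up to (see offsetsForDeletion below), and issues an AdminClient deleteRecords call with them.
  // An empty partitionsToInclude list means "all partitions".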

  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                        List<Integer> partitionsToInclude) {
    return withExistingTopic(cluster, topicName)
        .flatMap(td ->
            offsetsForDeletion(cluster, topicName, partitionsToInclude)
                .flatMap(offsets ->
                    adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets))));
  }

  private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
                                                             List<Integer> partitionsToInclude) {
    return adminClientService.get(cluster).flatMap(ac ->
        ac.listTopicOffsets(topicName, OffsetSpec.earliest(), true)
            .zipWith(ac.listTopicOffsets(topicName, OffsetSpec.latest(), true),
                (start, end) ->
                    end.entrySet().stream()
                        .filter(e -> partitionsToInclude.isEmpty()
                            || partitionsToInclude.contains(e.getKey().partition()))
                        // we only need non-empty partitions (where start offset != end offset)
                        .filter(entry -> !entry.getValue().equals(start.get(entry.getKey())))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
    );
  }
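
  // sendMessage: checks topic existence, then hops to the boundedElastic scheduler so that the
  // blocking KafkaProducer creation and close in sendMessageImpl stay off reactor event-loop
  // threads (an interpretation of the publishOn call below, not something the code states).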

  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessageDTO msg) {
    return withExistingTopic(cluster, topic)
        .publishOn(Schedulers.boundedElastic())
        .flatMap(desc -> sendMessageImpl(cluster, desc, msg));
  }

  private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
                                               TopicDescription topicDescription,
                                               CreateTopicMessageDTO msg) {
    if (msg.getPartition() != null
        && msg.getPartition() > topicDescription.partitions().size() - 1) {
      return Mono.error(new ValidationException("Invalid partition"));
    }
    ProducerRecordCreator producerRecordCreator =
        deserializationService.producerRecordCreator(
            cluster,
            topicDescription.name(),
            msg.getKeySerde().get(),
            msg.getValueSerde().get()
        );
    try (KafkaProducer<byte[], byte[]> producer = createProducer(cluster, Map.of())) {
      ProducerRecord<byte[], byte[]> producerRecord = producerRecordCreator.create(
          topicDescription.name(),
          msg.getPartition(),
          msg.getKey().orElse(null),
          msg.getContent().orElse(null),
          msg.getHeaders()
      );
      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    } catch (Throwable e) {
      return Mono.error(e);
    }
  }
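
  // createProducer: builds a one-off byte[]/byte[] producer from the cluster's configured
  // properties plus SSL settings; entries in additionalProps are applied last and therefore
  // override anything set before them.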

  public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
                                                             Map<String, Object> additionalProps) {
    Properties properties = new Properties();
    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.putAll(additionalProps);
    return new KafkaProducer<>(properties);
  }
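
  // loadMessages (public overload): resolves a deserializer and a combined filter from the raw
  // request parameters, clamps the page size, and delegates to the private overload. A hedged
  // usage sketch (variable names are hypothetical; serde names depend on the cluster's configured
  // serdes):
  //   Flux<TopicMessageEventDTO> events =
  //       messagesService.loadMessages(cluster, "orders", position, null, null, 100, "String", "String");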

  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
                                                 String topic,
                                                 ConsumerPosition consumerPosition,
                                                 @Nullable String containsStringFilter,
                                                 @Nullable String filterId,
                                                 @Nullable Integer limit,
                                                 @Nullable String keySerde,
                                                 @Nullable String valueSerde) {
    return loadMessages(
        cluster,
        topic,
        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
        consumerPosition,
        getMsgFilter(containsStringFilter, filterId),
        fixPageSize(limit)
    );
  }

  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
    Cursor cursor = cursorsStorage.getCursor(cursorId)
        .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
    return loadMessages(
        cluster,
        topic,
        cursor.deserializer(),
        cursor.consumerPosition(),
        cursor.filter(),
        cursor.limit()
    );
  }
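
  // Cursor-based overload above: the cursorId is produced by a Cursor.Tracking instance that is
  // registered in cursorsStorage while a previous page is being emitted (see loadMessagesImpl).
  // Cursors live in an in-memory, size-bounded store, so a stale id can legitimately be missing,
  // which is why the lookup fails with a ValidationException rather than a server error.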

  private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
                                                  String topic,
                                                  ConsumerRecordDeserializer deserializer,
                                                  ConsumerPosition consumerPosition,
                                                  Predicate<TopicMessageDTO> filter,
                                                  int limit) {
    return withExistingTopic(cluster, topic)
        .flux()
        .publishOn(Schedulers.boundedElastic())
        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, limit));
  }
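
  // loadMessagesImpl: picks an emitter based on the polling mode: TO_OFFSET/TO_TIMESTAMP/LATEST
  // read backwards from the target position, FROM_OFFSET/FROM_TIMESTAMP/EARLIEST read forwards,
  // and TAILING streams indefinitely (note the null message limit passed to MessagesProcessing).
  // Emitted events are then masked and, in tailing mode, rate-limited before reaching the UI.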

  private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
                                                      String topic,
                                                      ConsumerRecordDeserializer deserializer,
                                                      ConsumerPosition consumerPosition,
                                                      Predicate<TopicMessageDTO> filter,
                                                      int limit) {
    var processing = new MessagesProcessing(
        deserializer,
        filter,
        consumerPosition.pollingMode() == PollingModeDTO.TAILING ? null : limit
    );
    var emitter = switch (consumerPosition.pollingMode()) {
      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardRecordEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          consumerPosition,
          limit,
          processing,
          cluster.getPollingSettings(),
          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
      );
      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardRecordEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          consumerPosition,
          processing,
          cluster.getPollingSettings(),
          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
      );
      case TAILING -> new TailingEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          consumerPosition,
          processing,
          cluster.getPollingSettings()
      );
    };
    return Flux.create(emitter)
        .map(getDataMasker(cluster, topic))
        .map(throttleUiPublish(consumerPosition.pollingMode()));
  }

  private int fixPageSize(@Nullable Integer pageSize) {
    return Optional.ofNullable(pageSize)
        .filter(ps -> ps > 0 && ps <= maxPageSize)
        .orElse(defaultPageSize);
  }
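
  // registerMessageFilter: the filter id is the first 8 hex chars of sha256(groovyCode + salt),
  // where the salt is a random per-process constant, so identical code yields the same id within
  // one instance but ids are not portable across instances. A hedged usage sketch (variable names
  // are hypothetical):
  //   String filterId = messagesService.registerMessageFilter("msg.partition == 0");
  //   messagesService.loadMessages(cluster, topic, position, null, filterId, pageSize, keySerde, valueSerde);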

  public String registerMessageFilter(String groovyCode) {
    String saltedCode = groovyCode + SALT_FOR_HASHING;
    String filterId = Hashing.sha256()
        .hashString(saltedCode, Charsets.UTF_8)
        .toString()
        .substring(0, 8);
    if (registeredFilters.getIfPresent(filterId) == null) {
      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
    }
    return filterId;
  }

  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
    return evt -> {
      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
        return evt;
      }
      return evt.message(
          evt.getMessage()
              .key(keyMasker.apply(evt.getMessage().getKey()))
              .content(valMasker.apply(evt.getMessage().getContent())));
    };
  }
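
  // getMsgFilter: starts from a no-op predicate and AND-combines the optional substring filter and
  // the optional registered smart filter, so a message must satisfy both when both are supplied.
  // Referencing an unknown smart filter id is a client error (ValidationException), typically
  // meaning the filter was evicted from the registeredFilters cache or never registered here.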

  private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
                                                  @Nullable String smartFilterId) {
    Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
    if (containsStrFilter != null) {
      messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter));
    }
    if (smartFilterId != null) {
      var registered = registeredFilters.getIfPresent(smartFilterId);
      if (registered == null) {
        throw new ValidationException("No filter was registered with id " + smartFilterId);
      }
      messageFilter = messageFilter.and(registered);
    }
    return messageFilter;
  }

  private <T> UnaryOperator<T> throttleUiPublish(PollingModeDTO pollingMode) {
    if (pollingMode == PollingModeDTO.TAILING) {
      RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
      return m -> {
        rateLimiter.acquire(1);
        return m;
      };
    }
    // no need to throttle the UI publish rate for non-tailing modes, since the number of
    // produced messages is already bounded by the page size
    return UnaryOperator.identity();
  }
}