MessagesController.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  5. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  6. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  7. import static java.util.stream.Collectors.toMap;
  8. import com.provectus.kafka.ui.api.MessagesApi;
  9. import com.provectus.kafka.ui.exception.ValidationException;
  10. import com.provectus.kafka.ui.model.ConsumerPosition;
  11. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  12. import com.provectus.kafka.ui.model.MessageFilterIdDTO;
  13. import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
  14. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  15. import com.provectus.kafka.ui.model.PollingModeDTO;
  16. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  17. import com.provectus.kafka.ui.model.SeekTypeDTO;
  18. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  19. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  20. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  21. import com.provectus.kafka.ui.model.rbac.AccessContext;
  22. import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
  23. import com.provectus.kafka.ui.service.DeserializationService;
  24. import com.provectus.kafka.ui.service.MessagesService;
  25. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Optional;
  29. import javax.annotation.Nullable;
  30. import javax.validation.Valid;
  31. import lombok.RequiredArgsConstructor;
  32. import lombok.extern.slf4j.Slf4j;
  33. import org.apache.commons.lang3.tuple.Pair;
  34. import org.apache.kafka.common.TopicPartition;
  35. import org.springframework.http.ResponseEntity;
  36. import org.springframework.web.bind.annotation.RestController;
  37. import org.springframework.web.server.ServerWebExchange;
  38. import reactor.core.publisher.Flux;
  39. import reactor.core.publisher.Mono;
  40. import reactor.core.scheduler.Schedulers;
  41. @RestController
  42. @RequiredArgsConstructor
  43. @Slf4j
  44. public class MessagesController extends AbstractController implements MessagesApi {
  45. private static final int MAX_LOAD_RECORD_LIMIT = 100;
  46. private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
  47. private final MessagesService messagesService;
  48. private final DeserializationService deserializationService;
  49. private final AccessControlService accessControlService;
  50. @Override
  51. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  52. String clusterName, String topicName, @Valid List<Integer> partitions,
  53. ServerWebExchange exchange) {
  54. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  55. .cluster(clusterName)
  56. .topic(topicName)
  57. .topicActions(MESSAGES_DELETE)
  58. .build());
  59. return validateAccess.then(
  60. messagesService.deleteTopicMessages(
  61. getCluster(clusterName),
  62. topicName,
  63. Optional.ofNullable(partitions).orElse(List.of())
  64. ).thenReturn(ResponseEntity.ok().build())
  65. );
  66. }
  67. @Deprecated
  68. @Override
  69. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  70. String topicName,
  71. SeekTypeDTO seekType,
  72. List<String> seekTo,
  73. Integer limit,
  74. String q,
  75. MessageFilterTypeDTO filterQueryType,
  76. SeekDirectionDTO seekDirection,
  77. String keySerde,
  78. String valueSerde,
  79. ServerWebExchange exchange) {
  80. final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  81. .cluster(clusterName)
  82. .topic(topicName)
  83. .topicActions(MESSAGES_READ)
  84. .build());
  85. seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
  86. seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
  87. filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
  88. int recordsLimit =
  89. Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
  90. var positions = new ConsumerPosition(
  91. seekType,
  92. topicName,
  93. parseSeekTo(topicName, seekType, seekTo)
  94. );
  95. Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
  96. ResponseEntity.ok(
  97. messagesService.loadMessages(
  98. getCluster(clusterName), topicName, positions, q, filterQueryType,
  99. recordsLimit, seekDirection, keySerde, valueSerde)
  100. )
  101. );
  102. return validateAccess.then(job);
  103. }
  104. @Override
  105. public Mono<ResponseEntity<Void>> sendTopicMessages(
  106. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  107. ServerWebExchange exchange) {
  108. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  109. .cluster(clusterName)
  110. .topic(topicName)
  111. .topicActions(MESSAGES_PRODUCE)
  112. .build());
  113. return validateAccess.then(
  114. createTopicMessage.flatMap(msg ->
  115. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  116. ).map(ResponseEntity::ok)
  117. );
  118. }
  119. /**
  120. * The format is [partition]::[offset] for specifying offsets
  121. * or [partition]::[timestamp in millis] for specifying timestamps.
  122. */
  123. @Nullable
  124. private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
  125. if (seekTo == null || seekTo.isEmpty()) {
  126. if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
  127. return null;
  128. }
  129. throw new ValidationException("seekTo should be set if seekType is " + seekType);
  130. }
  131. return seekTo.stream()
  132. .map(p -> {
  133. String[] split = p.split("::");
  134. if (split.length != 2) {
  135. throw new IllegalArgumentException(
  136. "Wrong seekTo argument format. See API docs for details");
  137. }
  138. return Pair.of(
  139. new TopicPartition(topic, Integer.parseInt(split[0])),
  140. Long.parseLong(split[1])
  141. );
  142. })
  143. .collect(toMap(Pair::getKey, Pair::getValue));
  144. }
  145. @Override
  146. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  147. String topicName,
  148. SerdeUsageDTO use,
  149. ServerWebExchange exchange) {
  150. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  151. .cluster(clusterName)
  152. .topic(topicName)
  153. .topicActions(TopicAction.VIEW)
  154. .build());
  155. TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
  156. .key(use == SerdeUsageDTO.SERIALIZE
  157. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  158. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  159. .value(use == SerdeUsageDTO.SERIALIZE
  160. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  161. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
  162. return validateAccess.then(
  163. Mono.just(dto)
  164. .subscribeOn(Schedulers.boundedElastic())
  165. .map(ResponseEntity::ok)
  166. );
  167. }
  168. @Override
  169. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
  170. PollingModeDTO mode,
  171. @Nullable List<Integer> partitions,
  172. @Nullable Integer limit,
  173. @Nullable String query,
  174. @Nullable String filterId,
  175. @Nullable String offsetString,
  176. @Nullable Long ts,
  177. @Nullable String ks,
  178. @Nullable String vs,
  179. ServerWebExchange exchange) {
  180. final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  181. .cluster(clusterName)
  182. .topic(topicName)
  183. .topicActions(MESSAGES_READ)
  184. .build());
  185. int recordsLimit =
  186. Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
  187. return validateAccess.then(
  188. Mono.just(
  189. ResponseEntity.ok(
  190. messagesService.loadMessagesV2(
  191. getCluster(clusterName), topicName, positions, q, filterQueryType,
  192. recordsLimit, seekDirection, keySerde, valueSerde)
  193. )
  194. )
  195. );
  196. }
  197. interface PollingMode {
  198. static PollingMode create(PollingModeDTO mode, @Nullable String offsetString, @Nullable Long timestamp) {
  199. return null;
  200. }
  201. }
  202. @Override
  203. public Mono<ResponseEntity<Flux<MessageFilterIdDTO>>> registerFilter(String clusterName, String topicName,
  204. Mono<MessageFilterRegistrationDTO> messageFilterRegistrationDTO,
  205. ServerWebExchange exchange) {
  206. return null;
  207. }
  208. }