MessagesController.java 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  5. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  6. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  7. import static java.util.stream.Collectors.toMap;
  8. import com.provectus.kafka.ui.api.MessagesApi;
  9. import com.provectus.kafka.ui.exception.ValidationException;
  10. import com.provectus.kafka.ui.model.ConsumerPosition;
  11. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  12. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  13. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  14. import com.provectus.kafka.ui.model.SeekTypeDTO;
  15. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  16. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  17. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  18. import com.provectus.kafka.ui.model.rbac.AccessContext;
  19. import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
  20. import com.provectus.kafka.ui.service.DeserializationService;
  21. import com.provectus.kafka.ui.service.MessagesService;
  22. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.Optional;
  26. import javax.annotation.Nullable;
  27. import javax.validation.Valid;
  28. import lombok.RequiredArgsConstructor;
  29. import lombok.extern.slf4j.Slf4j;
  30. import org.apache.commons.lang3.tuple.Pair;
  31. import org.apache.kafka.common.TopicPartition;
  32. import org.springframework.http.ResponseEntity;
  33. import org.springframework.web.bind.annotation.RestController;
  34. import org.springframework.web.server.ServerWebExchange;
  35. import reactor.core.publisher.Flux;
  36. import reactor.core.publisher.Mono;
  37. import reactor.core.scheduler.Schedulers;
  38. @RestController
  39. @RequiredArgsConstructor
  40. @Slf4j
  41. public class MessagesController extends AbstractController implements MessagesApi {
  42. private static final int MAX_LOAD_RECORD_LIMIT = 100;
  43. private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
  44. private final MessagesService messagesService;
  45. private final DeserializationService deserializationService;
  46. private final AccessControlService accessControlService;
  47. @Override
  48. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  49. String clusterName, String topicName, @Valid List<Integer> partitions,
  50. ServerWebExchange exchange) {
  51. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  52. .cluster(clusterName)
  53. .topic(topicName)
  54. .topicActions(MESSAGES_DELETE)
  55. .build());
  56. return validateAccess.then(
  57. messagesService.deleteTopicMessages(
  58. getCluster(clusterName),
  59. topicName,
  60. Optional.ofNullable(partitions).orElse(List.of())
  61. ).thenReturn(ResponseEntity.ok().build())
  62. );
  63. }
  64. @Override
  65. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  66. String topicName,
  67. SeekTypeDTO seekType,
  68. List<String> seekTo,
  69. Integer limit,
  70. String q,
  71. MessageFilterTypeDTO filterQueryType,
  72. SeekDirectionDTO seekDirection,
  73. String keySerde,
  74. String valueSerde,
  75. ServerWebExchange exchange) {
  76. final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  77. .cluster(clusterName)
  78. .topic(topicName)
  79. .topicActions(MESSAGES_READ)
  80. .build());
  81. seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
  82. seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
  83. filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
  84. int recordsLimit =
  85. Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
  86. var positions = new ConsumerPosition(
  87. seekType,
  88. topicName,
  89. parseSeekTo(topicName, seekType, seekTo)
  90. );
  91. Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
  92. ResponseEntity.ok(
  93. messagesService.loadMessages(
  94. getCluster(clusterName), topicName, positions, q, filterQueryType,
  95. recordsLimit, seekDirection, keySerde, valueSerde)
  96. )
  97. );
  98. return validateAccess.then(job);
  99. }
  100. @Override
  101. public Mono<ResponseEntity<Void>> sendTopicMessages(
  102. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  103. ServerWebExchange exchange) {
  104. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  105. .cluster(clusterName)
  106. .topic(topicName)
  107. .topicActions(MESSAGES_PRODUCE)
  108. .build());
  109. return validateAccess.then(
  110. createTopicMessage.flatMap(msg ->
  111. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  112. ).map(ResponseEntity::ok)
  113. );
  114. }
  115. /**
  116. * The format is [partition]::[offset] for specifying offsets
  117. * or [partition]::[timestamp in millis] for specifying timestamps.
  118. */
  119. @Nullable
  120. private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
  121. if (seekTo == null || seekTo.isEmpty()) {
  122. if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
  123. return null;
  124. }
  125. throw new ValidationException("seekTo should be set if seekType is " + seekType);
  126. }
  127. return seekTo.stream()
  128. .map(p -> {
  129. String[] split = p.split("::");
  130. if (split.length != 2) {
  131. throw new IllegalArgumentException(
  132. "Wrong seekTo argument format. See API docs for details");
  133. }
  134. return Pair.of(
  135. new TopicPartition(topic, Integer.parseInt(split[0])),
  136. Long.parseLong(split[1])
  137. );
  138. })
  139. .collect(toMap(Pair::getKey, Pair::getValue));
  140. }
  141. @Override
  142. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  143. String topicName,
  144. SerdeUsageDTO use,
  145. ServerWebExchange exchange) {
  146. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  147. .cluster(clusterName)
  148. .topic(topicName)
  149. .topicActions(TopicAction.VIEW)
  150. .build());
  151. TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
  152. .key(use == SerdeUsageDTO.SERIALIZE
  153. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  154. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  155. .value(use == SerdeUsageDTO.SERIALIZE
  156. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  157. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
  158. return validateAccess.then(
  159. Mono.just(dto)
  160. .subscribeOn(Schedulers.boundedElastic())
  161. .map(ResponseEntity::ok)
  162. );
  163. }
  164. }