MessagesController.java 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  5. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  6. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  7. import static java.util.stream.Collectors.toMap;
  8. import com.provectus.kafka.ui.api.MessagesApi;
  9. import com.provectus.kafka.ui.exception.ValidationException;
  10. import com.provectus.kafka.ui.model.ConsumerPosition;
  11. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  12. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  13. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  14. import com.provectus.kafka.ui.model.SeekTypeDTO;
  15. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  16. import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
  17. import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
  18. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  19. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  20. import com.provectus.kafka.ui.model.rbac.AccessContext;
  21. import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
  22. import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
  23. import com.provectus.kafka.ui.service.DeserializationService;
  24. import com.provectus.kafka.ui.service.MessagesService;
  25. import com.provectus.kafka.ui.service.audit.AuditService;
  26. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.Optional;
  30. import javax.annotation.Nullable;
  31. import javax.validation.Valid;
  32. import lombok.RequiredArgsConstructor;
  33. import lombok.extern.slf4j.Slf4j;
  34. import org.apache.commons.lang3.tuple.Pair;
  35. import org.apache.kafka.common.TopicPartition;
  36. import org.springframework.http.ResponseEntity;
  37. import org.springframework.web.bind.annotation.RestController;
  38. import org.springframework.web.server.ServerWebExchange;
  39. import reactor.core.publisher.Flux;
  40. import reactor.core.publisher.Mono;
  41. import reactor.core.scheduler.Schedulers;
  42. @RestController
  43. @RequiredArgsConstructor
  44. @Slf4j
  45. public class MessagesController extends AbstractController implements MessagesApi {
  46. private final MessagesService messagesService;
  47. private final DeserializationService deserializationService;
  48. private final AccessControlService accessControlService;
  49. private final AuditService auditService;
  50. @Override
  51. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  52. String clusterName, String topicName, @Valid List<Integer> partitions,
  53. ServerWebExchange exchange) {
  54. var context = AccessContext.builder()
  55. .cluster(clusterName)
  56. .topic(topicName)
  57. .topicActions(MESSAGES_DELETE)
  58. .build();
  59. return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
  60. messagesService.deleteTopicMessages(
  61. getCluster(clusterName),
  62. topicName,
  63. Optional.ofNullable(partitions).orElse(List.of())
  64. ).thenReturn(ResponseEntity.ok().build())
  65. ).doOnEach(sig -> auditService.audit(context, sig));
  66. }
  67. @Override
  68. public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
  69. Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
  70. return smartFilterTestExecutionDto
  71. .map(MessagesService::execSmartFilterTest)
  72. .map(ResponseEntity::ok);
  73. }
  74. @Override
  75. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  76. String topicName,
  77. SeekTypeDTO seekType,
  78. List<String> seekTo,
  79. Integer limit,
  80. String q,
  81. MessageFilterTypeDTO filterQueryType,
  82. SeekDirectionDTO seekDirection,
  83. String keySerde,
  84. String valueSerde,
  85. ServerWebExchange exchange) {
  86. var contextBuilder = AccessContext.builder()
  87. .cluster(clusterName)
  88. .topic(topicName)
  89. .topicActions(MESSAGES_READ)
  90. .operationName("getTopicMessages");
  91. if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
  92. contextBuilder.auditActions(AuditAction.VIEW);
  93. }
  94. seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
  95. seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
  96. filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
  97. var positions = new ConsumerPosition(
  98. seekType,
  99. topicName,
  100. parseSeekTo(topicName, seekType, seekTo)
  101. );
  102. Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
  103. ResponseEntity.ok(
  104. messagesService.loadMessages(
  105. getCluster(clusterName), topicName, positions, q, filterQueryType,
  106. limit, seekDirection, keySerde, valueSerde)
  107. )
  108. );
  109. var context = contextBuilder.build();
  110. return accessControlService.validateAccess(context)
  111. .then(job)
  112. .doOnEach(sig -> auditService.audit(context, sig));
  113. }
  114. @Override
  115. public Mono<ResponseEntity<Void>> sendTopicMessages(
  116. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  117. ServerWebExchange exchange) {
  118. var context = AccessContext.builder()
  119. .cluster(clusterName)
  120. .topic(topicName)
  121. .topicActions(MESSAGES_PRODUCE)
  122. .operationName("sendTopicMessages")
  123. .build();
  124. return accessControlService.validateAccess(context).then(
  125. createTopicMessage.flatMap(msg ->
  126. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  127. ).map(ResponseEntity::ok)
  128. ).doOnEach(sig -> auditService.audit(context, sig));
  129. }
  130. /**
  131. * The format is [partition]::[offset] for specifying offsets
  132. * or [partition]::[timestamp in millis] for specifying timestamps.
  133. */
  134. @Nullable
  135. private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
  136. if (seekTo == null || seekTo.isEmpty()) {
  137. if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
  138. return null;
  139. }
  140. throw new ValidationException("seekTo should be set if seekType is " + seekType);
  141. }
  142. return seekTo.stream()
  143. .map(p -> {
  144. String[] split = p.split("::");
  145. if (split.length != 2) {
  146. throw new IllegalArgumentException(
  147. "Wrong seekTo argument format. See API docs for details");
  148. }
  149. return Pair.of(
  150. new TopicPartition(topic, Integer.parseInt(split[0])),
  151. Long.parseLong(split[1])
  152. );
  153. })
  154. .collect(toMap(Pair::getKey, Pair::getValue));
  155. }
  156. @Override
  157. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  158. String topicName,
  159. SerdeUsageDTO use,
  160. ServerWebExchange exchange) {
  161. var context = AccessContext.builder()
  162. .cluster(clusterName)
  163. .topic(topicName)
  164. .topicActions(TopicAction.VIEW)
  165. .operationName("getSerdes")
  166. .build();
  167. TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
  168. .key(use == SerdeUsageDTO.SERIALIZE
  169. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  170. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  171. .value(use == SerdeUsageDTO.SERIALIZE
  172. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  173. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
  174. return accessControlService.validateAccess(context).then(
  175. Mono.just(dto)
  176. .subscribeOn(Schedulers.boundedElastic())
  177. .map(ResponseEntity::ok)
  178. );
  179. }
  180. }