MessagesController.java 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
  3. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
  4. import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
  5. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  6. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  7. import com.provectus.kafka.ui.api.MessagesApi;
  8. import com.provectus.kafka.ui.model.ConsumerPosition;
  9. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  10. import com.provectus.kafka.ui.model.MessageFilterIdDTO;
  11. import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
  12. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  13. import com.provectus.kafka.ui.model.PollingModeDTO;
  14. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  15. import com.provectus.kafka.ui.model.SeekTypeDTO;
  16. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  17. import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
  18. import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
  19. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  20. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  21. import com.provectus.kafka.ui.model.rbac.AccessContext;
  22. import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
  23. import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
  24. import com.provectus.kafka.ui.service.DeserializationService;
  25. import com.provectus.kafka.ui.service.MessagesService;
  26. import com.provectus.kafka.ui.service.audit.AuditService;
  27. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  28. import java.util.List;
  29. import java.util.Optional;
  30. import javax.validation.Valid;
  31. import javax.validation.ValidationException;
  32. import lombok.RequiredArgsConstructor;
  33. import lombok.extern.slf4j.Slf4j;
  34. import org.springframework.http.ResponseEntity;
  35. import org.springframework.web.bind.annotation.RestController;
  36. import org.springframework.web.server.ServerWebExchange;
  37. import reactor.core.publisher.Flux;
  38. import reactor.core.publisher.Mono;
  39. import reactor.core.scheduler.Schedulers;
  40. @RestController
  41. @RequiredArgsConstructor
  42. @Slf4j
  43. public class MessagesController extends AbstractController implements MessagesApi {
  44. private final MessagesService messagesService;
  45. private final DeserializationService deserializationService;
  46. private final AccessControlService accessControlService;
  47. private final AuditService auditService;
  48. @Override
  49. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  50. String clusterName, String topicName, @Valid List<Integer> partitions,
  51. ServerWebExchange exchange) {
  52. var context = AccessContext.builder()
  53. .cluster(clusterName)
  54. .topic(topicName)
  55. .topicActions(MESSAGES_DELETE)
  56. .build();
  57. return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
  58. messagesService.deleteTopicMessages(
  59. getCluster(clusterName),
  60. topicName,
  61. Optional.ofNullable(partitions).orElse(List.of())
  62. ).thenReturn(ResponseEntity.ok().build())
  63. ).doOnEach(sig -> auditService.audit(context, sig));
  64. }
  65. @Override
  66. public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
  67. Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
  68. return smartFilterTestExecutionDto
  69. .map(MessagesService::execSmartFilterTest)
  70. .map(ResponseEntity::ok);
  71. }
  72. @Deprecated
  73. @Override
  74. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  75. String topicName,
  76. SeekTypeDTO seekType,
  77. List<String> seekTo,
  78. Integer limit,
  79. String q,
  80. MessageFilterTypeDTO filterQueryType,
  81. SeekDirectionDTO seekDirection,
  82. String keySerde,
  83. String valueSerde,
  84. ServerWebExchange exchange) {
  85. throw new ValidationException("Not supported");
  86. }
  87. @Override
  88. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
  89. PollingModeDTO mode,
  90. List<Integer> partitions,
  91. Integer limit,
  92. String stringFilter,
  93. String smartFilterId,
  94. Long offset,
  95. Long timestamp,
  96. String keySerde,
  97. String valueSerde,
  98. String cursor,
  99. ServerWebExchange exchange) {
  100. final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  101. .cluster(clusterName)
  102. .topic(topicName)
  103. .topicActions(MESSAGES_READ)
  104. .build());
  105. Flux<TopicMessageEventDTO> messagesFlux;
  106. if (cursor != null) {
  107. messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
  108. } else {
  109. messagesFlux = messagesService.loadMessages(
  110. getCluster(clusterName),
  111. topicName,
  112. ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
  113. stringFilter,
  114. smartFilterId,
  115. limit,
  116. keySerde,
  117. valueSerde
  118. );
  119. }
  120. return validateAccess.then(Mono.just(ResponseEntity.ok(messagesFlux)));
  121. }
  122. @Override
  123. public Mono<ResponseEntity<Void>> sendTopicMessages(
  124. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  125. ServerWebExchange exchange) {
  126. var context = AccessContext.builder()
  127. .cluster(clusterName)
  128. .topic(topicName)
  129. .topicActions(MESSAGES_PRODUCE)
  130. .operationName("sendTopicMessages")
  131. .build();
  132. return accessControlService.validateAccess(context).then(
  133. createTopicMessage.flatMap(msg ->
  134. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  135. ).map(ResponseEntity::ok)
  136. ).doOnEach(sig -> auditService.audit(context, sig));
  137. }
  138. @Override
  139. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  140. String topicName,
  141. SerdeUsageDTO use,
  142. ServerWebExchange exchange) {
  143. var context = AccessContext.builder()
  144. .cluster(clusterName)
  145. .topic(topicName)
  146. .topicActions(TopicAction.VIEW)
  147. .operationName("getSerdes")
  148. .build();
  149. TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
  150. .key(use == SerdeUsageDTO.SERIALIZE
  151. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  152. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  153. .value(use == SerdeUsageDTO.SERIALIZE
  154. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  155. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
  156. return accessControlService.validateAccess(context).then(
  157. Mono.just(dto)
  158. .subscribeOn(Schedulers.boundedElastic())
  159. .map(ResponseEntity::ok)
  160. );
  161. }
  162. @Override
  163. public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
  164. String topicName,
  165. Mono<MessageFilterRegistrationDTO> registration,
  166. ServerWebExchange exchange) {
  167. final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  168. .cluster(clusterName)
  169. .topic(topicName)
  170. .topicActions(MESSAGES_READ)
  171. .build());
  172. return validateAccess.then(registration)
  173. .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
  174. .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
  175. }
  176. }