MessagesController.java 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  3. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  4. import static java.util.stream.Collectors.toMap;
  5. import com.provectus.kafka.ui.api.MessagesApi;
  6. import com.provectus.kafka.ui.exception.ValidationException;
  7. import com.provectus.kafka.ui.model.ConsumerPosition;
  8. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  9. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  10. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  11. import com.provectus.kafka.ui.model.SeekTypeDTO;
  12. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  13. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  14. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  15. import com.provectus.kafka.ui.service.DeserializationService;
  16. import com.provectus.kafka.ui.service.MessagesService;
  17. import java.util.List;
  18. import java.util.Map;
  19. import java.util.Optional;
  20. import javax.annotation.Nullable;
  21. import javax.validation.Valid;
  22. import lombok.RequiredArgsConstructor;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.apache.commons.lang3.tuple.Pair;
  25. import org.apache.kafka.common.TopicPartition;
  26. import org.springframework.http.ResponseEntity;
  27. import org.springframework.web.bind.annotation.RestController;
  28. import org.springframework.web.server.ServerWebExchange;
  29. import reactor.core.publisher.Flux;
  30. import reactor.core.publisher.Mono;
  31. @RestController
  32. @RequiredArgsConstructor
  33. @Slf4j
  34. public class MessagesController extends AbstractController implements MessagesApi {
  35. private static final int MAX_LOAD_RECORD_LIMIT = 100;
  36. private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
  37. private final MessagesService messagesService;
  38. private final DeserializationService deserializationService;
  39. @Override
  40. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  41. String clusterName, String topicName, @Valid List<Integer> partitions,
  42. ServerWebExchange exchange) {
  43. return messagesService.deleteTopicMessages(
  44. getCluster(clusterName),
  45. topicName,
  46. Optional.ofNullable(partitions).orElse(List.of())
  47. ).thenReturn(ResponseEntity.ok().build());
  48. }
  49. @Override
  50. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  51. String topicName,
  52. SeekTypeDTO seekType,
  53. List<String> seekTo,
  54. Integer limit,
  55. String q,
  56. MessageFilterTypeDTO filterQueryType,
  57. SeekDirectionDTO seekDirection,
  58. String keySerde,
  59. String valueSerde,
  60. ServerWebExchange exchange) {
  61. seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
  62. seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
  63. filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
  64. int recordsLimit =
  65. Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
  66. var positions = new ConsumerPosition(
  67. seekType,
  68. topicName,
  69. parseSeekTo(topicName, seekType, seekTo)
  70. );
  71. return Mono.just(
  72. ResponseEntity.ok(
  73. messagesService.loadMessages(
  74. getCluster(clusterName), topicName, positions, q, filterQueryType,
  75. recordsLimit, seekDirection, keySerde, valueSerde)
  76. )
  77. );
  78. }
  79. @Override
  80. public Mono<ResponseEntity<Void>> sendTopicMessages(
  81. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  82. ServerWebExchange exchange) {
  83. return createTopicMessage.flatMap(msg ->
  84. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  85. ).map(ResponseEntity::ok);
  86. }
  87. /**
  88. * The format is [partition]::[offset] for specifying offsets
  89. * or [partition]::[timestamp in millis] for specifying timestamps.
  90. */
  91. @Nullable
  92. private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
  93. if (seekTo == null || seekTo.isEmpty()) {
  94. if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
  95. return null;
  96. }
  97. throw new ValidationException("seekTo should be set if seekType is " + seekType);
  98. }
  99. return seekTo.stream()
  100. .map(p -> {
  101. String[] split = p.split("::");
  102. if (split.length != 2) {
  103. throw new IllegalArgumentException(
  104. "Wrong seekTo argument format. See API docs for details");
  105. }
  106. return Pair.of(
  107. new TopicPartition(topic, Integer.parseInt(split[0])),
  108. Long.parseLong(split[1])
  109. );
  110. })
  111. .collect(toMap(Pair::getKey, Pair::getValue));
  112. }
  113. @Override
  114. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  115. String topicName,
  116. SerdeUsageDTO use,
  117. ServerWebExchange exchange) {
  118. return Mono.just(
  119. new TopicSerdeSuggestionDTO()
  120. .key(use == SerdeUsageDTO.SERIALIZE
  121. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  122. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  123. .value(use == SerdeUsageDTO.SERIALIZE
  124. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  125. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE))
  126. ).map(ResponseEntity::ok);
  127. }
  128. }