MessagesController.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
  3. import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
  4. import static java.util.stream.Collectors.toMap;
  5. import com.provectus.kafka.ui.api.MessagesApi;
  6. import com.provectus.kafka.ui.model.ConsumerPosition;
  7. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  8. import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
  9. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  10. import com.provectus.kafka.ui.model.SeekTypeDTO;
  11. import com.provectus.kafka.ui.model.SerdeUsageDTO;
  12. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  13. import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
  14. import com.provectus.kafka.ui.service.DeserializationService;
  15. import com.provectus.kafka.ui.service.MessagesService;
  16. import java.util.List;
  17. import java.util.Map;
  18. import java.util.Optional;
  19. import javax.validation.Valid;
  20. import lombok.RequiredArgsConstructor;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang3.tuple.Pair;
  23. import org.apache.kafka.common.TopicPartition;
  24. import org.springframework.http.ResponseEntity;
  25. import org.springframework.web.bind.annotation.RestController;
  26. import org.springframework.web.server.ServerWebExchange;
  27. import reactor.core.publisher.Flux;
  28. import reactor.core.publisher.Mono;
  29. @RestController
  30. @RequiredArgsConstructor
  31. @Slf4j
  32. public class MessagesController extends AbstractController implements MessagesApi {
  33. private static final int MAX_LOAD_RECORD_LIMIT = 100;
  34. private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
  35. private final MessagesService messagesService;
  36. private final DeserializationService deserializationService;
  37. @Override
  38. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  39. String clusterName, String topicName, @Valid List<Integer> partitions,
  40. ServerWebExchange exchange) {
  41. return messagesService.deleteTopicMessages(
  42. getCluster(clusterName),
  43. topicName,
  44. Optional.ofNullable(partitions).orElse(List.of())
  45. ).thenReturn(ResponseEntity.ok().build());
  46. }
  47. @Override
  48. public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
  49. String topicName,
  50. SeekTypeDTO seekType,
  51. List<String> seekTo,
  52. Integer limit,
  53. String q,
  54. MessageFilterTypeDTO filterQueryType,
  55. SeekDirectionDTO seekDirection,
  56. String keySerde,
  57. String valueSerde,
  58. ServerWebExchange exchange) {
  59. var positions = new ConsumerPosition(
  60. seekType != null ? seekType : SeekTypeDTO.BEGINNING,
  61. parseSeekTo(topicName, seekTo),
  62. seekDirection
  63. );
  64. int recordsLimit = Optional.ofNullable(limit)
  65. .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
  66. .orElse(DEFAULT_LOAD_RECORD_LIMIT);
  67. return Mono.just(
  68. ResponseEntity.ok(
  69. messagesService.loadMessages(
  70. getCluster(clusterName), topicName, positions, q, filterQueryType, recordsLimit, keySerde, valueSerde)
  71. )
  72. );
  73. }
  74. @Override
  75. public Mono<ResponseEntity<Void>> sendTopicMessages(
  76. String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
  77. ServerWebExchange exchange) {
  78. return createTopicMessage.flatMap(msg ->
  79. messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
  80. ).map(ResponseEntity::ok);
  81. }
  82. /**
  83. * The format is [partition]::[offset] for specifying offsets
  84. * or [partition]::[timestamp in millis] for specifying timestamps.
  85. */
  86. private Map<TopicPartition, Long> parseSeekTo(String topic, List<String> seekTo) {
  87. if (seekTo == null || seekTo.isEmpty()) {
  88. return Map.of();
  89. }
  90. return seekTo.stream()
  91. .map(p -> {
  92. String[] split = p.split("::");
  93. if (split.length != 2) {
  94. throw new IllegalArgumentException(
  95. "Wrong seekTo argument format. See API docs for details");
  96. }
  97. return Pair.of(
  98. new TopicPartition(topic, Integer.parseInt(split[0])),
  99. Long.parseLong(split[1])
  100. );
  101. })
  102. .collect(toMap(Pair::getKey, Pair::getValue));
  103. }
  104. @Override
  105. public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
  106. String topicName,
  107. SerdeUsageDTO use,
  108. ServerWebExchange exchange) {
  109. return Mono.just(
  110. new TopicSerdeSuggestionDTO()
  111. .key(use == SerdeUsageDTO.SERIALIZE
  112. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, KEY)
  113. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, KEY))
  114. .value(use == SerdeUsageDTO.SERIALIZE
  115. ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
  116. : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE))
  117. ).map(ResponseEntity::ok);
  118. }
  119. }