MessagesController.java 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.MessagesApi;
  3. import com.provectus.kafka.ui.model.ConsumerPosition;
  4. import com.provectus.kafka.ui.model.CreateTopicMessage;
  5. import com.provectus.kafka.ui.model.SeekDirection;
  6. import com.provectus.kafka.ui.model.SeekType;
  7. import com.provectus.kafka.ui.model.TopicMessage;
  8. import com.provectus.kafka.ui.model.TopicMessageSchema;
  9. import com.provectus.kafka.ui.service.ClusterService;
  10. import java.util.Collections;
  11. import java.util.List;
  12. import java.util.Optional;
  13. import java.util.function.Function;
  14. import javax.validation.Valid;
  15. import lombok.RequiredArgsConstructor;
  16. import lombok.extern.log4j.Log4j2;
  17. import org.apache.commons.lang3.tuple.Pair;
  18. import org.springframework.http.ResponseEntity;
  19. import org.springframework.web.bind.annotation.RestController;
  20. import org.springframework.web.server.ServerWebExchange;
  21. import reactor.core.publisher.Flux;
  22. import reactor.core.publisher.Mono;
  23. @RestController
  24. @RequiredArgsConstructor
  25. @Log4j2
  26. public class MessagesController implements MessagesApi {
  27. private final ClusterService clusterService;
  28. @Override
  29. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  30. String clusterName, String topicName, @Valid List<Integer> partitions,
  31. ServerWebExchange exchange) {
  32. return clusterService.deleteTopicMessages(
  33. clusterName,
  34. topicName,
  35. Optional.ofNullable(partitions).orElse(List.of())
  36. ).map(ResponseEntity::ok);
  37. }
  38. @Override
  39. public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(
  40. String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo,
  41. @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection,
  42. ServerWebExchange exchange) {
  43. return parseConsumerPosition(seekType, seekTo, seekDirection)
  44. .map(consumerPosition -> ResponseEntity
  45. .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
  46. }
  47. @Override
  48. public Mono<ResponseEntity<TopicMessageSchema>> getTopicSchema(
  49. String clusterName, String topicName, ServerWebExchange exchange) {
  50. return Mono.just(clusterService.getTopicSchema(clusterName, topicName))
  51. .map(ResponseEntity::ok);
  52. }
  53. @Override
  54. public Mono<ResponseEntity<Void>> sendTopicMessages(
  55. String clusterName, String topicName, @Valid Mono<CreateTopicMessage> createTopicMessage,
  56. ServerWebExchange exchange) {
  57. return createTopicMessage.flatMap(msg ->
  58. clusterService.sendMessage(clusterName, topicName, msg)
  59. ).map(ResponseEntity::ok);
  60. }
  61. private Mono<ConsumerPosition> parseConsumerPosition(
  62. SeekType seekType, List<String> seekTo, SeekDirection seekDirection) {
  63. return Mono.justOrEmpty(seekTo)
  64. .defaultIfEmpty(Collections.emptyList())
  65. .flatMapIterable(Function.identity())
  66. .map(p -> {
  67. String[] splited = p.split("::");
  68. if (splited.length != 2) {
  69. throw new IllegalArgumentException(
  70. "Wrong seekTo argument format. See API docs for details");
  71. }
  72. return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
  73. })
  74. .collectMap(Pair::getKey, Pair::getValue)
  75. .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING,
  76. positions, seekDirection));
  77. }
  78. }