MessagesController.java 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.MessagesApi;
  3. import com.provectus.kafka.ui.model.ConsumerPosition;
  4. import com.provectus.kafka.ui.model.SeekDirection;
  5. import com.provectus.kafka.ui.model.SeekType;
  6. import com.provectus.kafka.ui.model.TopicMessage;
  7. import com.provectus.kafka.ui.service.ClusterService;
  8. import java.util.Collections;
  9. import java.util.List;
  10. import java.util.Optional;
  11. import java.util.function.Function;
  12. import javax.validation.Valid;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.log4j.Log4j2;
  15. import org.apache.commons.lang3.tuple.Pair;
  16. import org.springframework.http.ResponseEntity;
  17. import org.springframework.web.bind.annotation.RestController;
  18. import org.springframework.web.server.ServerWebExchange;
  19. import reactor.core.publisher.Flux;
  20. import reactor.core.publisher.Mono;
  21. @RestController
  22. @RequiredArgsConstructor
  23. @Log4j2
  24. public class MessagesController implements MessagesApi {
  25. private final ClusterService clusterService;
  26. @Override
  27. public Mono<ResponseEntity<Void>> deleteTopicMessages(
  28. String clusterName, String topicName, @Valid List<Integer> partitions,
  29. ServerWebExchange exchange) {
  30. return clusterService.deleteTopicMessages(
  31. clusterName,
  32. topicName,
  33. Optional.ofNullable(partitions).orElse(List.of())
  34. ).map(ResponseEntity::ok);
  35. }
  36. @Override
  37. public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(
  38. String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo,
  39. @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection,
  40. ServerWebExchange exchange) {
  41. return parseConsumerPosition(seekType, seekTo, seekDirection)
  42. .map(consumerPosition -> ResponseEntity
  43. .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
  44. }
  45. private Mono<ConsumerPosition> parseConsumerPosition(
  46. SeekType seekType, List<String> seekTo, SeekDirection seekDirection) {
  47. return Mono.justOrEmpty(seekTo)
  48. .defaultIfEmpty(Collections.emptyList())
  49. .flatMapIterable(Function.identity())
  50. .map(p -> {
  51. String[] splited = p.split("::");
  52. if (splited.length != 2) {
  53. throw new IllegalArgumentException(
  54. "Wrong seekTo argument format. See API docs for details");
  55. }
  56. return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
  57. })
  58. .collectMap(Pair::getKey, Pair::getValue)
  59. .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING,
  60. positions, seekDirection));
  61. }
  62. }