TopicsController.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.TopicsApi;
  3. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  4. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  5. import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
  6. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
  7. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  8. import com.provectus.kafka.ui.model.TopicConfigDTO;
  9. import com.provectus.kafka.ui.model.TopicCreationDTO;
  10. import com.provectus.kafka.ui.model.TopicDTO;
  11. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  12. import com.provectus.kafka.ui.model.TopicUpdateDTO;
  13. import com.provectus.kafka.ui.model.TopicsResponseDTO;
  14. import com.provectus.kafka.ui.service.TopicsService;
  15. import java.util.Optional;
  16. import javax.validation.Valid;
  17. import lombok.RequiredArgsConstructor;
  18. import lombok.extern.log4j.Log4j2;
  19. import org.springframework.http.HttpStatus;
  20. import org.springframework.http.ResponseEntity;
  21. import org.springframework.web.bind.annotation.RestController;
  22. import org.springframework.web.server.ServerWebExchange;
  23. import reactor.core.publisher.Flux;
  24. import reactor.core.publisher.Mono;
  25. @RestController
  26. @RequiredArgsConstructor
  27. @Log4j2
  28. public class TopicsController extends AbstractController implements TopicsApi {
  29. private final TopicsService topicsService;
  30. @Override
  31. public Mono<ResponseEntity<TopicDTO>> createTopic(
  32. String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
  33. return topicsService.createTopic(getCluster(clusterName), topicCreation)
  34. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  35. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  36. }
  37. @Override
  38. public Mono<ResponseEntity<Void>> deleteTopic(
  39. String clusterName, String topicName, ServerWebExchange exchange) {
  40. return topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok);
  41. }
  42. @Override
  43. public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
  44. String clusterName, String topicName, ServerWebExchange exchange) {
  45. return Mono.just(
  46. topicsService.getTopicConfigs(getCluster(clusterName), topicName)
  47. .map(Flux::fromIterable)
  48. .map(ResponseEntity::ok)
  49. .orElse(ResponseEntity.notFound().build())
  50. );
  51. }
  52. @Override
  53. public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
  54. String clusterName, String topicName, ServerWebExchange exchange) {
  55. return Mono.just(
  56. topicsService.getTopicDetails(getCluster(clusterName), topicName)
  57. .map(ResponseEntity::ok)
  58. .orElse(ResponseEntity.notFound().build())
  59. );
  60. }
  61. @Override
  62. public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName, @Valid Integer page,
  63. @Valid Integer perPage,
  64. @Valid Boolean showInternal,
  65. @Valid String search,
  66. @Valid TopicColumnsToSortDTO orderBy,
  67. ServerWebExchange exchange) {
  68. return Mono.just(ResponseEntity.ok(topicsService
  69. .getTopics(
  70. getCluster(clusterName),
  71. Optional.ofNullable(page),
  72. Optional.ofNullable(perPage),
  73. Optional.ofNullable(showInternal),
  74. Optional.ofNullable(search),
  75. Optional.ofNullable(orderBy)
  76. )));
  77. }
  78. @Override
  79. public Mono<ResponseEntity<TopicDTO>> updateTopic(
  80. String clusterId, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
  81. ServerWebExchange exchange) {
  82. return topicsService
  83. .updateTopic(getCluster(clusterId), topicName, topicUpdate).map(ResponseEntity::ok);
  84. }
  85. @Override
  86. public Mono<ResponseEntity<PartitionsIncreaseResponseDTO>> increaseTopicPartitions(
  87. String clusterName, String topicName,
  88. Mono<PartitionsIncreaseDTO> partitionsIncrease,
  89. ServerWebExchange exchange) {
  90. return partitionsIncrease.flatMap(
  91. partitions ->
  92. topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions))
  93. .map(ResponseEntity::ok);
  94. }
  95. @Override
  96. public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicationFactor(
  97. String clusterName, String topicName,
  98. Mono<ReplicationFactorChangeDTO> replicationFactorChange,
  99. ServerWebExchange exchange) {
  100. return replicationFactorChange
  101. .flatMap(rfc ->
  102. topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
  103. .map(ResponseEntity::ok);
  104. }
  105. }