TopicsController.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.TopicsApi;
  3. import com.provectus.kafka.ui.model.PartitionsIncrease;
  4. import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
  5. import com.provectus.kafka.ui.model.Topic;
  6. import com.provectus.kafka.ui.model.TopicColumnsToSort;
  7. import com.provectus.kafka.ui.model.TopicConfig;
  8. import com.provectus.kafka.ui.model.TopicCreation;
  9. import com.provectus.kafka.ui.model.TopicDetails;
  10. import com.provectus.kafka.ui.model.TopicUpdate;
  11. import com.provectus.kafka.ui.model.TopicsResponse;
  12. import com.provectus.kafka.ui.service.ClusterService;
  13. import java.util.Optional;
  14. import javax.validation.Valid;
  15. import lombok.RequiredArgsConstructor;
  16. import lombok.extern.log4j.Log4j2;
  17. import org.springframework.http.HttpStatus;
  18. import org.springframework.http.ResponseEntity;
  19. import org.springframework.web.bind.annotation.RestController;
  20. import org.springframework.web.server.ServerWebExchange;
  21. import reactor.core.publisher.Flux;
  22. import reactor.core.publisher.Mono;
  23. @RestController
  24. @RequiredArgsConstructor
  25. @Log4j2
  26. public class TopicsController implements TopicsApi {
  27. private final ClusterService clusterService;
  28. @Override
  29. public Mono<ResponseEntity<Topic>> createTopic(
  30. String clusterName, @Valid Mono<TopicCreation> topicCreation, ServerWebExchange exchange) {
  31. return clusterService.createTopic(clusterName, topicCreation)
  32. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  33. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  34. }
  35. @Override
  36. public Mono<ResponseEntity<Void>> deleteTopic(
  37. String clusterName, String topicName, ServerWebExchange exchange) {
  38. return clusterService.deleteTopic(clusterName, topicName).map(ResponseEntity::ok);
  39. }
  40. @Override
  41. public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(
  42. String clusterName, String topicName, ServerWebExchange exchange) {
  43. return Mono.just(
  44. clusterService.getTopicConfigs(clusterName, topicName)
  45. .map(Flux::fromIterable)
  46. .map(ResponseEntity::ok)
  47. .orElse(ResponseEntity.notFound().build())
  48. );
  49. }
  50. @Override
  51. public Mono<ResponseEntity<TopicDetails>> getTopicDetails(
  52. String clusterName, String topicName, ServerWebExchange exchange) {
  53. return Mono.just(
  54. clusterService.getTopicDetails(clusterName, topicName)
  55. .map(ResponseEntity::ok)
  56. .orElse(ResponseEntity.notFound().build())
  57. );
  58. }
  59. @Override
  60. public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page,
  61. @Valid Integer perPage,
  62. @Valid Boolean showInternal,
  63. @Valid String search,
  64. @Valid TopicColumnsToSort orderBy,
  65. ServerWebExchange exchange) {
  66. return Mono.just(ResponseEntity.ok(clusterService
  67. .getTopics(
  68. clusterName,
  69. Optional.ofNullable(page),
  70. Optional.ofNullable(perPage),
  71. Optional.ofNullable(showInternal),
  72. Optional.ofNullable(search),
  73. Optional.ofNullable(orderBy)
  74. )));
  75. }
  76. @Override
  77. public Mono<ResponseEntity<Topic>> updateTopic(
  78. String clusterId, String topicName, @Valid Mono<TopicUpdate> topicUpdate,
  79. ServerWebExchange exchange) {
  80. return clusterService.updateTopic(clusterId, topicName, topicUpdate).map(ResponseEntity::ok);
  81. }
  82. @Override
  83. public Mono<ResponseEntity<PartitionsIncreaseResponse>> increaseTopicPartitions(
  84. String clusterName, String topicName,
  85. Mono<PartitionsIncrease> partitionsIncrease,
  86. ServerWebExchange exchange) {
  87. return partitionsIncrease.flatMap(
  88. partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions))
  89. .map(ResponseEntity::ok);
  90. }
  91. }