BrokersController.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.BrokersApi;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  5. import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
  6. import com.provectus.kafka.ui.model.BrokerDTO;
  7. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  8. import com.provectus.kafka.ui.model.BrokerMetricsDTO;
  9. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  10. import com.provectus.kafka.ui.service.BrokerService;
  11. import java.util.List;
  12. import lombok.RequiredArgsConstructor;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.http.ResponseEntity;
  15. import org.springframework.web.bind.annotation.RestController;
  16. import org.springframework.web.server.ServerWebExchange;
  17. import reactor.core.publisher.Flux;
  18. import reactor.core.publisher.Mono;
  19. @RestController
  20. @RequiredArgsConstructor
  21. @Slf4j
  22. public class BrokersController extends AbstractController implements BrokersApi {
  23. private final BrokerService brokerService;
  24. private final ClusterMapper clusterMapper;
  25. @Override
  26. public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
  27. ServerWebExchange exchange) {
  28. return brokerService.getBrokerMetrics(getCluster(clusterName), id)
  29. .map(clusterMapper::toBrokerMetrics)
  30. .map(ResponseEntity::ok)
  31. .onErrorReturn(ResponseEntity.notFound().build());
  32. }
  33. @Override
  34. public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
  35. ServerWebExchange exchange) {
  36. return Mono.just(ResponseEntity.ok(
  37. brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto)));
  38. }
  39. @Override
  40. public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
  41. List<Integer> brokers,
  42. ServerWebExchange exchange
  43. ) {
  44. return Mono.just(ResponseEntity.ok(
  45. brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
  46. }
  47. @Override
  48. public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName, Integer id,
  49. ServerWebExchange exchange) {
  50. return Mono.just(ResponseEntity.ok(
  51. brokerService.getBrokerConfig(getCluster(clusterName), id)
  52. .map(clusterMapper::toBrokerConfig)));
  53. }
  54. @Override
  55. public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(
  56. String clusterName, Integer id, Mono<BrokerLogdirUpdateDTO> brokerLogdir,
  57. ServerWebExchange exchange) {
  58. return brokerLogdir
  59. .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
  60. .map(ResponseEntity::ok);
  61. }
  62. @Override
  63. public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
  64. Integer id,
  65. String name,
  66. Mono<BrokerConfigItemDTO> brokerConfig,
  67. ServerWebExchange exchange) {
  68. return brokerConfig
  69. .flatMap(bci -> brokerService.updateBrokerConfigByName(
  70. getCluster(clusterName), id, name, bci.getValue()))
  71. .map(ResponseEntity::ok);
  72. }
  73. }