BrokersController.java 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.BrokersApi;
  3. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  4. import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
  5. import com.provectus.kafka.ui.model.BrokerDTO;
  6. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  7. import com.provectus.kafka.ui.model.BrokerMetricsDTO;
  8. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  9. import com.provectus.kafka.ui.service.BrokerService;
  10. import java.util.List;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.log4j.Log4j2;
  13. import org.springframework.http.ResponseEntity;
  14. import org.springframework.web.bind.annotation.RestController;
  15. import org.springframework.web.server.ServerWebExchange;
  16. import reactor.core.publisher.Flux;
  17. import reactor.core.publisher.Mono;
  18. @RestController
  19. @RequiredArgsConstructor
  20. @Log4j2
  21. public class BrokersController extends AbstractController implements BrokersApi {
  22. private final BrokerService brokerService;
  23. @Override
  24. public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
  25. ServerWebExchange exchange) {
  26. return brokerService.getBrokerMetrics(getCluster(clusterName), id)
  27. .map(ResponseEntity::ok)
  28. .onErrorReturn(ResponseEntity.notFound().build());
  29. }
  30. @Override
  31. public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
  32. ServerWebExchange exchange) {
  33. return Mono.just(ResponseEntity.ok(brokerService.getBrokers(getCluster(clusterName))));
  34. }
  35. @Override
  36. public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
  37. List<Integer> brokers,
  38. ServerWebExchange exchange
  39. ) {
  40. return Mono.just(ResponseEntity.ok(
  41. brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
  42. }
  43. @Override
  44. public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName, Integer id,
  45. ServerWebExchange exchange) {
  46. return Mono.just(ResponseEntity.ok(
  47. brokerService.getBrokerConfig(getCluster(clusterName), id)));
  48. }
  49. @Override
  50. public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(
  51. String clusterName, Integer id, Mono<BrokerLogdirUpdateDTO> brokerLogdir,
  52. ServerWebExchange exchange) {
  53. return brokerLogdir
  54. .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
  55. .map(ResponseEntity::ok);
  56. }
  57. @Override
  58. public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
  59. Integer id,
  60. String name,
  61. Mono<BrokerConfigItemDTO> brokerConfig,
  62. ServerWebExchange exchange) {
  63. return brokerConfig
  64. .flatMap(bci -> brokerService.updateBrokerConfigByName(
  65. getCluster(clusterName), id, name, bci.getValue()))
  66. .map(ResponseEntity::ok);
  67. }
  68. }