MetricsRestController.java 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package com.provectus.kafka.ui.rest;
  2. import com.provectus.kafka.ui.api.ApiClustersApi;
  3. import com.provectus.kafka.ui.cluster.service.ClusterService;
  4. import com.provectus.kafka.ui.model.*;
  5. import lombok.RequiredArgsConstructor;
  6. import org.springframework.http.HttpStatus;
  7. import org.springframework.http.ResponseEntity;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import org.springframework.web.server.ServerWebExchange;
  10. import reactor.core.publisher.Flux;
  11. import reactor.core.publisher.Mono;
  12. import javax.validation.Valid;
  13. import java.util.ArrayList;
  14. @RestController
  15. @RequiredArgsConstructor
  16. public class MetricsRestController implements ApiClustersApi {
  17. private final ClusterService clusterService;
  18. @Override
  19. public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
  20. return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
  21. }
  22. @Override
  23. public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterId, ServerWebExchange exchange) {
  24. return Mono.just(
  25. clusterService.getBrokersMetrics(clusterId)
  26. .map(ResponseEntity::ok)
  27. .orElse(ResponseEntity.notFound().build())
  28. );
  29. }
  30. @Override
  31. public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId, ServerWebExchange exchange) {
  32. return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId))));
  33. }
  34. @Override
  35. public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) {
  36. return Mono.just(
  37. clusterService.getTopicDetails(clusterId, topicName)
  38. .map(ResponseEntity::ok)
  39. .orElse(ResponseEntity.notFound().build())
  40. );
  41. }
  42. @Override
  43. public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
  44. return Mono.just(
  45. clusterService.getTopicConfigs(clusterId, topicName)
  46. .map(Flux::fromIterable)
  47. .map(ResponseEntity::ok)
  48. .orElse(ResponseEntity.notFound().build())
  49. );
  50. }
  51. @Override
  52. public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
  53. return clusterService.createTopic(clusterId, topicFormData)
  54. .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
  55. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  56. }
  57. @Override
  58. public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
  59. //TODO: ????
  60. return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
  61. }
  62. @Override
  63. public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
  64. return clusterService.getConsumerGroups(clusterName)
  65. .map(Flux::fromIterable)
  66. .map(ResponseEntity::ok)
  67. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
  68. }
  69. }