BrokerServiceImpl.java 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.IllegalEntityStateException;
  3. import com.provectus.kafka.ui.exception.NotFoundException;
  4. import com.provectus.kafka.ui.model.BrokerDTO;
  5. import com.provectus.kafka.ui.model.InternalBrokerConfig;
  6. import com.provectus.kafka.ui.model.KafkaCluster;
  7. import com.provectus.kafka.ui.util.ClusterUtil;
  8. import java.util.Collections;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.stream.Collectors;
  12. import lombok.RequiredArgsConstructor;
  13. import lombok.extern.log4j.Log4j2;
  14. import org.apache.kafka.clients.admin.ConfigEntry;
  15. import org.apache.kafka.common.Node;
  16. import org.springframework.stereotype.Service;
  17. import reactor.core.publisher.Flux;
  18. import reactor.core.publisher.Mono;
  19. @Service
  20. @RequiredArgsConstructor
  21. @Log4j2
  22. public class BrokerServiceImpl implements BrokerService {
  23. private final AdminClientService adminClientService;
  24. private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
  25. KafkaCluster cluster, List<Integer> brokersIds) {
  26. return adminClientService.get(cluster)
  27. .flatMap(ac -> ac.loadBrokersConfig(brokersIds));
  28. }
  29. private Mono<List<ConfigEntry>> loadBrokersConfig(
  30. KafkaCluster cluster, Integer brokerId) {
  31. return loadBrokersConfig(cluster, Collections.singletonList(brokerId))
  32. .map(map -> map.values().stream()
  33. .findFirst()
  34. .orElseThrow(() -> new IllegalEntityStateException(
  35. String.format("Config for broker %s not found", brokerId)))
  36. );
  37. }
  38. @Override
  39. public Mono<Map<String, InternalBrokerConfig>> getBrokerConfigMap(KafkaCluster cluster,
  40. Integer brokerId) {
  41. return loadBrokersConfig(cluster, brokerId)
  42. .map(list -> list.stream()
  43. .collect(Collectors.toMap(
  44. ConfigEntry::name,
  45. ClusterUtil::mapToInternalBrokerConfig)));
  46. }
  47. @Override
  48. public Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
  49. if (!cluster.getBrokers().contains(brokerId)) {
  50. return Flux.error(
  51. new NotFoundException(String.format("Broker with id %s not found", brokerId)));
  52. }
  53. return loadBrokersConfig(cluster, brokerId)
  54. .map(list -> list.stream()
  55. .map(ClusterUtil::mapToInternalBrokerConfig)
  56. .collect(Collectors.toList()))
  57. .flatMapMany(Flux::fromIterable);
  58. }
  59. @Override
  60. public Flux<BrokerDTO> getBrokers(KafkaCluster cluster) {
  61. return adminClientService
  62. .get(cluster)
  63. .flatMap(ReactiveAdminClient::describeCluster)
  64. .map(description -> description.getNodes().stream()
  65. .map(node -> {
  66. BrokerDTO broker = new BrokerDTO();
  67. broker.setId(node.id());
  68. broker.setHost(node.host());
  69. return broker;
  70. }).collect(Collectors.toList()))
  71. .flatMapMany(Flux::fromIterable);
  72. }
  73. @Override
  74. public Mono<Node> getController(KafkaCluster cluster) {
  75. return adminClientService
  76. .get(cluster)
  77. .flatMap(ReactiveAdminClient::describeCluster)
  78. .map(ReactiveAdminClient.ClusterDescription::getController);
  79. }
  80. }