// BrokerService.java
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.InvalidRequestApiException;
  3. import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
  4. import com.provectus.kafka.ui.exception.NotFoundException;
  5. import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
  6. import com.provectus.kafka.ui.mapper.ClusterMapper;
  7. import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
  8. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  9. import com.provectus.kafka.ui.model.BrokerDTO;
  10. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  11. import com.provectus.kafka.ui.model.BrokerMetricsDTO;
  12. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  13. import com.provectus.kafka.ui.model.InternalBrokerConfig;
  14. import com.provectus.kafka.ui.model.KafkaCluster;
  15. import java.util.Collections;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.Map;
  19. import java.util.stream.Collectors;
  20. import lombok.RequiredArgsConstructor;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.kafka.clients.admin.ConfigEntry;
  23. import org.apache.kafka.common.Node;
  24. import org.apache.kafka.common.TopicPartitionReplica;
  25. import org.apache.kafka.common.errors.InvalidRequestException;
  26. import org.apache.kafka.common.errors.LogDirNotFoundException;
  27. import org.apache.kafka.common.errors.TimeoutException;
  28. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  29. import org.apache.kafka.common.requests.DescribeLogDirsResponse;
  30. import org.springframework.stereotype.Service;
  31. import reactor.core.publisher.Flux;
  32. import reactor.core.publisher.Mono;
  33. @Service
  34. @RequiredArgsConstructor
  35. @Slf4j
  36. public class BrokerService {
  37. private final MetricsCache metricsCache;
  38. private final AdminClientService adminClientService;
  39. private final DescribeLogDirsMapper describeLogDirsMapper;
  40. private final ClusterMapper clusterMapper;
  41. private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
  42. KafkaCluster cluster, List<Integer> brokersIds) {
  43. return adminClientService.get(cluster).flatMap(ac -> ac.loadBrokersConfig(brokersIds));
  44. }
  45. private Mono<List<ConfigEntry>> loadBrokersConfig(
  46. KafkaCluster cluster, Integer brokerId) {
  47. return loadBrokersConfig(cluster, Collections.singletonList(brokerId))
  48. .map(map -> map.values().stream()
  49. .findFirst()
  50. .orElseThrow(() -> new NotFoundException(
  51. String.format("Config for broker %s not found", brokerId))));
  52. }
  53. private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
  54. if (metricsCache.get(cluster).getClusterDescription().getNodes()
  55. .stream().noneMatch(node -> node.id() == brokerId)) {
  56. return Flux.error(
  57. new NotFoundException(String.format("Broker with id %s not found", brokerId)));
  58. }
  59. return loadBrokersConfig(cluster, brokerId)
  60. .map(list -> list.stream()
  61. .map(InternalBrokerConfig::from)
  62. .collect(Collectors.toList()))
  63. .flatMapMany(Flux::fromIterable);
  64. }
  65. public Flux<BrokerDTO> getBrokers(KafkaCluster cluster) {
  66. return adminClientService
  67. .get(cluster)
  68. .flatMap(ReactiveAdminClient::describeCluster)
  69. .map(description -> description.getNodes().stream()
  70. .map(node -> {
  71. BrokerDTO broker = new BrokerDTO();
  72. broker.setId(node.id());
  73. broker.setHost(node.host());
  74. broker.setPort(node.port());
  75. return broker;
  76. }).collect(Collectors.toList()))
  77. .flatMapMany(Flux::fromIterable);
  78. }
  79. public Mono<Node> getController(KafkaCluster cluster) {
  80. return adminClientService
  81. .get(cluster)
  82. .flatMap(ReactiveAdminClient::describeCluster)
  83. .map(ReactiveAdminClient.ClusterDescription::getController);
  84. }
  85. public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,
  86. Integer broker,
  87. BrokerLogdirUpdateDTO brokerLogDir) {
  88. return adminClientService.get(cluster)
  89. .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
  90. }
  91. private Mono<Void> updateBrokerLogDir(ReactiveAdminClient admin,
  92. BrokerLogdirUpdateDTO b,
  93. Integer broker) {
  94. Map<TopicPartitionReplica, String> req = Map.of(
  95. new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
  96. b.getLogDir());
  97. return admin.alterReplicaLogDirs(req)
  98. .onErrorResume(UnknownTopicOrPartitionException.class,
  99. e -> Mono.error(new TopicOrPartitionNotFoundException()))
  100. .onErrorResume(LogDirNotFoundException.class,
  101. e -> Mono.error(new LogDirNotFoundApiException()))
  102. .doOnError(e -> log.error("Unexpected error", e));
  103. }
  104. public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
  105. Integer broker,
  106. String name,
  107. String value) {
  108. return adminClientService.get(cluster)
  109. .flatMap(ac -> ac.updateBrokerConfigByName(broker, name, value))
  110. .onErrorResume(InvalidRequestException.class,
  111. e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
  112. .doOnError(e -> log.error("Unexpected error", e));
  113. }
  114. private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
  115. KafkaCluster cluster, List<Integer> reqBrokers) {
  116. return adminClientService.get(cluster)
  117. .flatMap(admin -> {
  118. List<Integer> brokers = metricsCache.get(cluster).getClusterDescription().getNodes()
  119. .stream()
  120. .map(Node::id)
  121. .collect(Collectors.toList());
  122. if (reqBrokers != null && !reqBrokers.isEmpty()) {
  123. brokers.retainAll(reqBrokers);
  124. }
  125. return admin.describeLogDirs(brokers);
  126. })
  127. .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
  128. log.error("Error during fetching log dirs", e);
  129. return Mono.just(new HashMap<>());
  130. });
  131. }
  132. public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster cluster, List<Integer> brokers) {
  133. return getClusterLogDirs(cluster, brokers)
  134. .map(describeLogDirsMapper::toBrokerLogDirsList)
  135. .flatMapMany(Flux::fromIterable);
  136. }
  137. public Flux<BrokerConfigDTO> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
  138. return getBrokersConfig(cluster, brokerId)
  139. .map(clusterMapper::toBrokerConfig);
  140. }
  141. public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
  142. return Mono.justOrEmpty(
  143. metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId))
  144. .map(clusterMapper::toBrokerMetrics);
  145. }
  146. }