BrokerService.java

package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

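/**
 * Broker-level operations for a Kafka cluster: listing brokers, reading and
 * updating broker configuration, and inspecting log directories. All methods
 * are non-blocking and delegate to the cluster's {@link ReactiveAdminClient},
 * using cached cluster statistics where possible.
 */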
@Service
@RequiredArgsConstructor
@Slf4j
public class BrokerService {

  private final StatisticsCache statisticsCache;
  private final AdminClientService adminClientService;
  private final DescribeLogDirsMapper describeLogDirsMapper;

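  // Fetches raw config entries for the given broker ids via the cluster's admin client.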
  private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
      KafkaCluster cluster, List<Integer> brokersIds) {
    return adminClientService.get(cluster).flatMap(ac -> ac.loadBrokersConfig(brokersIds));
  }

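  // Single-broker overload: returns that broker's config entries, or an empty list if none were returned.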
  private Mono<List<ConfigEntry>> loadBrokersConfig(
      KafkaCluster cluster, Integer brokerId) {
    return loadBrokersConfig(cluster, Collections.singletonList(brokerId))
        .map(map -> map.values().stream().findFirst().orElse(List.of()));
  }

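  // Validates the broker id against the cached cluster description before loading its config.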
  private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
    if (statisticsCache.get(cluster).getClusterDescription().getNodes()
        .stream().noneMatch(node -> node.id() == brokerId)) {
      return Flux.error(
          new NotFoundException(String.format("Broker with id %s not found", brokerId)));
    }
    return loadBrokersConfig(cluster, brokerId)
        .map(list -> list.stream()
            .map(InternalBrokerConfig::from)
            .collect(Collectors.toList()))
        .flatMapMany(Flux::fromIterable);
  }

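  /**
   * Lists the cluster's brokers, enriching each node with partition distribution
   * data derived from the cached statistics.
   */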
  public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
    var stats = statisticsCache.get(cluster);
    var partitionsDistribution = PartitionDistributionStats.create(stats);
    return adminClientService
        .get(cluster)
        .flatMap(ReactiveAdminClient::describeCluster)
        .map(description -> description.getNodes().stream()
            .map(node -> new InternalBroker(node, partitionsDistribution, stats))
            .collect(Collectors.toList()))
        .flatMapMany(Flux::fromIterable);
  }

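  /**
   * Moves a topic-partition replica on the given broker to another log directory.
   */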
  public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,
                                       Integer broker,
                                       BrokerLogdirUpdateDTO brokerLogDir) {
    return adminClientService.get(cluster)
        .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
  }

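  // Issues the alterReplicaLogDirs call and translates Kafka client exceptions
  // into the corresponding API-level exceptions.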
  private Mono<Void> updateBrokerLogDir(ReactiveAdminClient admin,
                                        BrokerLogdirUpdateDTO b,
                                        Integer broker) {
    Map<TopicPartitionReplica, String> req = Map.of(
        new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
        b.getLogDir());
    return admin.alterReplicaLogDirs(req)
        .onErrorResume(UnknownTopicOrPartitionException.class,
            e -> Mono.error(new TopicOrPartitionNotFoundException()))
        .onErrorResume(LogDirNotFoundException.class,
            e -> Mono.error(new LogDirNotFoundApiException()))
        .doOnError(e -> log.error("Unexpected error", e));
  }

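  /**
   * Updates a single broker config property by name, translating invalid requests
   * into {@link InvalidRequestApiException}.
   */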
  public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
                                             Integer broker,
                                             String name,
                                             String value) {
    return adminClientService.get(cluster)
        .flatMap(ac -> ac.updateBrokerConfigByName(broker, name, value))
        .onErrorResume(InvalidRequestException.class,
            e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
        .doOnError(e -> log.error("Unexpected error", e));
  }

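  // Describes log dirs for the requested brokers, or for all known brokers when the filter
  // is null or empty; a timeout is logged and yields an empty map rather than an error.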
  private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
      KafkaCluster cluster, List<Integer> reqBrokers) {
    return adminClientService.get(cluster)
        .flatMap(admin -> {
          List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
              .stream()
              .map(Node::id)
              .collect(Collectors.toList());
          if (reqBrokers != null && !reqBrokers.isEmpty()) {
            brokers.retainAll(reqBrokers);
          }
          return admin.describeLogDirs(brokers);
        })
        .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
          log.error("Error during fetching log dirs", e);
          return Mono.just(new HashMap<>());
        });
  }

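  /**
   * Returns log directory descriptions for the given brokers, or for every broker
   * in the cluster when the list is null or empty.
   */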
  public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster cluster, List<Integer> brokers) {
    return getClusterLogDirs(cluster, brokers)
        .map(describeLogDirsMapper::toBrokerLogDirsList)
        .flatMapMany(Flux::fromIterable);
  }

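  /**
   * Returns the config entries of a single broker; errors with
   * {@link NotFoundException} if the broker is not part of the cluster.
   */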
  public Flux<InternalBrokerConfig> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
    return getBrokersConfig(cluster, brokerId);
  }

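  /**
   * Returns the cached per-broker metrics, or an empty Mono when no metrics
   * have been collected for the given broker.
   */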
  public Mono<List<RawMetric>> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
    return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerMetrics().get(brokerId));
  }
}