BrokerService.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.exception.InvalidRequestApiException;
  3. import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
  4. import com.provectus.kafka.ui.exception.NotFoundException;
  5. import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
  6. import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
  7. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  8. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  9. import com.provectus.kafka.ui.model.InternalBroker;
  10. import com.provectus.kafka.ui.model.InternalBrokerConfig;
  11. import com.provectus.kafka.ui.model.KafkaCluster;
  12. import com.provectus.kafka.ui.model.PartitionDistributionStats;
  13. import com.provectus.kafka.ui.service.metrics.RawMetric;
  14. import java.util.Collections;
  15. import java.util.HashMap;
  16. import java.util.List;
  17. import java.util.Map;
  18. import java.util.stream.Collectors;
  19. import javax.annotation.Nullable;
  20. import lombok.RequiredArgsConstructor;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.kafka.clients.admin.ConfigEntry;
  23. import org.apache.kafka.common.Node;
  24. import org.apache.kafka.common.TopicPartitionReplica;
  25. import org.apache.kafka.common.errors.InvalidRequestException;
  26. import org.apache.kafka.common.errors.LogDirNotFoundException;
  27. import org.apache.kafka.common.errors.TimeoutException;
  28. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  29. import org.apache.kafka.common.requests.DescribeLogDirsResponse;
  30. import org.springframework.stereotype.Service;
  31. import reactor.core.publisher.Flux;
  32. import reactor.core.publisher.Mono;
  33. @Service
  34. @RequiredArgsConstructor
  35. @Slf4j
  36. public class BrokerService {
  37. private final StatisticsCache statisticsCache;
  38. private final AdminClientService adminClientService;
  39. private final DescribeLogDirsMapper describeLogDirsMapper;
  40. private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
  41. KafkaCluster cluster, List<Integer> brokersIds) {
  42. return adminClientService.get(cluster).flatMap(ac -> ac.loadBrokersConfig(brokersIds));
  43. }
  44. private Mono<List<ConfigEntry>> loadBrokersConfig(
  45. KafkaCluster cluster, Integer brokerId) {
  46. return loadBrokersConfig(cluster, Collections.singletonList(brokerId))
  47. .map(map -> map.values().stream().findFirst().orElse(List.of()));
  48. }
  49. private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
  50. if (statisticsCache.get(cluster).getClusterDescription().getNodes()
  51. .stream().noneMatch(node -> node.id() == brokerId)) {
  52. return Flux.error(
  53. new NotFoundException(String.format("Broker with id %s not found", brokerId)));
  54. }
  55. return loadBrokersConfig(cluster, brokerId)
  56. .map(list -> list.stream()
  57. .map(InternalBrokerConfig::from)
  58. .collect(Collectors.toList()))
  59. .flatMapMany(Flux::fromIterable);
  60. }
  61. public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
  62. var stats = statisticsCache.get(cluster);
  63. var partitionsDistribution = PartitionDistributionStats.create(stats);
  64. return adminClientService
  65. .get(cluster)
  66. .flatMap(ReactiveAdminClient::describeCluster)
  67. .map(description -> description.getNodes().stream()
  68. .map(node -> new InternalBroker(node, partitionsDistribution, stats))
  69. .collect(Collectors.toList()))
  70. .flatMapMany(Flux::fromIterable);
  71. }
  72. public Mono<Void> updateBrokerLogDir(KafkaCluster cluster,
  73. Integer broker,
  74. BrokerLogdirUpdateDTO brokerLogDir) {
  75. return adminClientService.get(cluster)
  76. .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
  77. }
  78. private Mono<Void> updateBrokerLogDir(ReactiveAdminClient admin,
  79. BrokerLogdirUpdateDTO b,
  80. Integer broker) {
  81. Map<TopicPartitionReplica, String> req = Map.of(
  82. new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
  83. b.getLogDir());
  84. return admin.alterReplicaLogDirs(req)
  85. .onErrorResume(UnknownTopicOrPartitionException.class,
  86. e -> Mono.error(new TopicOrPartitionNotFoundException()))
  87. .onErrorResume(LogDirNotFoundException.class,
  88. e -> Mono.error(new LogDirNotFoundApiException()))
  89. .doOnError(e -> log.error("Unexpected error", e));
  90. }
  91. public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
  92. Integer broker,
  93. String name,
  94. String value) {
  95. return adminClientService.get(cluster)
  96. .flatMap(ac -> ac.updateBrokerConfigByName(broker, name, value))
  97. .onErrorResume(InvalidRequestException.class,
  98. e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
  99. .doOnError(e -> log.error("Unexpected error", e));
  100. }
  101. private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
  102. KafkaCluster cluster, List<Integer> reqBrokers) {
  103. return adminClientService.get(cluster)
  104. .flatMap(admin -> {
  105. List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
  106. .stream()
  107. .map(Node::id)
  108. .collect(Collectors.toList());
  109. if (!reqBrokers.isEmpty()) {
  110. brokers.retainAll(reqBrokers);
  111. }
  112. return admin.describeLogDirs(brokers);
  113. })
  114. .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
  115. log.error("Error during fetching log dirs", e);
  116. return Mono.just(new HashMap<>());
  117. });
  118. }
  119. public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster cluster, List<Integer> brokers) {
  120. return getClusterLogDirs(cluster, brokers)
  121. .map(describeLogDirsMapper::toBrokerLogDirsList)
  122. .flatMapMany(Flux::fromIterable);
  123. }
  124. public Flux<InternalBrokerConfig> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
  125. return getBrokersConfig(cluster, brokerId);
  126. }
  127. public Mono<List<RawMetric>> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
  128. return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerMetrics().get(brokerId));
  129. }
  130. }