TopicsService.java 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. package com.provectus.kafka.ui.service;
  2. import static java.util.stream.Collectors.toList;
  3. import static java.util.stream.Collectors.toMap;
  4. import com.google.common.annotations.VisibleForTesting;
  5. import com.provectus.kafka.ui.exception.TopicMetadataException;
  6. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  7. import com.provectus.kafka.ui.exception.TopicRecreationException;
  8. import com.provectus.kafka.ui.exception.ValidationException;
  9. import com.provectus.kafka.ui.mapper.ClusterMapper;
  10. import com.provectus.kafka.ui.model.Feature;
  11. import com.provectus.kafka.ui.model.InternalLogDirStats;
  12. import com.provectus.kafka.ui.model.InternalPartition;
  13. import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
  14. import com.provectus.kafka.ui.model.InternalReplica;
  15. import com.provectus.kafka.ui.model.InternalTopic;
  16. import com.provectus.kafka.ui.model.InternalTopicConfig;
  17. import com.provectus.kafka.ui.model.KafkaCluster;
  18. import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
  19. import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
  20. import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
  21. import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
  22. import com.provectus.kafka.ui.model.SortOrderDTO;
  23. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  24. import com.provectus.kafka.ui.model.TopicConfigDTO;
  25. import com.provectus.kafka.ui.model.TopicCreationDTO;
  26. import com.provectus.kafka.ui.model.TopicDTO;
  27. import com.provectus.kafka.ui.model.TopicDetailsDTO;
  28. import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
  29. import com.provectus.kafka.ui.model.TopicUpdateDTO;
  30. import com.provectus.kafka.ui.model.TopicsResponseDTO;
  31. import com.provectus.kafka.ui.serde.DeserializationService;
  32. import com.provectus.kafka.ui.util.JmxClusterUtil;
  33. import java.time.Duration;
  34. import java.util.Collection;
  35. import java.util.Collections;
  36. import java.util.Comparator;
  37. import java.util.List;
  38. import java.util.Map;
  39. import java.util.Optional;
  40. import java.util.function.Function;
  41. import java.util.function.Predicate;
  42. import java.util.stream.Collectors;
  43. import lombok.RequiredArgsConstructor;
  44. import org.apache.commons.lang3.StringUtils;
  45. import org.apache.kafka.clients.admin.ConfigEntry;
  46. import org.apache.kafka.clients.admin.NewPartitionReassignment;
  47. import org.apache.kafka.clients.admin.NewPartitions;
  48. import org.apache.kafka.clients.admin.OffsetSpec;
  49. import org.apache.kafka.clients.admin.TopicDescription;
  50. import org.apache.kafka.common.Node;
  51. import org.apache.kafka.common.TopicPartition;
  52. import org.apache.kafka.common.errors.TopicExistsException;
  53. import org.springframework.beans.factory.annotation.Value;
  54. import org.springframework.stereotype.Service;
  55. import reactor.core.publisher.Mono;
  56. import reactor.util.retry.Retry;
  57. @Service
  58. @RequiredArgsConstructor
  59. public class TopicsService {
  60. private static final Integer DEFAULT_PAGE_SIZE = 25;
  61. private final AdminClientService adminClientService;
  62. private final ClusterMapper clusterMapper;
  63. private final DeserializationService deserializationService;
  64. private final MetricsCache metricsCache;
  65. @Value("${topic.recreate.maxRetries:15}")
  66. private int recreateMaxRetries;
  67. @Value("${topic.recreate.delay.seconds:1}")
  68. private int recreateDelayInSeconds;
  69. public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
  70. Optional<Integer> pageNum,
  71. Optional<Integer> nullablePerPage,
  72. Optional<Boolean> showInternal,
  73. Optional<String> search,
  74. Optional<TopicColumnsToSortDTO> sortBy,
  75. Optional<SortOrderDTO> sortOrder) {
  76. return adminClientService.get(cluster).flatMap(ac ->
  77. new Pagination(ac, metricsCache.get(cluster))
  78. .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder)
  79. .flatMap(page ->
  80. loadTopics(cluster, page.getTopics())
  81. .map(topics ->
  82. new TopicsResponseDTO()
  83. .topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
  84. .pageCount(page.getTotalPages()))));
  85. }
  86. private Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
  87. if (topics.isEmpty()) {
  88. return Mono.just(List.of());
  89. }
  90. return adminClientService.get(c)
  91. .flatMap(ac ->
  92. ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
  93. (descriptions, configs) -> {
  94. metricsCache.update(c, descriptions, configs);
  95. return getPartitionOffsets(descriptions, ac).map(offsets -> {
  96. var metrics = metricsCache.get(c);
  97. return createList(
  98. topics,
  99. descriptions,
  100. configs,
  101. offsets,
  102. metrics.getJmxMetrics(),
  103. metrics.getLogDirInfo()
  104. );
  105. });
  106. })).flatMap(Function.identity());
  107. }
  108. private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
  109. return loadTopics(c, List.of(topicName))
  110. .map(lst -> lst.stream().findFirst().orElseThrow(TopicNotFoundException::new));
  111. }
  112. private List<InternalTopic> createList(List<String> orderedNames,
  113. Map<String, TopicDescription> descriptions,
  114. Map<String, List<ConfigEntry>> configs,
  115. InternalPartitionsOffsets partitionsOffsets,
  116. JmxClusterUtil.JmxMetrics jmxMetrics,
  117. InternalLogDirStats logDirInfo) {
  118. return orderedNames.stream()
  119. .filter(descriptions::containsKey)
  120. .map(t -> InternalTopic.from(
  121. descriptions.get(t),
  122. configs.getOrDefault(t, List.of()),
  123. partitionsOffsets,
  124. jmxMetrics,
  125. logDirInfo
  126. ))
  127. .collect(toList());
  128. }
  129. private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
  130. descriptions,
  131. ReactiveAdminClient ac) {
  132. var topicPartitions = descriptions.values().stream()
  133. .flatMap(desc ->
  134. desc.partitions().stream().map(p -> new TopicPartition(desc.name(), p.partition())))
  135. .collect(toList());
  136. return ac.listOffsets(topicPartitions, OffsetSpec.earliest())
  137. .zipWith(ac.listOffsets(topicPartitions, OffsetSpec.latest()),
  138. (earliest, latest) ->
  139. topicPartitions.stream()
  140. .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
  141. .map(tp ->
  142. Map.entry(tp,
  143. new InternalPartitionsOffsets.Offsets(
  144. earliest.get(tp), latest.get(tp))))
  145. .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)))
  146. .map(InternalPartitionsOffsets::new);
  147. }
  148. public Mono<TopicDetailsDTO> getTopicDetails(KafkaCluster cluster, String topicName) {
  149. return loadTopic(cluster, topicName).map(clusterMapper::toTopicDetails);
  150. }
  151. public Mono<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster cluster, String topicName) {
  152. return adminClientService.get(cluster)
  153. .flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
  154. .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new))
  155. .map(lst -> lst.stream()
  156. .map(InternalTopicConfig::from)
  157. .map(clusterMapper::toTopicConfig)
  158. .collect(toList()));
  159. }
  160. private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
  161. Mono<TopicCreationDTO> topicCreation) {
  162. return topicCreation.flatMap(topicData ->
  163. adminClient.createTopic(
  164. topicData.getName(),
  165. topicData.getPartitions(),
  166. topicData.getReplicationFactor().shortValue(),
  167. topicData.getConfigs()
  168. ).thenReturn(topicData)
  169. )
  170. .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
  171. .flatMap(topicData -> loadTopic(c, topicData.getName()));
  172. }
  173. public Mono<TopicDTO> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
  174. return adminClientService.get(cluster)
  175. .flatMap(ac -> createTopic(cluster, ac, topicCreation))
  176. .map(clusterMapper::toTopic);
  177. }
  178. public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
  179. return loadTopic(cluster, topicName)
  180. .flatMap(t -> deleteTopic(cluster, topicName)
  181. .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds))
  182. .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(),
  183. topic.getPartitionCount(),
  184. (short) topic.getReplicationFactor(),
  185. topic.getTopicConfigs()
  186. .stream()
  187. .collect(Collectors
  188. .toMap(InternalTopicConfig::getName,
  189. InternalTopicConfig::getValue)))
  190. .thenReturn(topicName))
  191. .retryWhen(Retry.fixedDelay(recreateMaxRetries,
  192. Duration.ofSeconds(recreateDelayInSeconds))
  193. .filter(TopicExistsException.class::isInstance)
  194. .onRetryExhaustedThrow((a, b) ->
  195. new TopicRecreationException(topicName,
  196. recreateMaxRetries * recreateDelayInSeconds)))
  197. .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic)
  198. )
  199. );
  200. }
  201. private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
  202. String topicName,
  203. TopicUpdateDTO topicUpdate) {
  204. return adminClientService.get(cluster)
  205. .flatMap(ac ->
  206. ac.updateTopicConfig(topicName, topicUpdate.getConfigs())
  207. .then(loadTopic(cluster, topicName)));
  208. }
  209. public Mono<TopicDTO> updateTopic(KafkaCluster cl, String topicName,
  210. Mono<TopicUpdateDTO> topicUpdate) {
  211. return topicUpdate
  212. .flatMap(t -> updateTopic(cl, topicName, t))
  213. .map(clusterMapper::toTopic);
  214. }
  215. private Mono<InternalTopic> changeReplicationFactor(
  216. KafkaCluster cluster,
  217. ReactiveAdminClient adminClient,
  218. String topicName,
  219. Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
  220. ) {
  221. return adminClient.alterPartitionReassignments(reassignments)
  222. .then(loadTopic(cluster, topicName));
  223. }
  224. /**
  225. * Change topic replication factor, works on brokers versions 5.4.x and higher
  226. */
  227. public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
  228. KafkaCluster cluster,
  229. String topicName,
  230. ReplicationFactorChangeDTO replicationFactorChange) {
  231. return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster)
  232. .flatMap(ac -> {
  233. Integer actual = topic.getReplicationFactor();
  234. Integer requested = replicationFactorChange.getTotalReplicationFactor();
  235. Integer brokersCount = metricsCache.get(cluster).getClusterDescription()
  236. .getNodes().size();
  237. if (requested.equals(actual)) {
  238. return Mono.error(
  239. new ValidationException(
  240. String.format("Topic already has replicationFactor %s.", actual)));
  241. }
  242. if (requested > brokersCount) {
  243. return Mono.error(
  244. new ValidationException(
  245. String.format("Requested replication factor %s more than brokers count %s.",
  246. requested, brokersCount)));
  247. }
  248. return changeReplicationFactor(cluster, ac, topicName,
  249. getPartitionsReassignments(cluster, topic,
  250. replicationFactorChange));
  251. })
  252. .map(t -> new ReplicationFactorChangeResponseDTO()
  253. .topicName(t.getName())
  254. .totalReplicationFactor(t.getReplicationFactor())));
  255. }
  256. private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
  257. KafkaCluster cluster,
  258. InternalTopic topic,
  259. ReplicationFactorChangeDTO replicationFactorChange) {
  260. // Current assignment map (Partition number -> List of brokers)
  261. Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(topic);
  262. // Brokers map (Broker id -> count)
  263. Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
  264. int currentReplicationFactor = topic.getReplicationFactor();
  265. // If we should to increase Replication factor
  266. if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
  267. // For each partition
  268. for (var assignmentList : currentAssignment.values()) {
  269. // Get brokers list sorted by usage
  270. var brokers = brokersUsage.entrySet().stream()
  271. .sorted(Map.Entry.comparingByValue())
  272. .map(Map.Entry::getKey)
  273. .collect(toList());
  274. // Iterate brokers and try to add them in assignment
  275. // while partition replicas count != requested replication factor
  276. for (Integer broker : brokers) {
  277. if (!assignmentList.contains(broker)) {
  278. assignmentList.add(broker);
  279. brokersUsage.merge(broker, 1, Integer::sum);
  280. }
  281. if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
  282. break;
  283. }
  284. }
  285. if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
  286. throw new ValidationException("Something went wrong during adding replicas");
  287. }
  288. }
  289. // If we should to decrease Replication factor
  290. } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
  291. for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
  292. var partition = assignmentEntry.getKey();
  293. var brokers = assignmentEntry.getValue();
  294. // Get brokers list sorted by usage in reverse order
  295. var brokersUsageList = brokersUsage.entrySet().stream()
  296. .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
  297. .map(Map.Entry::getKey)
  298. .collect(toList());
  299. // Iterate brokers and try to remove them from assignment
  300. // while partition replicas count != requested replication factor
  301. for (Integer broker : brokersUsageList) {
  302. // Check is the broker the leader of partition
  303. if (!topic.getPartitions().get(partition).getLeader()
  304. .equals(broker)) {
  305. brokers.remove(broker);
  306. brokersUsage.merge(broker, -1, Integer::sum);
  307. }
  308. if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
  309. break;
  310. }
  311. }
  312. if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
  313. throw new ValidationException("Something went wrong during removing replicas");
  314. }
  315. }
  316. } else {
  317. throw new ValidationException("Replication factor already equals requested");
  318. }
  319. // Return result map
  320. return currentAssignment.entrySet().stream().collect(toMap(
  321. e -> new TopicPartition(topic.getName(), e.getKey()),
  322. e -> Optional.of(new NewPartitionReassignment(e.getValue()))
  323. ));
  324. }
  325. private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
  326. return topic.getPartitions().values().stream()
  327. .collect(toMap(
  328. InternalPartition::getPartition,
  329. p -> p.getReplicas().stream()
  330. .map(InternalReplica::getBroker)
  331. .collect(toList())
  332. ));
  333. }
  334. private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
  335. Map<Integer, List<Integer>> currentAssignment) {
  336. Map<Integer, Integer> result = metricsCache.get(cluster).getClusterDescription().getNodes()
  337. .stream()
  338. .map(Node::id)
  339. .collect(toMap(
  340. c -> c,
  341. c -> 0
  342. ));
  343. currentAssignment.values().forEach(brokers -> brokers
  344. .forEach(broker -> result.put(broker, result.get(broker) + 1)));
  345. return result;
  346. }
  347. public Mono<PartitionsIncreaseResponseDTO> increaseTopicPartitions(
  348. KafkaCluster cluster,
  349. String topicName,
  350. PartitionsIncreaseDTO partitionsIncrease) {
  351. return loadTopic(cluster, topicName).flatMap(topic ->
  352. adminClientService.get(cluster).flatMap(ac -> {
  353. Integer actualCount = topic.getPartitionCount();
  354. Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
  355. if (requestedCount < actualCount) {
  356. return Mono.error(
  357. new ValidationException(String.format(
  358. "Topic currently has %s partitions, which is higher than the requested %s.",
  359. actualCount, requestedCount)));
  360. }
  361. if (requestedCount.equals(actualCount)) {
  362. return Mono.error(
  363. new ValidationException(
  364. String.format("Topic already has %s partitions.", actualCount)));
  365. }
  366. Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
  367. topicName,
  368. NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
  369. );
  370. return ac.createPartitions(newPartitionsMap)
  371. .then(loadTopic(cluster, topicName));
  372. }).map(t -> new PartitionsIncreaseResponseDTO()
  373. .topicName(t.getName())
  374. .totalPartitionsCount(t.getPartitionCount())
  375. )
  376. );
  377. }
  378. public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
  379. if (metricsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
  380. return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
  381. .doOnSuccess(t -> metricsCache.onTopicDelete(cluster, topicName));
  382. } else {
  383. return Mono.error(new ValidationException("Topic deletion restricted"));
  384. }
  385. }
  386. public TopicMessageSchemaDTO getTopicSchema(KafkaCluster cluster, String topicName) {
  387. if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
  388. throw new TopicNotFoundException();
  389. }
  390. return deserializationService
  391. .getRecordDeserializerForCluster(cluster)
  392. .getTopicSchema(topicName);
  393. }
  394. public Mono<TopicDTO> cloneTopic(
  395. KafkaCluster cluster, String topicName, String newTopicName) {
  396. return loadTopic(cluster, topicName).flatMap(topic ->
  397. adminClientService.get(cluster).flatMap(ac -> ac.createTopic(newTopicName,
  398. topic.getPartitionCount(),
  399. (short) topic.getReplicationFactor(),
  400. topic.getTopicConfigs()
  401. .stream()
  402. .collect(Collectors.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue)))
  403. ).thenReturn(newTopicName).flatMap(a -> loadTopic(cluster, newTopicName)).map(clusterMapper::toTopic));
  404. }
  405. @VisibleForTesting
  406. @lombok.Value
  407. static class Pagination {
  408. ReactiveAdminClient adminClient;
  409. MetricsCache.Metrics metrics;
  410. @lombok.Value
  411. static class Page {
  412. List<String> topics;
  413. int totalPages;
  414. }
  415. Mono<Page> getPage(
  416. Optional<Integer> pageNum,
  417. Optional<Integer> nullablePerPage,
  418. Optional<Boolean> showInternal,
  419. Optional<String> search,
  420. Optional<TopicColumnsToSortDTO> sortBy,
  421. Optional<SortOrderDTO> sortOrder) {
  422. return geTopicsForPagination()
  423. .map(paginatingTopics -> {
  424. Predicate<Integer> positiveInt = i -> i > 0;
  425. int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
  426. var topicsToSkip = (pageNum.filter(positiveInt).orElse(1) - 1) * perPage;
  427. var comparator = sortOrder.isEmpty() || !sortOrder.get().equals(SortOrderDTO.DESC)
  428. ? getComparatorForTopic(sortBy) : getComparatorForTopic(sortBy).reversed();
  429. List<InternalTopic> topics = paginatingTopics.stream()
  430. .filter(topic -> !topic.isInternal()
  431. || showInternal.map(i -> topic.isInternal() == i).orElse(true))
  432. .filter(topic ->
  433. search
  434. .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
  435. .orElse(true))
  436. .sorted(comparator)
  437. .collect(toList());
  438. var totalPages = (topics.size() / perPage)
  439. + (topics.size() % perPage == 0 ? 0 : 1);
  440. List<String> topicsToRender = topics.stream()
  441. .skip(topicsToSkip)
  442. .limit(perPage)
  443. .map(InternalTopic::getName)
  444. .collect(toList());
  445. return new Page(topicsToRender, totalPages);
  446. });
  447. }
  448. private Comparator<InternalTopic> getComparatorForTopic(
  449. Optional<TopicColumnsToSortDTO> sortBy) {
  450. var defaultComparator = Comparator.comparing(InternalTopic::getName);
  451. if (sortBy.isEmpty()) {
  452. return defaultComparator;
  453. }
  454. switch (sortBy.get()) {
  455. case TOTAL_PARTITIONS:
  456. return Comparator.comparing(InternalTopic::getPartitionCount);
  457. case OUT_OF_SYNC_REPLICAS:
  458. return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
  459. case REPLICATION_FACTOR:
  460. return Comparator.comparing(InternalTopic::getReplicationFactor);
  461. case SIZE:
  462. return Comparator.comparing(InternalTopic::getSegmentSize);
  463. case NAME:
  464. default:
  465. return defaultComparator;
  466. }
  467. }
  468. private Mono<List<String>> filterExisting(Collection<String> topics) {
  469. return adminClient.listTopics(true)
  470. .map(existing -> existing.stream().filter(topics::contains).collect(toList()));
  471. }
  472. private Mono<List<InternalTopic>> geTopicsForPagination() {
  473. return filterExisting(metrics.getTopicDescriptions().keySet())
  474. .map(lst -> lst.stream()
  475. .map(topicName ->
  476. InternalTopic.from(
  477. metrics.getTopicDescriptions().get(topicName),
  478. metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
  479. InternalPartitionsOffsets.empty(),
  480. metrics.getJmxMetrics(),
  481. metrics.getLogDirInfo()))
  482. .collect(toList())
  483. );
  484. }
  485. }
  486. }