InternalLogDirStats.java 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package com.provectus.kafka.ui.model;
  2. import static java.util.stream.Collectors.collectingAndThen;
  3. import static java.util.stream.Collectors.groupingBy;
  4. import static java.util.stream.Collectors.summarizingLong;
  5. import jakarta.annotation.Nullable;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.LongSummaryStatistics;
  9. import java.util.Map;
  10. import java.util.concurrent.atomic.LongAdder;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.Value;
  13. import org.apache.kafka.clients.admin.LogDirDescription;
  14. import org.apache.kafka.common.TopicPartition;
  15. import reactor.util.function.Tuple2;
  16. import reactor.util.function.Tuple3;
  17. import reactor.util.function.Tuples;
  18. @Value
  19. public class InternalLogDirStats {
  20. @Value
  21. @RequiredArgsConstructor
  22. public static class SegmentStats {
  23. Long segmentSize;
  24. Integer segmentsCount;
  25. private SegmentStats(LongSummaryStatistics s) {
  26. this(s.getSum(), (int) s.getCount());
  27. }
  28. }
  29. public record LogDirSpaceStats(@Nullable Long totalBytes,
  30. @Nullable Long usableBytes,
  31. Map<String, Long> totalPerDir,
  32. Map<String, Long> usablePerDir) {
  33. }
  34. Map<TopicPartition, SegmentStats> partitionsStats;
  35. Map<String, SegmentStats> topicStats;
  36. Map<Integer, SegmentStats> brokerStats;
  37. Map<Integer, LogDirSpaceStats> brokerDirsStats;
  38. public static InternalLogDirStats empty() {
  39. return new InternalLogDirStats(Map.of());
  40. }
  41. public InternalLogDirStats(Map<Integer, Map<String, LogDirDescription>> logsInfo) {
  42. final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
  43. logsInfo.entrySet().stream().flatMap(b ->
  44. b.getValue().entrySet().stream().flatMap(topicMap ->
  45. topicMap.getValue().replicaInfos().entrySet().stream()
  46. .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size()))
  47. )
  48. ).toList();
  49. partitionsStats = topicPartitions.stream().collect(
  50. groupingBy(
  51. Tuple2::getT2,
  52. collectingAndThen(
  53. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  54. topicStats =
  55. topicPartitions.stream().collect(
  56. groupingBy(
  57. t -> t.getT2().topic(),
  58. collectingAndThen(
  59. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  60. brokerStats = topicPartitions.stream().collect(
  61. groupingBy(
  62. Tuple2::getT1,
  63. collectingAndThen(
  64. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  65. brokerDirsStats = calculateSpaceStats(logsInfo);
  66. }
  67. private static Map<Integer, LogDirSpaceStats> calculateSpaceStats(
  68. Map<Integer, Map<String, LogDirDescription>> logsInfo) {
  69. var stats = new HashMap<Integer, LogDirSpaceStats>();
  70. logsInfo.forEach((brokerId, logDirStats) -> {
  71. Map<String, Long> totalBytes = new HashMap<>();
  72. Map<String, Long> usableBytes = new HashMap<>();
  73. logDirStats.forEach((logDir, descr) -> {
  74. if (descr.error() == null) {
  75. return;
  76. }
  77. descr.totalBytes().ifPresent(b -> totalBytes.merge(logDir, b, Long::sum));
  78. descr.usableBytes().ifPresent(b -> usableBytes.merge(logDir, b, Long::sum));
  79. });
  80. stats.put(
  81. brokerId,
  82. new LogDirSpaceStats(
  83. totalBytes.isEmpty() ? null : totalBytes.values().stream().mapToLong(i -> i).sum(),
  84. usableBytes.isEmpty() ? null : usableBytes.values().stream().mapToLong(i -> i).sum(),
  85. totalBytes,
  86. usableBytes
  87. )
  88. );
  89. });
  90. return stats;
  91. }
  92. }