InternalLogDirStats.java 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package com.provectus.kafka.ui.model;
  2. import static java.util.stream.Collectors.collectingAndThen;
  3. import static java.util.stream.Collectors.groupingBy;
  4. import static java.util.stream.Collectors.summarizingLong;
  5. import static java.util.stream.Collectors.toList;
  6. import java.util.List;
  7. import java.util.LongSummaryStatistics;
  8. import java.util.Map;
  9. import lombok.Value;
  10. import org.apache.kafka.common.TopicPartition;
  11. import org.apache.kafka.common.requests.DescribeLogDirsResponse;
  12. import reactor.util.function.Tuple2;
  13. import reactor.util.function.Tuple3;
  14. import reactor.util.function.Tuples;
  15. @Value
  16. public class InternalLogDirStats {
  17. @Value
  18. public static class SegmentStats {
  19. long segmentSize;
  20. int segmentsCount;
  21. public SegmentStats(LongSummaryStatistics s) {
  22. segmentSize = s.getSum();
  23. segmentsCount = (int) s.getCount();
  24. }
  25. }
  26. Map<TopicPartition, SegmentStats> partitionsStats;
  27. Map<String, SegmentStats> topicStats;
  28. Map<Integer, SegmentStats> brokerStats;
  29. public static InternalLogDirStats empty() {
  30. return new InternalLogDirStats(Map.of());
  31. }
  32. public InternalLogDirStats(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
  33. final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
  34. log.entrySet().stream().flatMap(b ->
  35. b.getValue().entrySet().stream().flatMap(topicMap ->
  36. topicMap.getValue().replicaInfos.entrySet().stream()
  37. .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
  38. )
  39. ).toList();
  40. partitionsStats = topicPartitions.stream().collect(
  41. groupingBy(
  42. Tuple2::getT2,
  43. collectingAndThen(
  44. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  45. topicStats =
  46. topicPartitions.stream().collect(
  47. groupingBy(
  48. t -> t.getT2().topic(),
  49. collectingAndThen(
  50. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  51. brokerStats = topicPartitions.stream().collect(
  52. groupingBy(
  53. Tuple2::getT1,
  54. collectingAndThen(
  55. summarizingLong(Tuple3::getT3), SegmentStats::new)));
  56. }
  57. }