PrometheusEndpointUtil.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package com.provectus.kafka.ui.util;
  2. import static io.prometheus.client.Collector.MetricFamilySamples;
  3. import com.google.common.annotations.VisibleForTesting;
  4. import com.google.common.collect.Iterators;
  5. import com.provectus.kafka.ui.model.Metrics;
  6. import io.prometheus.client.exporter.common.TextFormat;
  7. import java.io.StringWriter;
  8. import java.util.ArrayList;
  9. import java.util.LinkedHashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.stream.Collectors;
  13. import java.util.stream.Stream;
  14. import lombok.SneakyThrows;
  15. import org.springframework.http.HttpHeaders;
  16. import org.springframework.http.ResponseEntity;
  17. public final class PrometheusEndpointUtil {
  18. private PrometheusEndpointUtil() {
  19. }
  20. public static ResponseEntity<String> exposeAllMetrics(Map<String, Metrics> clustersMetrics) {
  21. return constructResponse(getSummarizedMetricsWithClusterLbl(clustersMetrics));
  22. }
  23. public static ResponseEntity<String> exposeClusterMetrics(Metrics clusterMetrics) {
  24. return constructResponse(clusterMetrics.getSummarizedMetrics());
  25. }
  26. public static ResponseEntity<String> exposeBrokerMetrics(Metrics clusterMetrics, int brokerId) {
  27. //TODO: discuss - do we need to append broker_id lbl ?
  28. return constructResponse(
  29. clusterMetrics
  30. .getPerBrokerScrapedMetrics()
  31. .getOrDefault(brokerId, List.of())
  32. .stream()
  33. );
  34. }
  35. private static Stream<MetricFamilySamples> getSummarizedMetricsWithClusterLbl(Map<String, Metrics> clustersMetrics) {
  36. return clustersMetrics.entrySet()
  37. .stream()
  38. .flatMap(e -> e.getValue()
  39. .getSummarizedMetrics()
  40. .map(mfs -> addLbl(mfs, "cluster", e.getKey())))
  41. // merging MFS with same name, keeping order
  42. .collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs,
  43. PrometheusEndpointUtil::concatSamples, LinkedHashMap::new))
  44. .values()
  45. .stream();
  46. }
  47. private static MetricFamilySamples concatSamples(MetricFamilySamples mfs1,
  48. MetricFamilySamples mfs2) {
  49. return new MetricFamilySamples(
  50. mfs1.name,
  51. mfs1.unit,
  52. mfs1.type,
  53. mfs1.help,
  54. Stream.concat(mfs1.samples.stream(), mfs2.samples.stream()).toList()
  55. );
  56. }
  57. private static MetricFamilySamples addLbl(MetricFamilySamples mfs, String lblName, String lblVal) {
  58. return new MetricFamilySamples(
  59. mfs.name, mfs.unit, mfs.type, mfs.help,
  60. mfs.samples.stream()
  61. .map(sample ->
  62. new MetricFamilySamples.Sample(
  63. sample.name,
  64. prependToList(sample.labelNames, lblName),
  65. prependToList(sample.labelValues, lblVal),
  66. sample.value
  67. )).toList()
  68. );
  69. }
  70. private static <T> List<T> prependToList(List<T> lst, T toPrepend) {
  71. var result = new ArrayList<T>(lst.size() + 1);
  72. result.add(toPrepend);
  73. result.addAll(lst);
  74. return result;
  75. }
  76. @VisibleForTesting
  77. @SneakyThrows
  78. public static ResponseEntity<String> constructResponse(Stream<MetricFamilySamples> metrics) {
  79. StringWriter writer = new StringWriter();
  80. TextFormat.writeOpenMetrics100(writer, Iterators.asEnumeration(metrics.iterator()));
  81. HttpHeaders responseHeaders = new HttpHeaders();
  82. responseHeaders.set(HttpHeaders.CONTENT_TYPE, TextFormat.CONTENT_TYPE_OPENMETRICS_100);
  83. return ResponseEntity
  84. .ok()
  85. .headers(responseHeaders)
  86. .body(writer.toString());
  87. }
  88. }