Преглед изворни кода

Support prometheus as a metrics interface (#2190)

* Unified internal metric representation (RawMetric)
* Prometheus retriever implemented
* Jmx metrics formatted to jmx-exporter openmentrics format

Co-authored-by: Mohamad Choukair <mchoukair@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: Daniel Ching <110129035+daching-provectus@users.noreply.github.com>
Co-authored-by: Daniel Ching <daching@provectus.com>
Co-authored-by: German Osin <germanosin@Germans-MacBook-Pro.local>
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Ilya Kuramshin <iliax@proton.me>
Marat Gumarov пре 2 година
родитељ
комит
6dff8f105e
56 измењених фајлова са 1034 додато и 499 уклоњено
  1. 10 6
      README.md
  2. 1 0
      documentation/compose/DOCKER_COMPOSE.md
  3. 2 2
      documentation/compose/auth-ldap.yaml
  4. 1 1
      documentation/compose/e2e-tests.yaml
  5. 2 0
      documentation/compose/jmx-exporter/kafka-broker.yml
  6. 10 0
      documentation/compose/jmx-exporter/kafka-prepare-and-run
  7. 1 1
      documentation/compose/kafka-ui-auth-context.yaml
  8. 1 1
      documentation/compose/kafka-ui-connectors-auth.yaml
  9. 4 4
      documentation/compose/kafka-ui-jmx-secured.yml
  10. 45 0
      documentation/compose/kafka-ui-with-jmx-exporter.yaml
  11. 2 2
      documentation/compose/kafka-ui.yaml
  12. 12 0
      kafka-ui-api/pom.xml
  13. 10 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  14. 0 21
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
  15. 19 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  16. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/BrokerMetrics.java
  17. 2 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
  18. 19 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
  19. 4 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
  20. 1 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  21. 38 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
  22. 20 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
  23. 38 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
  24. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
  25. 6 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
  26. 6 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
  27. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java
  28. 7 40
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java
  29. 13 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
  30. 24 23
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
  31. 85 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java
  32. 106 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java
  33. 69 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java
  34. 9 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java
  35. 46 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java
  36. 58 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java
  37. 60 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
  38. 42 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java
  39. 0 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
  40. 0 217
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java
  41. 0 41
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java
  42. 0 19
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java
  43. 0 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java
  44. 0 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
  45. 6 2
      kafka-ui-api/src/main/resources/application-local.yml
  46. 0 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
  47. 0 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java
  48. 0 3
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
  49. 10 10
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
  50. 77 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
  51. 30 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
  52. 64 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
  53. 66 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
  54. 0 28
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/NumberUtilTest.java
  55. 2 6
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
  56. 1 0
      pom.xml

+ 10 - 6
README.md

@@ -92,7 +92,9 @@ kafka:
         username: username
         password: password
 #     schemaNameTemplate: "%s-value"
-      jmxPort: 9997
+      metrics:
+        port: 9997
+        type: JMX
     -
 ```
 
@@ -102,7 +104,8 @@ kafka:
 * `schemaRegistryAuth.username`: schemaRegistry's basic authentication username
 * `schemaRegistryAuth.password`: schemaRegistry's basic authentication password
 * `schemaNameTemplate`: how keys are saved to schemaRegistry
-* `jmxPort`: open JMX port of a broker
+* `metrics.port`: open JMX port of a broker
+* `metrics.type`: Type of metrics, either JMX or PROMETHEUS. Defaulted to JMX.
 * `readOnly`: enable read only mode
 
 Configure as many clusters as you need by adding their configs below separated with `-`.
@@ -181,15 +184,16 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD`   	|SchemaRegistry's basic authentication password
 |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry
-|`KAFKA_CLUSTERS_0_JMXPORT`        	|Open JMX port of a broker
+|`KAFKA_CLUSTERS_0_METRICS_PORT`        	 |Open metrics port of a broker
+|`KAFKA_CLUSTERS_0_METRICS_TYPE`        	 |Type of metrics retriever to use. Valid values are JMX (default) or PROMETHEUS. If Prometheus, then metrics are read from prometheus-jmx-exporter instead of jmx
 |`KAFKA_CLUSTERS_0_READONLY`        	|Enable read-only mode. Default: false
 |`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION`        	|Disable collecting segments information. It should be true for confluent cloud. Default: false
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME`| Kafka Connect cluster's basic authentication username
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD`| Kafka Connect cluster's basic authentication password
-|`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
-|`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication
-|`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication
+|`KAFKA_CLUSTERS_0_METRICS_SSL`          |Enable SSL for Metrics? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
+|`KAFKA_CLUSTERS_0_METRICS_USERNAME` |Username for Metrics authentication
+|`KAFKA_CLUSTERS_0_METRICS_PASSWORD` |Password for Metrics authentication
 |`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1
 |`TOPIC_RECREATE_MAXRETRIES`  |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15

+ 1 - 0
documentation/compose/DOCKER_COMPOSE.md

@@ -12,3 +12,4 @@
 10. [kafka-ui-sasl.yaml](./kafka-ui-sasl.yaml) - SASL auth for Kafka.
 11. [kafka-ui-traefik-proxy.yaml](./kafka-ui-traefik-proxy.yaml) - Traefik specific proxy configuration.
 12. [oauth-cognito.yaml](./oauth-cognito.yaml) - OAuth2 with Cognito
+13. [kafka-ui-with-jmx-exporter.yaml](./kafka-ui-with-jmx-exporter.yaml) - A configuration with 2 kafka clusters with enabled prometheus jmx exporters instead of jmx.

+ 2 - 2
documentation/compose/auth-ldap.yaml

@@ -15,14 +15,14 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
       KAFKA_CLUSTERS_1_NAME: secondLocal
       KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
       KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
-      KAFKA_CLUSTERS_1_JMXPORT: 9998
+      KAFKA_CLUSTERS_1_METRICS_PORT: 9998
       KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
       KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083

+ 1 - 1
documentation/compose/e2e-tests.yaml

@@ -16,7 +16,7 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083

+ 2 - 0
documentation/compose/jmx-exporter/kafka-broker.yml

@@ -0,0 +1,2 @@
+rules:
+  - pattern: ".*"

+ 10 - 0
documentation/compose/jmx-exporter/kafka-prepare-and-run

@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+JAVA_AGENT_FILE="/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar"
+if [ ! -f "$JAVA_AGENT_FILE" ]
+then
+  echo "Downloading jmx_exporter javaagent"
+  curl -o $JAVA_AGENT_FILE https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar
+fi
+
+exec /etc/confluent/docker/run

+ 1 - 1
documentation/compose/kafka-ui-auth-context.yaml

@@ -14,7 +14,7 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       SERVER_SERVLET_CONTEXT_PATH: /kafkaui
       AUTH_TYPE: "LOGIN_FORM"
       SPRING_SECURITY_USER_NAME: admin

+ 1 - 1
documentation/compose/kafka-ui-connectors-auth.yaml

@@ -18,7 +18,7 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083

+ 4 - 4
documentation/compose/kafka-ui-jmx-secured.yml

@@ -20,10 +20,10 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
-      KAFKA_CLUSTERS_0_JMXSSL: 'true'
-      KAFKA_CLUSTERS_0_JMXUSERNAME: root
-      KAFKA_CLUSTERS_0_JMXPASSWORD: password
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_SSL: 'true'
+      KAFKA_CLUSTERS_0_METRICS_USERNAME: root
+      KAFKA_CLUSTERS_0_METRICS_PASSWORD: password
       JAVA_OPTS: >-
         -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
         -Djavax.net.ssl.trustStore=/jmx/clienttruststore

+ 45 - 0
documentation/compose/kafka-ui-with-jmx-exporter.yaml

@@ -0,0 +1,45 @@
+---
+version: '2'
+services:
+
+  zookeeper0:
+    image: confluentinc/cp-zookeeper:5.2.4
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 2181:2181
+
+  kafka0:
+    image: confluentinc/cp-kafka:5.3.1
+    # downloading jmx_exporter javaagent and starting kafka
+    command: "/usr/share/jmx_exporter/kafka-prepare-and-run"
+    depends_on:
+      - zookeeper0
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OPTS: -javaagent:/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar=11001:/usr/share/jmx_exporter/kafka-broker.yml
+    ports:
+      - 9092:9092
+      - 11001:11001
+    volumes:
+      - ./jmx-exporter:/usr/share/jmx_exporter/
+
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    depends_on:
+      - zookeeper0
+      - kafka0
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 11001
+      KAFKA_CLUSTERS_0_METRICS_TYPE: PROMETHEUS

+ 2 - 2
documentation/compose/kafka-ui.yaml

@@ -18,14 +18,14 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
-      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
       KAFKA_CLUSTERS_1_NAME: secondLocal
       KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
       KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
-      KAFKA_CLUSTERS_1_JMXPORT: 9998
+      KAFKA_CLUSTERS_1_METRICS_PORT: 9998
       KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
       KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083

+ 12 - 0
kafka-ui-api/pom.xml

@@ -185,6 +185,18 @@
             <version>2.2.14</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp3.mockwebserver.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp3.mockwebserver.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 10 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -30,10 +30,7 @@ public class ClustersProperties {
     String ksqldbServer;
     KsqldbServerAuth ksqldbServerAuth;
     List<ConnectCluster> kafkaConnect;
-    int jmxPort;
-    boolean jmxSsl;
-    String jmxUsername;
-    String jmxPassword;
+    MetricsConfigData metrics;
     Properties properties;
     boolean readOnly = false;
     boolean disableLogDirsCollection = false;
@@ -42,6 +39,15 @@ public class ClustersProperties {
     String defaultValueSerde;
   }
 
+  @Data
+  public static class MetricsConfigData {
+    String type;
+    Integer port;
+    boolean ssl;
+    String username;
+    String password;
+  }
+
   @Data
   public static class ConnectCluster {
     String name;

+ 0 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java

@@ -1,14 +1,8 @@
 package com.provectus.kafka.ui.config;
 
-import com.provectus.kafka.ui.model.JmxConnectionInfo;
-import com.provectus.kafka.ui.util.JmxPoolFactory;
 import java.util.Collections;
 import java.util.Map;
-import javax.management.remote.JMXConnector;
 import lombok.AllArgsConstructor;
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.beans.factory.annotation.Value;
@@ -49,21 +43,6 @@ public class Config {
     return httpHandler;
   }
 
-
-  @Bean
-  public KeyedObjectPool<JmxConnectionInfo, JMXConnector> pool() {
-    var pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
-    pool.setConfig(poolConfig());
-    return pool;
-  }
-
-  private GenericKeyedObjectPoolConfig poolConfig() {
-    final var poolConfig = new GenericKeyedObjectPoolConfig();
-    poolConfig.setMaxIdlePerKey(3);
-    poolConfig.setMaxTotalPerKey(3);
-    return poolConfig;
-  }
-
   @Bean
   public MBeanExporter exporter() {
     final var exporter = new MBeanExporter();

+ 19 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -23,9 +23,10 @@ import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.model.MetricDTO;
+import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.PartitionDTO;
 import com.provectus.kafka.ui.model.ReplicaDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
@@ -33,7 +34,8 @@ import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -53,15 +55,27 @@ public interface ClusterMapper {
   @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
   @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
   @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
+  @Mapping(target = "metricsConfig", source = "metrics")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
 
-  default ClusterMetricsDTO toClusterMetrics(JmxClusterUtil.JmxMetrics jmxMetrics) {
-    return new ClusterMetricsDTO().items(jmxMetrics.getMetrics());
+  default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
+    return new ClusterMetricsDTO()
+        .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
   }
 
-  BrokerMetricsDTO toBrokerMetrics(JmxBrokerMetrics metrics);
+  private MetricDTO convert(RawMetric rawMetric) {
+    return new MetricDTO()
+        .name(rawMetric.name())
+        .labels(rawMetric.labels())
+        .value(rawMetric.value());
+  }
+
+  default BrokerMetricsDTO toBrokerMetrics(List<RawMetric> metrics) {
+    return new BrokerMetricsDTO()
+        .metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
+  }
 
   @Mapping(target = "isSensitive", source = "sensitive")
   @Mapping(target = "isReadOnly", source = "readOnly")

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/BrokerMetrics.java

@@ -6,6 +6,6 @@ import lombok.Data;
 
 @Data
 @Builder(toBuilder = true)
-public class JmxBrokerMetrics {
+public class BrokerMetrics {
   private final List<MetricDTO> metrics;
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java

@@ -46,10 +46,10 @@ public class InternalClusterMetrics {
   @Nullable // will be null if log dir collection disabled
   private final Map<Integer, InternalBrokerDiskUsage> internalBrokerDiskUsage;
 
-  // metrics from jmx
+  // metrics from metrics collector
   private final BigDecimal bytesInPerSec;
   private final BigDecimal bytesOutPerSec;
-  private final Map<Integer, JmxBrokerMetrics> internalBrokerMetrics;
+  private final Map<Integer, BrokerMetrics> internalBrokerMetrics;
   private final List<MetricDTO> metrics;
 
 }

+ 19 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.model;
 
 import com.google.common.base.Throwables;
-import com.provectus.kafka.ui.service.MetricsCache;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Optional;
@@ -28,21 +27,21 @@ public class InternalClusterState {
   private BigDecimal bytesOutPerSec;
   private Boolean readOnly;
 
-  public InternalClusterState(KafkaCluster cluster, MetricsCache.Metrics metrics) {
+  public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
     name = cluster.getName();
-    status = metrics.getStatus();
-    lastError = Optional.ofNullable(metrics.getLastKafkaException())
+    status = statistics.getStatus();
+    lastError = Optional.ofNullable(statistics.getLastKafkaException())
         .map(e -> new MetricsCollectionErrorDTO()
             .message(e.getMessage())
             .stackTrace(Throwables.getStackTraceAsString(e)))
         .orElse(null);
-    topicCount = metrics.getTopicDescriptions().size();
-    brokerCount = metrics.getClusterDescription().getNodes().size();
-    activeControllers = metrics.getClusterDescription().getController() != null ? 1 : 0;
-    version = metrics.getVersion();
+    topicCount = statistics.getTopicDescriptions().size();
+    brokerCount = statistics.getClusterDescription().getNodes().size();
+    activeControllers = statistics.getClusterDescription().getController() != null ? 1 : 0;
+    version = statistics.getVersion();
 
-    if (metrics.getLogDirInfo() != null) {
-      diskUsage = metrics.getLogDirInfo().getBrokerStats().entrySet().stream()
+    if (statistics.getLogDirInfo() != null) {
+      diskUsage = statistics.getLogDirInfo().getBrokerStats().entrySet().stream()
           .map(e -> new BrokerDiskUsageDTO()
               .brokerId(e.getKey())
               .segmentSize(e.getValue().getSegmentSize())
@@ -50,15 +49,21 @@ public class InternalClusterState {
           .collect(Collectors.toList());
     }
 
-    features = metrics.getFeatures();
+    features = statistics.getFeatures();
 
-    bytesInPerSec = metrics.getJmxMetrics().getBytesInPerSec().values().stream()
+    bytesInPerSec = statistics
+        .getMetrics()
+        .getBytesInPerSec()
+        .values().stream()
         .reduce(BigDecimal.ZERO, BigDecimal::add);
 
-    bytesOutPerSec = metrics.getJmxMetrics().getBytesOutPerSec().values().stream()
+    bytesOutPerSec = statistics
+        .getMetrics()
+        .getBytesOutPerSec()
+        .values().stream()
         .reduce(BigDecimal.ZERO, BigDecimal::add);
 
-    var partitionsStats = new PartitionsStats(metrics.getTopicDescriptions().values());
+    var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
     onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
     offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
     inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();

+ 4 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.model;
 
-import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +28,7 @@ public class InternalTopic {
   private final List<InternalTopicConfig> topicConfigs;
   private final CleanupPolicy cleanUpPolicy;
 
-  // rates from jmx
+  // rates from metrics
   private final BigDecimal bytesInPerSec;
   private final BigDecimal bytesOutPerSec;
 
@@ -40,7 +39,7 @@ public class InternalTopic {
   public static InternalTopic from(TopicDescription topicDescription,
                                    List<ConfigEntry> configs,
                                    InternalPartitionsOffsets partitionsOffsets,
-                                   JmxClusterUtil.JmxMetrics jmxMetrics,
+                                   Metrics metrics,
                                    InternalLogDirStats logDirInfo) {
     var topic = InternalTopic.builder();
     topic.internal(
@@ -105,8 +104,8 @@ public class InternalTopic {
       topic.segmentSize(segmentStats.getSegmentSize());
     }
 
-    topic.bytesInPerSec(jmxMetrics.getBytesInPerSec().get(topicDescription.name()));
-    topic.bytesOutPerSec(jmxMetrics.getBytesOutPerSec().get(topicDescription.name()));
+    topic.bytesInPerSec(metrics.getBytesInPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(metrics.getBytesOutPerSec().get(topicDescription.name()));
 
     topic.topicConfigs(
         configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

+ 1 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -13,10 +13,6 @@ import lombok.Data;
 public class KafkaCluster {
   private final String name;
   private final String version;
-  private final Integer jmxPort;
-  private final boolean jmxSsl;
-  private final String jmxUsername;
-  private final String jmxPassword;
   private final String bootstrapServers;
   private final InternalSchemaRegistry schemaRegistry;
   private final InternalKsqlServer ksqldbServer;
@@ -24,4 +20,5 @@ public class KafkaCluster {
   private final Properties properties;
   private final boolean readOnly;
   private final boolean disableLogDirsCollection;
+  private final MetricsConfig metricsConfig;
 }

+ 38 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.model;
+
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.Builder;
+import lombok.Value;
+
+
+@Builder
+@Value
+public class Metrics {
+  Map<String, BigDecimal> bytesInPerSec;
+  Map<String, BigDecimal> bytesOutPerSec;
+  Map<Integer, List<RawMetric>> perBrokerMetrics;
+
+  public static Metrics empty() {
+    return Metrics.builder()
+        .bytesInPerSec(Map.of())
+        .bytesOutPerSec(Map.of())
+        .perBrokerMetrics(Map.of())
+        .build();
+  }
+
+  public Stream<RawMetric> getSummarizedMetrics() {
+    return perBrokerMetrics.values().stream()
+        .flatMap(Collection::stream)
+        .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value()))))
+        .values()
+        .stream();
+  }
+
+}

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.model;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class MetricsConfig {
+  public static final String JMX_METRICS_TYPE = "JMX";
+  public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
+
+  private final String type;
+  private final Integer port;
+  private final boolean ssl;
+  private final String username;
+  private final String password;
+}

+ 38 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java

@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.model;
+
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+
+@Value
+@Builder(toBuilder = true)
+public class Statistics {
+  ServerStatusDTO status;
+  Throwable lastKafkaException;
+  String version;
+  List<Feature> features;
+  ReactiveAdminClient.ClusterDescription clusterDescription;
+  Metrics metrics;
+  InternalLogDirStats logDirInfo;
+  Map<String, TopicDescription> topicDescriptions;
+  Map<String, List<ConfigEntry>> topicConfigs;
+
+  public static Statistics empty() {
+    return builder()
+        .status(ServerStatusDTO.OFFLINE)
+        .version("Unknown")
+        .features(List.of())
+        .clusterDescription(
+            new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
+        .metrics(Metrics.empty())
+        .logDirInfo(InternalLogDirStats.empty())
+        .topicDescriptions(Map.of())
+        .topicConfigs(Map.of())
+        .build();
+  }
+}

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java

@@ -12,7 +12,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
-import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
 
 @Service

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -9,8 +9,8 @@ import com.provectus.kafka.ui.model.BrokerDTO;
 import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
 import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +35,7 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class BrokerService {
 
-  private final MetricsCache metricsCache;
+  private final StatisticsCache statisticsCache;
   private final AdminClientService adminClientService;
   private final DescribeLogDirsMapper describeLogDirsMapper;
 
@@ -54,7 +54,7 @@ public class BrokerService {
   }
 
   private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
-    if (metricsCache.get(cluster).getClusterDescription().getNodes()
+    if (statisticsCache.get(cluster).getClusterDescription().getNodes()
         .stream().noneMatch(node -> node.id() == brokerId)) {
       return Flux.error(
           new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -125,7 +125,7 @@ public class BrokerService {
       KafkaCluster cluster, List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
-          List<Integer> brokers = metricsCache.get(cluster).getClusterDescription().getNodes()
+          List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
               .stream()
               .map(Node::id)
               .collect(Collectors.toList());
@@ -150,9 +150,8 @@ public class BrokerService {
     return getBrokersConfig(cluster, brokerId);
   }
 
-  public Mono<JmxBrokerMetrics> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
-    return Mono.justOrEmpty(
-            metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId));
+  public Mono<List<RawMetric>> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
+    return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerMetrics().get(brokerId));
   }
 
 }

+ 6 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -18,33 +18,33 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class ClusterService {
 
-  private final MetricsCache metricsCache;
+  private final StatisticsCache statisticsCache;
   private final ClustersStorage clustersStorage;
   private final ClusterMapper clusterMapper;
-  private final MetricsService metricsService;
+  private final StatisticsService statisticsService;
 
   public List<ClusterDTO> getClusters() {
     return clustersStorage.getKafkaClusters()
         .stream()
-        .map(c -> clusterMapper.toCluster(new InternalClusterState(c, metricsCache.get(c))))
+        .map(c -> clusterMapper.toCluster(new InternalClusterState(c, statisticsCache.get(c))))
         .collect(Collectors.toList());
   }
 
   public Mono<ClusterStatsDTO> getClusterStats(KafkaCluster cluster) {
     return Mono.justOrEmpty(
         clusterMapper.toClusterStats(
-            new InternalClusterState(cluster, metricsCache.get(cluster)))
+            new InternalClusterState(cluster, statisticsCache.get(cluster)))
     );
   }
 
   public Mono<ClusterMetricsDTO> getClusterMetrics(KafkaCluster cluster) {
     return Mono.just(
         clusterMapper.toClusterMetrics(
-            metricsCache.get(cluster).getJmxMetrics()));
+            statisticsCache.get(cluster).getMetrics()));
   }
 
   public Mono<ClusterDTO> updateCluster(KafkaCluster cluster) {
-    return metricsService.updateCache(cluster)
+    return statisticsService.updateCache(cluster)
         .map(metrics -> clusterMapper.toCluster(new InternalClusterState(cluster, metrics)));
   }
 }

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java

@@ -10,20 +10,20 @@ import reactor.core.scheduler.Schedulers;
 @Component
 @RequiredArgsConstructor
 @Slf4j
-public class ClustersMetricsScheduler {
+public class ClustersStatisticsScheduler {
 
   private final ClustersStorage clustersStorage;
 
-  private final MetricsService metricsService;
+  private final StatisticsService statisticsService;
 
   @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
-  public void updateMetrics() {
+  public void updateStatistics() {
     Flux.fromIterable(clustersStorage.getKafkaClusters())
         .parallel()
         .runOn(Schedulers.parallel())
         .flatMap(cluster -> {
           log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
-          return metricsService.updateCache(cluster)
+          return statisticsService.updateCache(cluster)
               .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
         })
         .then()

+ 7 - 40
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java

@@ -1,61 +1,28 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.Feature;
-import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.model.Statistics;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import lombok.Builder;
-import lombok.Value;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.springframework.stereotype.Component;
 
 @Component
-public class MetricsCache {
+public class StatisticsCache {
 
-  @Value
-  @Builder(toBuilder = true)
-  public static class Metrics {
-    ServerStatusDTO status;
-    Throwable lastKafkaException;
-    String version;
-    List<Feature> features;
-    ReactiveAdminClient.ClusterDescription clusterDescription;
-    JmxClusterUtil.JmxMetrics jmxMetrics;
-    InternalLogDirStats logDirInfo;
-    Map<String, TopicDescription> topicDescriptions;
-    Map<String, List<ConfigEntry>> topicConfigs;
+  private final Map<String, Statistics> cache = new ConcurrentHashMap<>();
 
-    public static Metrics empty() {
-      return builder()
-          .status(ServerStatusDTO.OFFLINE)
-          .version("Unknown")
-          .features(List.of())
-          .clusterDescription(
-              new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
-          .jmxMetrics(JmxClusterUtil.JmxMetrics.empty())
-          .logDirInfo(InternalLogDirStats.empty())
-          .topicDescriptions(Map.of())
-          .topicConfigs(Map.of())
-          .build();
-    }
-  }
-
-  private final Map<String, Metrics> cache = new ConcurrentHashMap<>();
-
-  public MetricsCache(ClustersStorage clustersStorage) {
-    var initializing = Metrics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
+  public StatisticsCache(ClustersStorage clustersStorage) {
+    var initializing = Statistics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
     clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing));
   }
 
-  public synchronized void replace(KafkaCluster c, Metrics stats) {
+  public synchronized void replace(KafkaCluster c, Statistics stats) {
     cache.put(c.getName(), stats);
   }
 
@@ -91,7 +58,7 @@ public class MetricsCache {
     );
   }
 
-  public Metrics get(KafkaCluster c) {
+  public Statistics get(KafkaCluster c) {
     return Objects.requireNonNull(cache.get(c.getName()), "Unknown cluster metrics requested");
   }
 

+ 13 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java

@@ -3,8 +3,10 @@ package com.provectus.kafka.ui.service;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.model.Statistics;
+import com.provectus.kafka.ui.service.metrics.MetricsCollector;
 import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
@@ -17,33 +19,33 @@ import reactor.core.publisher.Mono;
 @Service
 @RequiredArgsConstructor
 @Slf4j
-public class MetricsService {
+public class StatisticsService {
 
-  private final JmxClusterUtil jmxClusterUtil;
+  private final MetricsCollector metricsClusterUtil;
   private final AdminClientService adminClientService;
   private final FeatureService featureService;
-  private final MetricsCache cache;
+  private final StatisticsCache cache;
 
-  public Mono<MetricsCache.Metrics> updateCache(KafkaCluster c) {
-    return getMetrics(c).doOnSuccess(m -> cache.replace(c, m));
+  public Mono<Statistics> updateCache(KafkaCluster c) {
+    return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
   }
 
-  private Mono<MetricsCache.Metrics> getMetrics(KafkaCluster cluster) {
+  private Mono<Statistics> getStatistics(KafkaCluster cluster) {
     return adminClientService.get(cluster).flatMap(ac ->
             ac.describeCluster().flatMap(description ->
                 Mono.zip(
                     List.of(
-                        jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
+                        metricsClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
                         getLogDirInfo(cluster, ac),
                         featureService.getAvailableFeatures(cluster, description.getController()),
                         loadTopicConfigs(cluster),
                         describeTopics(cluster)),
                     results ->
-                        MetricsCache.Metrics.builder()
+                        Statistics.builder()
                             .status(ServerStatusDTO.ONLINE)
                             .clusterDescription(description)
                             .version(ac.getVersion())
-                            .jmxMetrics((JmxClusterUtil.JmxMetrics) results[0])
+                            .metrics((Metrics) results[0])
                             .logDirInfo((InternalLogDirStats) results[1])
                             .features((List<Feature>) results[2])
                             .topicConfigs((Map<String, List<ConfigEntry>>) results[3])
@@ -53,7 +55,7 @@ public class MetricsService {
         .doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e))
         .onErrorResume(
-            e -> Mono.just(MetricsCache.Metrics.empty().toBuilder().lastKafkaException(e).build()));
+            e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
   }
 
   private Mono<InternalLogDirStats> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {

+ 24 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -15,13 +15,14 @@ import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
+import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,7 +52,7 @@ public class TopicsService {
 
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
-  private final MetricsCache metricsCache;
+  private final StatisticsCache statisticsCache;
   @Value("${topic.recreate.maxRetries:15}")
   private int recreateMaxRetries;
   @Value("${topic.recreate.delay.seconds:1}")
@@ -69,15 +70,15 @@ public class TopicsService {
         .flatMap(ac ->
             ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
                 (descriptions, configs) -> {
-                  metricsCache.update(c, descriptions, configs);
+                  statisticsCache.update(c, descriptions, configs);
                   return getPartitionOffsets(descriptions, ac).map(offsets -> {
-                    var metrics = metricsCache.get(c);
+                    var metrics = statisticsCache.get(c);
                     return createList(
                         topics,
                         descriptions,
                         configs,
                         offsets,
-                        metrics.getJmxMetrics(),
+                        metrics.getMetrics(),
                         metrics.getLogDirInfo()
                     );
                   });
@@ -118,7 +119,7 @@ public class TopicsService {
                                          Map<String, TopicDescription> descriptions,
                                          Map<String, List<ConfigEntry>> configs,
                                          InternalPartitionsOffsets partitionsOffsets,
-                                         JmxClusterUtil.JmxMetrics jmxMetrics,
+                                         Metrics metrics,
                                          InternalLogDirStats logDirInfo) {
     return orderedNames.stream()
         .filter(descriptions::containsKey)
@@ -126,7 +127,7 @@ public class TopicsService {
             descriptions.get(t),
             configs.getOrDefault(t, List.of()),
             partitionsOffsets,
-            jmxMetrics,
+            metrics,
             logDirInfo
         ))
         .collect(toList());
@@ -249,7 +250,7 @@ public class TopicsService {
         .flatMap(ac -> {
           Integer actual = topic.getReplicationFactor();
           Integer requested = replicationFactorChange.getTotalReplicationFactor();
-          Integer brokersCount = metricsCache.get(cluster).getClusterDescription()
+          Integer brokersCount = statisticsCache.get(cluster).getClusterDescription()
               .getNodes().size();
 
           if (requested.equals(actual)) {
@@ -365,7 +366,7 @@ public class TopicsService {
 
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                               Map<Integer, List<Integer>> currentAssignment) {
-    Map<Integer, Integer> result = metricsCache.get(cluster).getClusterDescription().getNodes()
+    Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().getNodes()
         .stream()
         .map(Node::id)
         .collect(toMap(
@@ -413,9 +414,9 @@ public class TopicsService {
   }
 
   public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
-    if (metricsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
+    if (statisticsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
       return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
-          .doOnSuccess(t -> metricsCache.onTopicDelete(cluster, topicName));
+          .doOnSuccess(t -> statisticsCache.onTopicDelete(cluster, topicName));
     } else {
       return Mono.error(new ValidationException("Topic deletion restricted"));
     }
@@ -441,18 +442,18 @@ public class TopicsService {
   }
 
   public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
-    MetricsCache.Metrics metrics = metricsCache.get(cluster);
-    return filterExisting(cluster, metrics.getTopicDescriptions().keySet())
-            .map(lst -> lst.stream()
-                .map(topicName ->
-                    InternalTopic.from(
-                        metrics.getTopicDescriptions().get(topicName),
-                        metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
-                        InternalPartitionsOffsets.empty(),
-                        metrics.getJmxMetrics(),
-                        metrics.getLogDirInfo()))
-                .collect(toList())
-            );
+    Statistics stats = statisticsCache.get(cluster);
+    return filterExisting(cluster, stats.getTopicDescriptions().keySet())
+        .map(lst -> lst.stream()
+            .map(topicName ->
+                InternalTopic.from(
+                    stats.getTopicDescriptions().get(topicName),
+                    stats.getTopicConfigs().getOrDefault(topicName, List.of()),
+                    InternalPartitionsOffsets.empty(),
+                    stats.getMetrics(),
+                    stats.getLogDirInfo()))
+            .collect(toList())
+        );
   }
 
   private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {

+ 85 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java

@@ -0,0 +1,85 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.management.MBeanAttributeInfo;
+import javax.management.ObjectName;
+
+/**
+ * Converts JMX metrics into JmxExporter prometheus format: <a href="https://github.com/prometheus/jmx_exporter#default-format">format</a>.
+ */
+class JmxMetricsFormatter {
+
+  // copied from https://github.com/prometheus/jmx_exporter/blob/b6b811b4aae994e812e902b26dd41f29364c0e2b/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java#L15
+  private static final Pattern PROPERTY_PATTERN = Pattern.compile(
+      "([^,=:\\*\\?]+)=(\"(?:[^\\\\\"]*(?:\\\\.)?)*\"|[^,=:\"]*)");
+
+  static List<RawMetric> constructMetricsList(ObjectName jmxMetric,
+                                              MBeanAttributeInfo[] attributes,
+                                              Object[] attrValues) {
+    String domain = fixIllegalChars(jmxMetric.getDomain());
+    LinkedHashMap<String, String> labels = getLabelsMap(jmxMetric);
+    String firstLabel = labels.keySet().iterator().next();
+    String firstLabelValue = fixIllegalChars(labels.get(firstLabel));
+    labels.remove(firstLabel); //removing first label since it's value will be in name
+
+    List<RawMetric> result = new ArrayList<>(attributes.length);
+    for (int i = 0; i < attributes.length; i++) {
+      String attrName = fixIllegalChars(attributes[i].getName());
+      convertNumericValue(attrValues[i]).ifPresent(convertedValue -> {
+        String name = String.format("%s_%s_%s", domain, firstLabelValue, attrName);
+        var metric = RawMetric.create(name, labels, convertedValue);
+        result.add(metric);
+      });
+    }
+    return result;
+  }
+
+  private static String fixIllegalChars(String str) {
+    return str
+        .replace('.', '_')
+        .replace('-', '_');
+  }
+
+  private static Optional<BigDecimal> convertNumericValue(Object value) {
+    if (!(value instanceof Number)) {
+      return Optional.empty();
+    }
+    try {
+      if (value instanceof Long) {
+        return Optional.of(new BigDecimal((Long) value));
+      } else if (value instanceof Integer) {
+        return Optional.of(new BigDecimal((Integer) value));
+      }
+      return Optional.of(new BigDecimal(value.toString()));
+    } catch (NumberFormatException nfe) {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Converts Mbean properties to map keeping order (copied from jmx_exporter repo).
+   */
+  private static LinkedHashMap<String, String> getLabelsMap(ObjectName mbeanName) {
+    LinkedHashMap<String, String> keyProperties = new LinkedHashMap<>();
+    String properties = mbeanName.getKeyPropertyListString();
+    Matcher match = PROPERTY_PATTERN.matcher(properties);
+    while (match.lookingAt()) {
+      String labelName = fixIllegalChars(match.group(1)); // label names should be fixed
+      String labelValue = match.group(2);
+      keyProperties.put(labelName, labelValue);
+      properties = properties.substring(match.end());
+      if (properties.startsWith(",")) {
+        properties = properties.substring(1);
+      }
+      match.reset(properties);
+    }
+    return keyProperties;
+  }
+
+}

+ 106 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java

@@ -0,0 +1,106 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.JmxConnectionInfo;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.util.JmxPoolFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.kafka.common.Node;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+@Service
+@Lazy
+@Slf4j
+class JmxMetricsRetriever implements MetricsRetriever, AutoCloseable {
+
+  private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
+  private static final String JMX_SERVICE_TYPE = "jmxrmi";
+  private static final String CANONICAL_NAME_PATTERN = "kafka.server*:*";
+
+  private final GenericKeyedObjectPool<JmxConnectionInfo, JMXConnector> pool;
+
+  public JmxMetricsRetriever() {
+    this.pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
+    GenericKeyedObjectPoolConfig<JMXConnector> poolConfig = new GenericKeyedObjectPoolConfig<>();
+    poolConfig.setMaxIdlePerKey(3);
+    poolConfig.setMaxTotalPerKey(3);
+    this.pool.setConfig(poolConfig);
+  }
+
+  @Override
+  public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
+    return Mono.fromSupplier(() -> retrieveSync(c, node))
+        .subscribeOn(Schedulers.boundedElastic())
+        .flatMapMany(Flux::fromIterable);
+  }
+
+  private List<RawMetric> retrieveSync(KafkaCluster c, Node node) {
+    String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE;
+    log.debug("Collection JMX metrics for {}", jmxUrl);
+    final var connectionInfo = JmxConnectionInfo.builder()
+        .url(jmxUrl)
+        .ssl(c.getMetricsConfig().isSsl())
+        .username(c.getMetricsConfig().getUsername())
+        .password(c.getMetricsConfig().getPassword())
+        .build();
+    JMXConnector srv;
+    try {
+      srv = pool.borrowObject(connectionInfo);
+    } catch (Exception e) {
+      log.error("Cannot get JMX connector for the pool due to: ", e);
+      return Collections.emptyList();
+    }
+    List<RawMetric> result = new ArrayList<>();
+    try {
+      MBeanServerConnection msc = srv.getMBeanServerConnection();
+      var jmxMetrics = msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null);
+      for (ObjectName jmxMetric : jmxMetrics) {
+        result.addAll(extractObjectMetrics(jmxMetric, msc));
+      }
+      pool.returnObject(connectionInfo, srv);
+    } catch (Exception e) {
+      log.error("Error getting jmx metrics from {}", jmxUrl, e);
+      closeConnectionExceptionally(jmxUrl, srv);
+    }
+    log.debug("{} metrics collected for {}", result.size(), jmxUrl);
+    return result;
+  }
+
+  private void closeConnectionExceptionally(String url, JMXConnector srv) {
+    try {
+      pool.invalidateObject(new JmxConnectionInfo(url), srv);
+    } catch (Exception e) {
+      log.error("Cannot invalidate object in pool, {}", url, e);
+    }
+  }
+
+  @SneakyThrows
+  private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerConnection msc) {
+    MBeanAttributeInfo[] attrNames = msc.getMBeanInfo(objectName).getAttributes();
+    Object[] attrValues = new Object[attrNames.length];
+    for (int i = 0; i < attrNames.length; i++) {
+      attrValues[i] = msc.getAttribute(objectName, attrNames[i].getName());
+    }
+    return JmxMetricsFormatter.constructMetricsList(objectName, attrNames, attrValues);
+  }
+
+  @Override
+  public void close() {
+    this.pool.close();
+  }
+}
+

+ 69 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java

@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.model.MetricsConfig;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Node;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class MetricsCollector {
+
+  private final JmxMetricsRetriever jmxMetricsRetriever;
+  private final PrometheusMetricsRetriever prometheusMetricsRetriever;
+
+  public Mono<Metrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
+    return Flux.fromIterable(nodes)
+        .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst)))
+        .collectMap(Tuple2::getT1, Tuple2::getT2)
+        .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics))
+        .defaultIfEmpty(Metrics.empty());
+  }
+
+  private Mono<List<RawMetric>> getMetrics(KafkaCluster kafkaCluster, Node node) {
+    Flux<RawMetric> metricFlux = Flux.empty();
+    if (kafkaCluster.getMetricsConfig() != null) {
+      String type = kafkaCluster.getMetricsConfig().getType();
+      if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) {
+        metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node);
+      } else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) {
+        metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node);
+      }
+    }
+    return metricFlux.collectList();
+  }
+
+  public Metrics collectMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
+    Metrics.MetricsBuilder builder = Metrics.builder()
+        .perBrokerMetrics(
+            perBrokerMetrics.entrySet()
+                .stream()
+                .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue)));
+
+    populateWellknowMetrics(cluster, perBrokerMetrics)
+        .apply(builder);
+
+    return builder.build();
+  }
+
+  private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
+    WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
+    perBrokerMetrics.forEach((node, metrics) ->
+        metrics.forEach(metric ->
+            wellKnownMetrics.populate(cluster, node, metric)));
+    return wellKnownMetrics;
+  }
+
+}

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java

@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Flux;
+
+interface MetricsRetriever {
+  Flux<RawMetric> retrieve(KafkaCluster c, Node node);
+}

+ 46 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java

@@ -0,0 +1,46 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+
+@Slf4j
+class PrometheusEndpointMetricsParser {
+
+  /**
+   * Matches openmetrics format. For example, string:
+   * kafka_server_BrokerTopicMetrics_FiveMinuteRate{name="BytesInPerSec",topic="__consumer_offsets",} 16.94886650744339
+   * will produce:
+   * name=kafka_server_BrokerTopicMetrics_FiveMinuteRate
+   * value=16.94886650744339
+   * labels={name="BytesInPerSec", topic="__consumer_offsets"}",
+   */
+  private static final Pattern PATTERN = Pattern.compile(
+      "(?<metricName>^\\w+)([ \t]*\\{*(?<properties>.*)}*)[ \\t]+(?<value>[\\d]+\\.?[\\d]+)?");
+
+  static Optional<RawMetric> parse(String s) {
+    Matcher matcher = PATTERN.matcher(s);
+    if (matcher.matches()) {
+      String value = matcher.group("value");
+      String metricName = matcher.group("metricName");
+      if (metricName == null || !NumberUtils.isCreatable(value)) {
+        return Optional.empty();
+      }
+      var labels = Arrays.stream(matcher.group("properties").split(","))
+          .filter(str -> !"".equals(str))
+          .map(str -> str.split("="))
+          .filter(spit -> spit.length == 2)
+          .collect(Collectors.toUnmodifiableMap(
+              str -> str[0].trim(),
+              str -> str[1].trim().replace("\"", "")));
+
+      return Optional.of(RawMetric.create(metricName, labels, new BigDecimal(value)));
+    }
+    return Optional.empty();
+  }
+}

+ 58 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java

@@ -0,0 +1,58 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Arrays;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Node;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+class PrometheusMetricsRetriever implements MetricsRetriever {
+
+  private static final String METRICS_ENDPOINT_PATH = "/metrics";
+  private static final int DEFAULT_EXPORTER_PORT = 11001;
+
+  private final WebClient webClient;
+
+  @Override
+  public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
+    log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), c.getMetricsConfig().getPort());
+    var metricsConfig = c.getMetricsConfig();
+    Integer port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
+    return retrieve(node.host(), port, metricsConfig.isSsl());
+  }
+
+  @VisibleForTesting
+  Flux<RawMetric> retrieve(String host, int port, boolean ssl) {
+    WebClient.ResponseSpec responseSpec = webClient.get()
+        .uri(UriComponentsBuilder.newInstance()
+            .scheme(ssl ? "https" : "http")
+            .host(host)
+            .port(port)
+            .path(METRICS_ENDPOINT_PATH).build().toUri())
+        .retrieve();
+
+    return responseSpec.bodyToMono(String.class)
+        .doOnError(e -> log.error("Error while getting metrics from {}", host, e))
+        .onErrorResume(th -> Mono.empty())
+        .flatMapMany(body ->
+            Flux.fromStream(
+                Arrays.stream(body.split("\\n"))
+                    .filter(str -> !Strings.isNullOrEmpty(str) && !str.startsWith("#")) // skipping comments strings
+                    .map(PrometheusEndpointMetricsParser::parse)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+            )
+        );
+  }
+}

+ 60 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java

@@ -0,0 +1,60 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+public interface RawMetric {
+
+  String name();
+
+  Map<String, String> labels();
+
+  BigDecimal value();
+
+  // Key, that can be used for metrics reductions
+  default Object identityKey() {
+    return name() + "_" + labels();
+  }
+
+  RawMetric copyWithValue(BigDecimal newValue);
+
+  //--------------------------------------------------
+
+  static RawMetric create(String name, Map<String, String> labels, BigDecimal value) {
+    return new SimpleMetric(name, labels, value);
+  }
+
+  @AllArgsConstructor
+  @EqualsAndHashCode
+  @ToString
+  class SimpleMetric implements RawMetric {
+
+    private final String name;
+    private final Map<String, String> labels;
+    private final BigDecimal value;
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public Map<String, String> labels() {
+      return labels;
+    }
+
+    @Override
+    public BigDecimal value() {
+      return value;
+    }
+
+    @Override
+    public RawMetric copyWithValue(BigDecimal newValue) {
+      return new SimpleMetric(name, labels, newValue);
+    }
+  }
+
+}

+ 42 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java

@@ -0,0 +1,42 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+
+class WellKnownMetrics {
+
+  final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
+
+  void populate(KafkaCluster cluster, Node node, RawMetric rawMetric) {
+    updateTopicsIOrates(rawMetric);
+  }
+
+  void apply(Metrics.MetricsBuilder metricsBuilder) {
+    metricsBuilder.bytesInPerSec(bytesInFifteenMinuteRate);
+    metricsBuilder.bytesOutPerSec(bytesOutFifteenMinuteRate);
+  }
+
+  private void updateTopicsIOrates(RawMetric rawMetric) {
+    String name = rawMetric.name();
+    String topic = rawMetric.labels().get("topic");
+    if (topic != null
+        && containsIgnoreCase(name, "BrokerTopicMetrics")
+        && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
+      String nameProperty = rawMetric.labels().get("name");
+      if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
+        bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+      } else if ("BytesOutPerSec".equalsIgnoreCase(nameProperty)) {
+        bytesOutFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+      }
+    }
+  }
+
+}

+ 0 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java


+ 0 - 217
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java

@@ -1,217 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.reducing;
-import static java.util.stream.Collectors.toList;
-
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
-import com.provectus.kafka.ui.model.JmxConnectionInfo;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MetricDTO;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import lombok.Builder;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.kafka.common.Node;
-import org.jetbrains.annotations.Nullable;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class JmxClusterUtil {
-
-  private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
-  private static final String JMX_SERVICE_TYPE = "jmxrmi";
-  private static final String KAFKA_SERVER_PARAM = "kafka.server";
-  private static final String NAME_METRIC_FIELD = "name";
-  private final KeyedObjectPool<JmxConnectionInfo, JMXConnector> pool;
-
-  @Builder
-  @Value
-  public static class JmxMetrics {
-    Map<String, BigDecimal> bytesInPerSec;
-    Map<String, BigDecimal> bytesOutPerSec;
-    Map<Integer, JmxBrokerMetrics> internalBrokerMetrics;
-    List<MetricDTO> metrics;
-
-    public static JmxMetrics empty() {
-      return JmxClusterUtil.JmxMetrics.builder()
-          .bytesInPerSec(Map.of())
-          .bytesOutPerSec(Map.of())
-          .internalBrokerMetrics(Map.of())
-          .metrics(List.of())
-          .build();
-    }
-  }
-
-  public Mono<JmxMetrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
-    return Flux.fromIterable(nodes)
-        // jmx is a blocking api, so we trying to parallelize its execution on boundedElastic scheduler
-        .parallel()
-        .runOn(Schedulers.boundedElastic())
-        .map(n -> Map.entry(n.id(),
-            JmxBrokerMetrics.builder().metrics(getJmxMetric(cluster, n)).build()))
-        .sequential()
-        .collectMap(Map.Entry::getKey, Map.Entry::getValue)
-        .map(this::collectMetrics);
-  }
-
-  private List<MetricDTO> getJmxMetric(KafkaCluster cluster, Node node) {
-    return Optional.of(cluster)
-        .filter(c -> c.getJmxPort() != null)
-        .filter(c -> c.getJmxPort() > 0)
-        .map(c -> getJmxMetrics(node.host(), c.getJmxPort(), c.isJmxSsl(),
-            c.getJmxUsername(), c.getJmxPassword()))
-        .orElse(Collections.emptyList());
-  }
-
-  @SneakyThrows
-  private List<MetricDTO> getJmxMetrics(String host, int port, boolean jmxSsl,
-                                        @Nullable String username, @Nullable String password) {
-    String jmxUrl = JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE;
-    final var connectionInfo = JmxConnectionInfo.builder()
-        .url(jmxUrl)
-        .ssl(jmxSsl)
-        .username(username)
-        .password(password)
-        .build();
-    JMXConnector srv;
-    try {
-      srv = pool.borrowObject(connectionInfo);
-    } catch (Exception e) {
-      log.error("Cannot get JMX connector for the pool due to: ", e);
-      return Collections.emptyList();
-    }
-
-    List<MetricDTO> result = new ArrayList<>();
-    try {
-      MBeanServerConnection msc = srv.getMBeanServerConnection();
-      var jmxMetrics = msc.queryNames(null, null).stream()
-          .filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM))
-          .collect(Collectors.toList());
-      for (ObjectName jmxMetric : jmxMetrics) {
-        final Hashtable<String, String> params = jmxMetric.getKeyPropertyList();
-        MetricDTO metric = new MetricDTO();
-        metric.setName(params.get(NAME_METRIC_FIELD));
-        metric.setCanonicalName(jmxMetric.getCanonicalName());
-        metric.setParams(params);
-        metric.setValue(getJmxMetrics(jmxMetric.getCanonicalName(), msc));
-        result.add(metric);
-      }
-      pool.returnObject(connectionInfo, srv);
-    } catch (Exception e) {
-      log.error("Cannot get jmxMetricsNames, {}", jmxUrl, e);
-      closeConnectionExceptionally(jmxUrl, srv);
-    }
-    return result;
-  }
-
-  @SneakyThrows
-  private Map<String, BigDecimal> getJmxMetrics(String canonicalName, MBeanServerConnection msc) {
-    Map<String, BigDecimal> resultAttr = new HashMap<>();
-    ObjectName name = new ObjectName(canonicalName);
-    var attrNames = msc.getMBeanInfo(name).getAttributes();
-    for (MBeanAttributeInfo attrName : attrNames) {
-      var value = msc.getAttribute(name, attrName.getName());
-      if (NumberUtil.isNumeric(value)) {
-        resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
-      }
-    }
-    return resultAttr;
-  }
-
-  private JmxMetrics collectMetrics(Map<Integer, JmxBrokerMetrics> perBrokerJmxMetrics) {
-    final List<MetricDTO> metrics = perBrokerJmxMetrics.values()
-        .stream()
-        .flatMap(b -> b.getMetrics().stream())
-        .collect(
-            groupingBy(
-                MetricDTO::getCanonicalName,
-                reducing(this::reduceJmxMetrics)
-            )
-        ).values().stream()
-        .filter(Optional::isPresent)
-        .map(Optional::get)
-        .collect(toList());
-    return JmxMetrics.builder()
-        .metrics(metrics)
-        .internalBrokerMetrics(perBrokerJmxMetrics)
-        .bytesInPerSec(findTopicMetrics(
-            metrics, JmxMetricsName.BYTES_IN_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE))
-        .bytesOutPerSec(findTopicMetrics(
-            metrics, JmxMetricsName.BYTES_OUT_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE))
-        .build();
-  }
-
-  private Map<String, BigDecimal> findTopicMetrics(List<MetricDTO> metrics,
-                                                   JmxMetricsName metricsName,
-                                                   JmxMetricsValueName valueName) {
-    return metrics.stream().filter(m -> metricsName.getValue().equals(m.getName()))
-        .filter(m -> m.getParams().containsKey("topic"))
-        .filter(m -> m.getValue().containsKey(valueName.getValue()))
-        .map(m -> Tuples.of(
-            m.getParams().get("topic"),
-            m.getValue().get(valueName.getValue())
-        )).collect(groupingBy(
-            Tuple2::getT1,
-            reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
-        ));
-  }
-
-  private void closeConnectionExceptionally(String url, JMXConnector srv) {
-    try {
-      pool.invalidateObject(new JmxConnectionInfo(url), srv);
-    } catch (Exception e) {
-      log.error("Cannot invalidate object in pool, {}", url);
-    }
-  }
-
-  public MetricDTO reduceJmxMetrics(MetricDTO metric1, MetricDTO metric2) {
-    var result = new MetricDTO();
-    Map<String, BigDecimal> value = Stream.concat(
-        metric1.getValue().entrySet().stream(),
-        metric2.getValue().entrySet().stream()
-    ).collect(Collectors.groupingBy(
-        Map.Entry::getKey,
-        Collectors.reducing(BigDecimal.ZERO, Map.Entry::getValue, BigDecimal::add)
-    ));
-    result.setName(metric1.getName());
-    result.setCanonicalName(metric1.getCanonicalName());
-    result.setParams(metric1.getParams());
-    result.setValue(value);
-    return result;
-  }
-
-  private boolean isWellKnownMetric(MetricDTO metric) {
-    final Optional<String> param =
-        Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p ->
-            Arrays.stream(JmxMetricsName.values()).map(JmxMetricsName::getValue)
-                .anyMatch(n -> n.equals(p))
-        );
-    return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent();
-  }
-}

+ 0 - 41
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java

@@ -1,41 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-public enum JmxMetricsName {
-  MESSAGES_IN_PER_SEC("MessagesInPerSec"),
-  BYTES_IN_PER_SEC("BytesInPerSec"),
-  REPLICATION_BYTES_IN_PER_SEC("ReplicationBytesInPerSec"),
-  REQUESTS_PER_SEC("RequestsPerSec"),
-  ERRORS_PER_SEC("ErrorsPerSec"),
-  MESSAGE_CONVERSIONS_PER_SEC("MessageConversionsPerSec"),
-  BYTES_OUT_PER_SEC("BytesOutPerSec"),
-  REPLICATION_BYTES_OUT_PER_SEC("ReplicationBytesOutPerSec"),
-  NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC("NoKeyCompactedTopicRecordsPerSec"),
-  INVALID_MAGIC_NUMBER_RECORDS_PER_SEC("InvalidMagicNumberRecordsPerSec"),
-  INVALID_MESSAGE_CRC_RECORDS_PER_SEC("InvalidMessageCrcRecordsPerSec"),
-  INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC("InvalidOffsetOrSequenceRecordsPerSec"),
-  UNCLEAN_LEADER_ELECTIONS_PER_SEC("UncleanLeaderElectionsPerSec"),
-  ISR_SHRINKS_PER_SEC("IsrShrinksPerSec"),
-  ISR_EXPANDS_PER_SEC("IsrExpandsPerSec"),
-  REASSIGNMENT_BYTES_OUT_PER_SEC("ReassignmentBytesOutPerSec"),
-  REASSIGNMENT_BYTES_IN_PER_SEC("ReassignmentBytesInPerSec"),
-  PRODUCE_MESSAGE_CONVERSIONS_PER_SEC("ProduceMessageConversionsPerSec"),
-  FAILED_FETCH_REQUESTS_PER_SEC("FailedFetchRequestsPerSec"),
-  ZOOKEEPER_SYNC_CONNECTS_PER_SEC("ZooKeeperSyncConnectsPerSec"),
-  BYTES_REJECTED_PER_SEC("BytesRejectedPerSec"),
-  ZOO_KEEPER_AUTH_FAILURES_PER_SEC("ZooKeeperAuthFailuresPerSec"),
-  TOTAL_FETCH_REQUESTS_PER_SEC("TotalFetchRequestsPerSec"),
-  FAILED_ISR_UPDATES_PER_SEC("FailedIsrUpdatesPerSec"),
-  INCREMENTAL_FETCH_SESSION_EVICTIONS_PER_SEC("IncrementalFetchSessionEvictionsPerSec"),
-  FETCH_MESSAGE_CONVERSIONS_PER_SEC("FetchMessageConversionsPerSec"),
-  FAILED_PRODUCE_REQUESTS_PER_SEC("FailedProduceRequestsPerSe");
-
-  private final String value;
-
-  JmxMetricsName(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return value;
-  }
-}

+ 0 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java

@@ -1,19 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-public enum JmxMetricsValueName {
-  COUNT("Count"),
-  ONE_MINUTE_RATE("OneMinuteRate"),
-  FIFTEEN_MINUTE_RATE("FifteenMinuteRate"),
-  FIVE_MINUTE_RATE("FiveMinuteRate"),
-  MEAN_RATE("MeanRate");
-
-  private final String value;
-
-  JmxMetricsValueName(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return value;
-  }
-}

+ 0 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.util;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.math.NumberUtils;
 
 @Slf4j
 public class NumberUtil {
@@ -9,9 +8,6 @@ public class NumberUtil {
   private NumberUtil() {
   }
 
-  public static boolean isNumeric(Object value) {
-    return value != null && NumberUtils.isCreatable(value.toString());
-  }
 
   public static float parserClusterVersion(String version) throws NumberFormatException {
     log.trace("Parsing cluster version [{}]", version);

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java

@@ -93,11 +93,9 @@ public abstract class OffsetsSeek {
   public static class WaitingOffsets {
     private final Map<Integer, Long> endOffsets; // partition number -> offset
     private final Map<Integer, Long> beginOffsets; // partition number -> offset
-    private final String topic;
 
     public WaitingOffsets(String topic, Consumer<?, ?> consumer,
                           Collection<TopicPartition> partitions) {
-      this.topic = topic;
       var allBeginningOffsets = consumer.beginningOffsets(partitions);
       var allEndOffsets = consumer.endOffsets(partitions);
 

+ 6 - 2
kafka-ui-api/src/main/resources/application-local.yml

@@ -8,7 +8,9 @@ kafka:
       kafkaConnect:
         - name: first
           address: http://localhost:8083
-      jmxPort: 9997
+      metrics:
+        port: 9997
+        type: JMX
   #    -
   #      name: secondLocal
   #      bootstrapServers: localhost:9093
@@ -17,7 +19,9 @@ kafka:
   #      kafkaConnect:
   #        - name: first
   #          address: http://localhost:8083
-  #      jmxPort: 9998
+  #      metrics:
+  #        port: 9998
+  #        type: JMX
   #      read-only: true
   #    -
   #      name: localUsingProtobufFile

+ 0 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -8,7 +8,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;

+ 0 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.producer;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;

+ 0 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -129,9 +129,6 @@ public class SendAndReadTests extends AbstractIntegrationTest {
   @Autowired
   private ClustersStorage clustersStorage;
 
-  @Autowired
-  private ClustersMetricsScheduler clustersMetricsScheduler;
-
   @BeforeEach
   void init() {
     targetCluster = clustersStorage.getClusterByName(LOCAL).get();

+ 10 - 10
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -14,11 +14,11 @@ import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -42,7 +42,7 @@ class TopicsServicePaginationTest {
   private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
   private final ClusterMapper clusterMapper = new ClusterMapperImpl();
 
-  private final TopicsController topicsController  = new TopicsController(
+  private final TopicsController topicsController = new TopicsController(
       topicsService, mock(TopicAnalysisService.class), clusterMapper);
 
   private void init(Map<String, InternalTopic> topicsInCache) {
@@ -66,7 +66,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -93,7 +93,7 @@ class TopicsServicePaginationTest {
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+            Metrics.empty(), InternalLogDirStats.empty()))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
     init(internalTopics);
 
@@ -120,7 +120,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -139,7 +139,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -158,7 +158,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -179,7 +179,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -200,7 +200,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty()))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -222,7 +222,7 @@ class TopicsServicePaginationTest {
                     new TopicPartitionInfo(p, null, List.of(), List.of()))
                 .collect(Collectors.toList())))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
-            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+            Metrics.empty(), InternalLogDirStats.empty()))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
 
     init(internalTopics);

+ 77 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java

@@ -0,0 +1,77 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import javax.management.MBeanAttributeInfo;
+import javax.management.ObjectName;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+class JmxMetricsFormatterTest {
+
+  /**
+   * Original format is <a href="https://github.com/prometheus/jmx_exporter#default-format">here</a>.
+   */
+  @Test
+  void convertsJmxMetricsAccordingToJmxExporterFormat() throws Exception {
+    List<RawMetric> metrics = JmxMetricsFormatter.constructMetricsList(
+        new ObjectName(
+            "kafka.server:type=Some.BrokerTopic-Metrics,name=BytesOutPer-Sec,topic=test,some-lbl=123"),
+        new MBeanAttributeInfo[] {
+            createMbeanInfo("FifteenMinuteRate"),
+            createMbeanInfo("Mean"),
+            createMbeanInfo("Calls-count"),
+            createMbeanInfo("SkipValue"),
+        },
+        new Object[] {
+            123.0,
+            100.0,
+            10L,
+            "string values not supported"
+        }
+    );
+
+    assertThat(metrics).hasSize(3);
+
+    assertMetricsEqual(
+        RawMetric.create(
+            "kafka_server_Some_BrokerTopic_Metrics_FifteenMinuteRate",
+            Map.of("name", "BytesOutPer-Sec", "topic", "test",  "some_lbl", "123"),
+            BigDecimal.valueOf(123.0)
+        ),
+        metrics.get(0)
+    );
+
+    assertMetricsEqual(
+        RawMetric.create(
+            "kafka_server_Some_BrokerTopic_Metrics_Mean",
+            Map.of("name", "BytesOutPer-Sec", "topic", "test", "some_lbl", "123"),
+            BigDecimal.valueOf(100.0)
+        ),
+        metrics.get(1)
+    );
+
+    assertMetricsEqual(
+        RawMetric.create(
+            "kafka_server_Some_BrokerTopic_Metrics_Calls_count",
+            Map.of("name", "BytesOutPer-Sec", "topic", "test", "some_lbl", "123"),
+            BigDecimal.valueOf(10)
+        ),
+        metrics.get(2)
+    );
+  }
+
+  private static MBeanAttributeInfo createMbeanInfo(String name) {
+    return new MBeanAttributeInfo(name, "sometype-notused", null, true, true, false, null);
+  }
+
+  private void assertMetricsEqual(RawMetric expected, RawMetric actual) {
+    assertThat(actual.name()).isEqualTo(expected.name());
+    assertThat(actual.labels()).isEqualTo(expected.labels());
+    assertThat(actual.value()).isCloseTo(expected.value(), Offset.offset(new BigDecimal("0.001")));
+  }
+
+}

+ 30 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java

@@ -0,0 +1,30 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+
+class PrometheusEndpointMetricsParserTest {
+
+  @Test
+  void test() {
+    String metricsString =
+        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate"
+            + "{name=\"BytesOutPerSec\",topic=\"__confluent.support.metrics\",} 123.1234";
+
+    Optional<RawMetric> parsedOpt = PrometheusEndpointMetricsParser.parse(metricsString);
+
+    assertThat(parsedOpt).hasValueSatisfying(metric -> {
+      assertThat(metric.name()).isEqualTo("kafka_server_BrokerTopicMetrics_FifteenMinuteRate");
+      assertThat(metric.value()).isEqualTo("123.1234");
+      assertThat(metric.labels()).containsExactlyEntriesOf(
+          Map.of(
+              "name", "BytesOutPerSec",
+              "topic", "__confluent.support.metrics"
+          ));
+    });
+  }
+
+}

+ 64 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java

@@ -0,0 +1,64 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Map;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.test.StepVerifier;
+
+class PrometheusMetricsRetrieverTest {
+
+  private final PrometheusMetricsRetriever retriever = new PrometheusMetricsRetriever(WebClient.create());
+
+  private final MockWebServer mockWebServer = new MockWebServer();
+
+  @BeforeEach
+  void startMockServer() throws IOException {
+    mockWebServer.start();
+  }
+
+  @AfterEach
+  void stopMockServer() throws IOException {
+    mockWebServer.close();
+  }
+
+  @Test
+  void callsMetricsEndpointAndConvertsResponceToRawMetric() {
+    var url = mockWebServer.url("/metrics");
+    // body copied from real jmx exporter
+    MockResponse response = new MockResponse().setBody(
+        "# HELP kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate Attribute exposed for management \n"
+            + "# TYPE kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate untyped\n"
+            + "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate{name=\"RequestHandlerAvgIdlePercent\",} 0.898\n"
+            + "# HELP kafka_server_socket_server_metrics_request_size_avg The average size of requests sent. \n"
+            + "# TYPE kafka_server_socket_server_metrics_request_size_avg untyped\n"
+            + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN\",networkProcessor=\"1\",} 101.1\n"
+            + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN2\",networkProcessor=\"5\",} NaN"
+    );
+    mockWebServer.enqueue(response);
+
+    var firstMetric = RawMetric.create(
+        "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
+        Map.of("name", "RequestHandlerAvgIdlePercent"),
+        new BigDecimal("0.898")
+    );
+
+    var second = RawMetric.create(
+        "kafka_server_socket_server_metrics_request_size_avg",
+        Map.of("listener", "PLAIN", "networkProcessor", "1"),
+        new BigDecimal("101.1")
+    );
+
+    StepVerifier.create(retriever.retrieve(url.host(), url.port(), false))
+        .expectNext(firstMetric)
+        .expectNext(second)
+        // third metric should not be present, since it has "NaN" value
+        .verifyComplete();
+  }
+
+}

+ 66 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java

@@ -0,0 +1,66 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import java.math.BigDecimal;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+class WellKnownMetricsTest {
+
+  private final WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
+
+  @ParameterizedTest
+  @CsvSource({
+      //default jmx exporter format
+      //example: kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name="BytesInPerSec",topic="test-topic",} 222.0
+      //example: kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name="BytesOutPerSec",topic="test-topic",} 111.0
+      "kafka_server_BrokerTopicMetrics_FifteenMinuteRate, BytesInPerSec, BytesOutPerSec",
+
+      //default jmx exporter format, but lower-cased
+      //example: kafka_server_brokertopicmetrics_fifteenminuterate{name="bytesinpersec",topic="test-topic",} 222.0
+      //example: kafka_server_brokertopicmetrics_fifteenminuterate{name="bytesoutpersec",topic="test-topic",} 111.0
+      "kafka_server_brokertopicmetrics_fifteenminuterate, bytesinpersec, bytesoutpersec",
+
+      //unknown prefix metric name
+      "some_unknown_prefix_brokertopicmetrics_fifteenminuterate, bytesinpersec, bytesoutpersec",
+  })
+  void bytesIoTopicMetricsPopulated(String metricName, String bytesInLabel, String bytesOutLabel) {
+    var clusterParam = KafkaCluster.builder().build();
+    var nodeParam = new Node(0, "host", 123);
+
+    var in = RawMetric.create(metricName, Map.of("name", bytesInLabel, "topic", "test-topic"), new BigDecimal("1.0"));
+    var out = RawMetric.create(metricName, Map.of("name", bytesOutLabel, "topic", "test-topic"), new BigDecimal("2.0"));
+
+    // feeding metrics
+    for (int i = 0; i < 3; i++) {
+      wellKnownMetrics.populate(clusterParam, nodeParam, in);
+      wellKnownMetrics.populate(clusterParam, nodeParam, out);
+    }
+
+    assertThat(wellKnownMetrics.bytesInFifteenMinuteRate)
+        .containsEntry("test-topic", new BigDecimal("3.0"));
+
+    assertThat(wellKnownMetrics.bytesOutFifteenMinuteRate)
+        .containsEntry("test-topic", new BigDecimal("6.0"));
+  }
+
+  @Test
+  void appliesInnerStateToMetricsBuilder() {
+    wellKnownMetrics.bytesInFifteenMinuteRate.put("topic", new BigDecimal(1));
+    wellKnownMetrics.bytesOutFifteenMinuteRate.put("topic", new BigDecimal(2));
+
+    Metrics.MetricsBuilder builder = Metrics.builder();
+    wellKnownMetrics.apply(builder);
+    var metrics = builder.build();
+
+    assertThat(metrics.getBytesInPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesInFifteenMinuteRate);
+    assertThat(metrics.getBytesOutPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesOutFifteenMinuteRate);
+  }
+
+}

+ 0 - 28
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/NumberUtilTest.java

@@ -1,28 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class NumberUtilTest {
-
-  @Test
-  void shouldReturnFalseWhenNonNumeric() {
-    Assertions.assertFalse(NumberUtil.isNumeric(Double.POSITIVE_INFINITY));
-    Assertions.assertFalse(NumberUtil.isNumeric(Double.NEGATIVE_INFINITY));
-    Assertions.assertFalse(NumberUtil.isNumeric(Double.NaN));
-    Assertions.assertFalse(NumberUtil.isNumeric(null));
-    Assertions.assertFalse(NumberUtil.isNumeric(" "));
-    Assertions.assertFalse(NumberUtil.isNumeric(new Object()));
-    Assertions.assertFalse(NumberUtil.isNumeric("1231asd"));
-  }
-
-  @Test
-  void shouldReturnTrueWhenNumeric() {
-    Assertions.assertTrue(NumberUtil.isNumeric("123.45"));
-    Assertions.assertTrue(NumberUtil.isNumeric(123.45));
-    Assertions.assertTrue(NumberUtil.isNumeric(123));
-    Assertions.assertTrue(NumberUtil.isNumeric(-123.45));
-    Assertions.assertTrue(NumberUtil.isNumeric(-1e-10));
-    Assertions.assertTrue(NumberUtil.isNumeric(1e-10));
-  }
-}

+ 2 - 6
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2572,16 +2572,12 @@ components:
       properties:
         name:
           type: string
-        canonicalName:
-          type: string
-        params:
+        labels:
           type: string
           additionalProperties:
             type: string
         value:
-          type: string
-          additionalProperties:
-            type: number
+          type: number
 
     TopicLogdirs:
       type: object

+ 1 - 0
pom.xml

@@ -37,6 +37,7 @@
         <confluent.version>7.0.1</confluent.version>
         <apache.commons.version>2.11.1</apache.commons.version>
         <test.containers.version>1.17.1</test.containers.version>
+        <okhttp3.mockwebserver.version>4.10.0</okhttp3.mockwebserver.version>
         <junit-jupiter-engine.version>5.7.2</junit-jupiter-engine.version>
         <mockito.version>2.21.0</mockito.version>
         <assertj.version>3.19.0</assertj.version>