diff --git a/README.md b/README.md
index 4236f40116..1440cb7086 100644
--- a/README.md
+++ b/README.md
@@ -92,7 +92,9 @@ kafka:
username: username
password: password
# schemaNameTemplate: "%s-value"
- jmxPort: 9997
+ metrics:
+ port: 9997
+ type: JMX
-
```
@@ -102,7 +104,8 @@ kafka:
* `schemaRegistryAuth.username`: schemaRegistry's basic authentication username
* `schemaRegistryAuth.password`: schemaRegistry's basic authentication password
* `schemaNameTemplate`: how keys are saved to schemaRegistry
-* `jmxPort`: open JMX port of a broker
+* `metrics.port`: open JMX port of a broker
+* `metrics.type`: Type of metrics, either JMX or PROMETHEUS. Defaults to JMX.
* `readOnly`: enable read only mode
Configure as many clusters as you need by adding their configs below separated with `-`.
@@ -181,15 +184,16 @@ For example, if you want to use an environment variable to set the `name` parame
|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username
|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD` |SchemaRegistry's basic authentication password
|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry
-|`KAFKA_CLUSTERS_0_JMXPORT` |Open JMX port of a broker
+|`KAFKA_CLUSTERS_0_METRICS_PORT` |Open metrics port of a broker
+|`KAFKA_CLUSTERS_0_METRICS_TYPE` |Type of metrics retriever to use. Valid values are JMX (default) or PROMETHEUS. If PROMETHEUS, metrics are read from a prometheus-jmx-exporter endpoint instead of JMX
|`KAFKA_CLUSTERS_0_READONLY` |Enable read-only mode. Default: false
|`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION` |Disable collecting segments information. It should be true for confluent cloud. Default: false
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME`| Kafka Connect cluster's basic authentication username
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD`| Kafka Connect cluster's basic authentication password
-|`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
-|`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication
-|`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication
+|`KAFKA_CLUSTERS_0_METRICS_SSL` |Enable SSL for Metrics? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
+|`KAFKA_CLUSTERS_0_METRICS_USERNAME` |Username for Metrics authentication
+|`KAFKA_CLUSTERS_0_METRICS_PASSWORD` |Password for Metrics authentication
|`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1
|`TOPIC_RECREATE_MAXRETRIES` |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15
diff --git a/documentation/compose/DOCKER_COMPOSE.md b/documentation/compose/DOCKER_COMPOSE.md
index d019427d74..c380c99173 100644
--- a/documentation/compose/DOCKER_COMPOSE.md
+++ b/documentation/compose/DOCKER_COMPOSE.md
@@ -12,3 +12,4 @@
10. [kafka-ui-sasl.yaml](./kafka-ui-sasl.yaml) - SASL auth for Kafka.
11. [kafka-ui-traefik-proxy.yaml](./kafka-ui-traefik-proxy.yaml) - Traefik specific proxy configuration.
12. [oauth-cognito.yaml](./oauth-cognito.yaml) - OAuth2 with Cognito
+13. [kafka-ui-with-jmx-exporter.yaml](./kafka-ui-with-jmx-exporter.yaml) - A configuration with a kafka cluster that exposes metrics via prometheus jmx exporter instead of jmx.
diff --git a/documentation/compose/auth-ldap.yaml b/documentation/compose/auth-ldap.yaml
index 12069639f2..4d296927c9 100644
--- a/documentation/compose/auth-ldap.yaml
+++ b/documentation/compose/auth-ldap.yaml
@@ -15,14 +15,14 @@ services:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
- KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_1_NAME: secondLocal
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
- KAFKA_CLUSTERS_1_JMXPORT: 9998
+ KAFKA_CLUSTERS_1_METRICS_PORT: 9998
KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
diff --git a/documentation/compose/e2e-tests.yaml b/documentation/compose/e2e-tests.yaml
index bc0efffa90..e09ec517d2 100644
--- a/documentation/compose/e2e-tests.yaml
+++ b/documentation/compose/e2e-tests.yaml
@@ -16,7 +16,7 @@ services:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
- KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
diff --git a/documentation/compose/jmx-exporter/kafka-broker.yml b/documentation/compose/jmx-exporter/kafka-broker.yml
new file mode 100644
index 0000000000..cb19fee9f8
--- /dev/null
+++ b/documentation/compose/jmx-exporter/kafka-broker.yml
@@ -0,0 +1,2 @@
+rules:
+ - pattern: ".*"
\ No newline at end of file
diff --git a/documentation/compose/jmx-exporter/kafka-prepare-and-run b/documentation/compose/jmx-exporter/kafka-prepare-and-run
new file mode 100755
index 0000000000..2ccf17df50
--- /dev/null
+++ b/documentation/compose/jmx-exporter/kafka-prepare-and-run
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+JAVA_AGENT_FILE="/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar"
+if [ ! -f "$JAVA_AGENT_FILE" ]
+then
+ echo "Downloading jmx_exporter javaagent"
+ curl -o $JAVA_AGENT_FILE https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar
+fi
+
+exec /etc/confluent/docker/run
\ No newline at end of file
diff --git a/documentation/compose/kafka-ui-auth-context.yaml b/documentation/compose/kafka-ui-auth-context.yaml
index a3c4bee36b..fac1af0aa2 100644
--- a/documentation/compose/kafka-ui-auth-context.yaml
+++ b/documentation/compose/kafka-ui-auth-context.yaml
@@ -14,7 +14,7 @@ services:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
- KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
SERVER_SERVLET_CONTEXT_PATH: /kafkaui
AUTH_TYPE: "LOGIN_FORM"
SPRING_SECURITY_USER_NAME: admin
diff --git a/documentation/compose/kafka-ui-connectors-auth.yaml b/documentation/compose/kafka-ui-connectors-auth.yaml
index a303435142..f6a6199802 100644
--- a/documentation/compose/kafka-ui-connectors-auth.yaml
+++ b/documentation/compose/kafka-ui-connectors-auth.yaml
@@ -18,7 +18,7 @@ services:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
- KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
diff --git a/documentation/compose/kafka-ui-jmx-secured.yml b/documentation/compose/kafka-ui-jmx-secured.yml
index 133a19986d..71f61b1e55 100644
--- a/documentation/compose/kafka-ui-jmx-secured.yml
+++ b/documentation/compose/kafka-ui-jmx-secured.yml
@@ -20,10 +20,10 @@ services:
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
- KAFKA_CLUSTERS_0_JMXPORT: 9997
- KAFKA_CLUSTERS_0_JMXSSL: 'true'
- KAFKA_CLUSTERS_0_JMXUSERNAME: root
- KAFKA_CLUSTERS_0_JMXPASSWORD: password
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_SSL: 'true'
+ KAFKA_CLUSTERS_0_METRICS_USERNAME: root
+ KAFKA_CLUSTERS_0_METRICS_PASSWORD: password
JAVA_OPTS: >-
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-Djavax.net.ssl.trustStore=/jmx/clienttruststore
diff --git a/documentation/compose/kafka-ui-with-jmx-exporter.yaml b/documentation/compose/kafka-ui-with-jmx-exporter.yaml
new file mode 100644
index 0000000000..e6d21584a7
--- /dev/null
+++ b/documentation/compose/kafka-ui-with-jmx-exporter.yaml
@@ -0,0 +1,45 @@
+---
+version: '2'
+services:
+
+ zookeeper0:
+ image: confluentinc/cp-zookeeper:5.2.4
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2181:2181
+
+ kafka0:
+ image: confluentinc/cp-kafka:5.3.1
+ # downloading jmx_exporter javaagent and starting kafka
+ command: "/usr/share/jmx_exporter/kafka-prepare-and-run"
+ depends_on:
+ - zookeeper0
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OPTS: -javaagent:/usr/share/jmx_exporter/jmx_prometheus_javaagent.jar=11001:/usr/share/jmx_exporter/kafka-broker.yml
+ ports:
+ - 9092:9092
+ - 11001:11001
+ volumes:
+ - ./jmx-exporter:/usr/share/jmx_exporter/
+
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8080:8080
+ depends_on:
+ - zookeeper0
+ - kafka0
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
+ KAFKA_CLUSTERS_0_METRICS_PORT: 11001
+ KAFKA_CLUSTERS_0_METRICS_TYPE: PROMETHEUS
diff --git a/documentation/compose/kafka-ui.yaml b/documentation/compose/kafka-ui.yaml
index 8afe6b6d2f..ff808ae1ab 100644
--- a/documentation/compose/kafka-ui.yaml
+++ b/documentation/compose/kafka-ui.yaml
@@ -18,14 +18,14 @@ services:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
- KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_1_NAME: secondLocal
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
- KAFKA_CLUSTERS_1_JMXPORT: 9998
+ KAFKA_CLUSTERS_1_METRICS_PORT: 9998
KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 9a27d29b16..ab5fa62bd1 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -185,6 +185,18 @@
2.2.14
test
+
+ com.squareup.okhttp3
+ mockwebserver
+ ${okhttp3.mockwebserver.version}
+ test
+
+
+ com.squareup.okhttp3
+ okhttp
+ ${okhttp3.mockwebserver.version}
+ test
+
org.springframework.boot
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index 0c7b8fb3d4..5b59800354 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -30,10 +30,7 @@ public class ClustersProperties {
String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
List kafkaConnect;
- int jmxPort;
- boolean jmxSsl;
- String jmxUsername;
- String jmxPassword;
+ MetricsConfigData metrics;
Properties properties;
boolean readOnly = false;
boolean disableLogDirsCollection = false;
@@ -42,6 +39,15 @@ public class ClustersProperties {
String defaultValueSerde;
}
+ @Data
+ public static class MetricsConfigData {
+ String type;
+ Integer port;
+ boolean ssl;
+ String username;
+ String password;
+ }
+
@Data
public static class ConnectCluster {
String name;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
index 92174d2a47..37495b5029 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
@@ -1,14 +1,8 @@
package com.provectus.kafka.ui.config;
-import com.provectus.kafka.ui.model.JmxConnectionInfo;
-import com.provectus.kafka.ui.util.JmxPoolFactory;
import java.util.Collections;
import java.util.Map;
-import javax.management.remote.JMXConnector;
import lombok.AllArgsConstructor;
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
@@ -49,21 +43,6 @@ public class Config {
return httpHandler;
}
-
- @Bean
- public KeyedObjectPool pool() {
- var pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
- pool.setConfig(poolConfig());
- return pool;
- }
-
- private GenericKeyedObjectPoolConfig poolConfig() {
- final var poolConfig = new GenericKeyedObjectPoolConfig();
- poolConfig.setMaxIdlePerKey(3);
- poolConfig.setMaxTotalPerKey(3);
- return poolConfig;
- }
-
@Bean
public MBeanExporter exporter() {
final var exporter = new MBeanExporter();
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index ac0fc4ea87..bf7cb33636 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -23,9 +23,10 @@ import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.model.MetricDTO;
+import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionDTO;
import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
@@ -33,7 +34,8 @@ import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -53,15 +55,27 @@ public interface ClusterMapper {
@Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
@Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
@Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
+ @Mapping(target = "metricsConfig", source = "metrics")
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
- default ClusterMetricsDTO toClusterMetrics(JmxClusterUtil.JmxMetrics jmxMetrics) {
- return new ClusterMetricsDTO().items(jmxMetrics.getMetrics());
+ default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
+ return new ClusterMetricsDTO()
+ .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
}
- BrokerMetricsDTO toBrokerMetrics(JmxBrokerMetrics metrics);
+ private MetricDTO convert(RawMetric rawMetric) {
+ return new MetricDTO()
+ .name(rawMetric.name())
+ .labels(rawMetric.labels())
+ .value(rawMetric.value());
+ }
+
+ default BrokerMetricsDTO toBrokerMetrics(List metrics) {
+ return new BrokerMetricsDTO()
+ .metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
+ }
@Mapping(target = "isSensitive", source = "sensitive")
@Mapping(target = "isReadOnly", source = "readOnly")
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/BrokerMetrics.java
similarity index 85%
rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java
rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/BrokerMetrics.java
index e57cbbad30..2ffc8de185 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/JmxBrokerMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/BrokerMetrics.java
@@ -6,6 +6,6 @@ import lombok.Data;
@Data
@Builder(toBuilder = true)
-public class JmxBrokerMetrics {
+public class BrokerMetrics {
private final List metrics;
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
index b9706ef6c8..17aa8e5131 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
@@ -46,10 +46,10 @@ public class InternalClusterMetrics {
@Nullable // will be null if log dir collection disabled
private final Map internalBrokerDiskUsage;
- // metrics from jmx
+ // metrics from metrics collector
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
- private final Map internalBrokerMetrics;
+ private final Map internalBrokerMetrics;
private final List metrics;
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
index 410cb6319b..bdd7a32d04 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.model;
import com.google.common.base.Throwables;
-import com.provectus.kafka.ui.service.MetricsCache;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
@@ -28,21 +27,21 @@ public class InternalClusterState {
private BigDecimal bytesOutPerSec;
private Boolean readOnly;
- public InternalClusterState(KafkaCluster cluster, MetricsCache.Metrics metrics) {
+ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
name = cluster.getName();
- status = metrics.getStatus();
- lastError = Optional.ofNullable(metrics.getLastKafkaException())
+ status = statistics.getStatus();
+ lastError = Optional.ofNullable(statistics.getLastKafkaException())
.map(e -> new MetricsCollectionErrorDTO()
.message(e.getMessage())
.stackTrace(Throwables.getStackTraceAsString(e)))
.orElse(null);
- topicCount = metrics.getTopicDescriptions().size();
- brokerCount = metrics.getClusterDescription().getNodes().size();
- activeControllers = metrics.getClusterDescription().getController() != null ? 1 : 0;
- version = metrics.getVersion();
+ topicCount = statistics.getTopicDescriptions().size();
+ brokerCount = statistics.getClusterDescription().getNodes().size();
+ activeControllers = statistics.getClusterDescription().getController() != null ? 1 : 0;
+ version = statistics.getVersion();
- if (metrics.getLogDirInfo() != null) {
- diskUsage = metrics.getLogDirInfo().getBrokerStats().entrySet().stream()
+ if (statistics.getLogDirInfo() != null) {
+ diskUsage = statistics.getLogDirInfo().getBrokerStats().entrySet().stream()
.map(e -> new BrokerDiskUsageDTO()
.brokerId(e.getKey())
.segmentSize(e.getValue().getSegmentSize())
@@ -50,15 +49,21 @@ public class InternalClusterState {
.collect(Collectors.toList());
}
- features = metrics.getFeatures();
+ features = statistics.getFeatures();
- bytesInPerSec = metrics.getJmxMetrics().getBytesInPerSec().values().stream()
+ bytesInPerSec = statistics
+ .getMetrics()
+ .getBytesInPerSec()
+ .values().stream()
.reduce(BigDecimal.ZERO, BigDecimal::add);
- bytesOutPerSec = metrics.getJmxMetrics().getBytesOutPerSec().values().stream()
+ bytesOutPerSec = statistics
+ .getMetrics()
+ .getBytesOutPerSec()
+ .values().stream()
.reduce(BigDecimal.ZERO, BigDecimal::add);
- var partitionsStats = new PartitionsStats(metrics.getTopicDescriptions().values());
+ var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
index 206e3a83fb..b669d0db41 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
@@ -1,6 +1,5 @@
package com.provectus.kafka.ui.model;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@@ -29,7 +28,7 @@ public class InternalTopic {
private final List topicConfigs;
private final CleanupPolicy cleanUpPolicy;
- // rates from jmx
+ // rates from metrics
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
@@ -40,7 +39,7 @@ public class InternalTopic {
public static InternalTopic from(TopicDescription topicDescription,
List configs,
InternalPartitionsOffsets partitionsOffsets,
- JmxClusterUtil.JmxMetrics jmxMetrics,
+ Metrics metrics,
InternalLogDirStats logDirInfo) {
var topic = InternalTopic.builder();
topic.internal(
@@ -105,8 +104,8 @@ public class InternalTopic {
topic.segmentSize(segmentStats.getSegmentSize());
}
- topic.bytesInPerSec(jmxMetrics.getBytesInPerSec().get(topicDescription.name()));
- topic.bytesOutPerSec(jmxMetrics.getBytesOutPerSec().get(topicDescription.name()));
+ topic.bytesInPerSec(metrics.getBytesInPerSec().get(topicDescription.name()));
+ topic.bytesOutPerSec(metrics.getBytesOutPerSec().get(topicDescription.name()));
topic.topicConfigs(
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
index c69a3d3c0d..f90faa391d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
@@ -13,10 +13,6 @@ import lombok.Data;
public class KafkaCluster {
private final String name;
private final String version;
- private final Integer jmxPort;
- private final boolean jmxSsl;
- private final String jmxUsername;
- private final String jmxPassword;
private final String bootstrapServers;
private final InternalSchemaRegistry schemaRegistry;
private final InternalKsqlServer ksqldbServer;
@@ -24,4 +20,5 @@ public class KafkaCluster {
private final Properties properties;
private final boolean readOnly;
private final boolean disableLogDirsCollection;
+ private final MetricsConfig metricsConfig;
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
new file mode 100644
index 0000000000..f9f6943808
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.model;
+
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.Builder;
+import lombok.Value;
+
+
+@Builder
+@Value
+public class Metrics {
+ Map bytesInPerSec;
+ Map bytesOutPerSec;
+ Map> perBrokerMetrics;
+
+ public static Metrics empty() {
+ return Metrics.builder()
+ .bytesInPerSec(Map.of())
+ .bytesOutPerSec(Map.of())
+ .perBrokerMetrics(Map.of())
+ .build();
+ }
+
+ public Stream getSummarizedMetrics() {
+ return perBrokerMetrics.values().stream()
+ .flatMap(Collection::stream)
+ .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value()))))
+ .values()
+ .stream();
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
new file mode 100644
index 0000000000..2554008080
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.model;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class MetricsConfig {
+ public static final String JMX_METRICS_TYPE = "JMX";
+ public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
+
+ private final String type;
+ private final Integer port;
+ private final boolean ssl;
+ private final String username;
+ private final String password;
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
new file mode 100644
index 0000000000..cb74c5d5ab
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.model;
+
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+
+@Value
+@Builder(toBuilder = true)
+public class Statistics {
+ ServerStatusDTO status;
+ Throwable lastKafkaException;
+ String version;
+ List features;
+ ReactiveAdminClient.ClusterDescription clusterDescription;
+ Metrics metrics;
+ InternalLogDirStats logDirInfo;
+ Map topicDescriptions;
+ Map> topicConfigs;
+
+ public static Statistics empty() {
+ return builder()
+ .status(ServerStatusDTO.OFFLINE)
+ .version("Unknown")
+ .features(List.of())
+ .clusterDescription(
+ new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
+ .metrics(Metrics.empty())
+ .logDirInfo(InternalLogDirStats.empty())
+ .topicDescriptions(Map.of())
+ .topicConfigs(Map.of())
+ .build();
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
index d8c96f8cbc..7b93b1420e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
@@ -12,7 +12,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
-import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
@Service
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
index 28125874bb..cd6ad93ba0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
@@ -9,8 +9,8 @@ import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,7 +35,7 @@ import reactor.core.publisher.Mono;
@Slf4j
public class BrokerService {
- private final MetricsCache metricsCache;
+ private final StatisticsCache statisticsCache;
private final AdminClientService adminClientService;
private final DescribeLogDirsMapper describeLogDirsMapper;
@@ -54,7 +54,7 @@ public class BrokerService {
}
private Flux getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
- if (metricsCache.get(cluster).getClusterDescription().getNodes()
+ if (statisticsCache.get(cluster).getClusterDescription().getNodes()
.stream().noneMatch(node -> node.id() == brokerId)) {
return Flux.error(
new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -125,7 +125,7 @@ public class BrokerService {
KafkaCluster cluster, List reqBrokers) {
return adminClientService.get(cluster)
.flatMap(admin -> {
- List brokers = metricsCache.get(cluster).getClusterDescription().getNodes()
+ List brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
.stream()
.map(Node::id)
.collect(Collectors.toList());
@@ -150,9 +150,8 @@ public class BrokerService {
return getBrokersConfig(cluster, brokerId);
}
- public Mono getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
- return Mono.justOrEmpty(
- metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId));
+ public Mono> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
+ return Mono.justOrEmpty(statisticsCache.get(cluster).getMetrics().getPerBrokerMetrics().get(brokerId));
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
index d38af2a18c..42c75f3d4e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
@@ -18,33 +18,33 @@ import reactor.core.publisher.Mono;
@Slf4j
public class ClusterService {
- private final MetricsCache metricsCache;
+ private final StatisticsCache statisticsCache;
private final ClustersStorage clustersStorage;
private final ClusterMapper clusterMapper;
- private final MetricsService metricsService;
+ private final StatisticsService statisticsService;
public List getClusters() {
return clustersStorage.getKafkaClusters()
.stream()
- .map(c -> clusterMapper.toCluster(new InternalClusterState(c, metricsCache.get(c))))
+ .map(c -> clusterMapper.toCluster(new InternalClusterState(c, statisticsCache.get(c))))
.collect(Collectors.toList());
}
public Mono getClusterStats(KafkaCluster cluster) {
return Mono.justOrEmpty(
clusterMapper.toClusterStats(
- new InternalClusterState(cluster, metricsCache.get(cluster)))
+ new InternalClusterState(cluster, statisticsCache.get(cluster)))
);
}
public Mono getClusterMetrics(KafkaCluster cluster) {
return Mono.just(
clusterMapper.toClusterMetrics(
- metricsCache.get(cluster).getJmxMetrics()));
+ statisticsCache.get(cluster).getMetrics()));
}
public Mono updateCluster(KafkaCluster cluster) {
- return metricsService.updateCache(cluster)
+ return statisticsService.updateCache(cluster)
.map(metrics -> clusterMapper.toCluster(new InternalClusterState(cluster, metrics)));
}
}
\ No newline at end of file
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java
similarity index 82%
rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java
rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java
index 53fff7060b..f7ac1239a0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStatisticsScheduler.java
@@ -10,20 +10,20 @@ import reactor.core.scheduler.Schedulers;
@Component
@RequiredArgsConstructor
@Slf4j
-public class ClustersMetricsScheduler {
+public class ClustersStatisticsScheduler {
private final ClustersStorage clustersStorage;
- private final MetricsService metricsService;
+ private final StatisticsService statisticsService;
@Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
- public void updateMetrics() {
+ public void updateStatistics() {
Flux.fromIterable(clustersStorage.getKafkaClusters())
.parallel()
.runOn(Schedulers.parallel())
.flatMap(cluster -> {
log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
- return metricsService.updateCache(cluster)
+ return statisticsService.updateCache(cluster)
.doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
})
.then()
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java
similarity index 54%
rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java
rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java
index c568ef9df0..3acd64262b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsCache.java
@@ -1,61 +1,28 @@
package com.provectus.kafka.ui.service;
-import com.provectus.kafka.ui.model.Feature;
-import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.ServerStatusDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.model.Statistics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import lombok.Builder;
-import lombok.Value;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.stereotype.Component;
@Component
-public class MetricsCache {
+public class StatisticsCache {
- @Value
- @Builder(toBuilder = true)
- public static class Metrics {
- ServerStatusDTO status;
- Throwable lastKafkaException;
- String version;
- List features;
- ReactiveAdminClient.ClusterDescription clusterDescription;
- JmxClusterUtil.JmxMetrics jmxMetrics;
- InternalLogDirStats logDirInfo;
- Map topicDescriptions;
- Map> topicConfigs;
+ private final Map<String, Statistics> cache = new ConcurrentHashMap<>();
- public static Metrics empty() {
- return builder()
- .status(ServerStatusDTO.OFFLINE)
- .version("Unknown")
- .features(List.of())
- .clusterDescription(
- new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
- .jmxMetrics(JmxClusterUtil.JmxMetrics.empty())
- .logDirInfo(InternalLogDirStats.empty())
- .topicDescriptions(Map.of())
- .topicConfigs(Map.of())
- .build();
- }
- }
-
- private final Map cache = new ConcurrentHashMap<>();
-
- public MetricsCache(ClustersStorage clustersStorage) {
- var initializing = Metrics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
+ public StatisticsCache(ClustersStorage clustersStorage) {
+ var initializing = Statistics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing));
}
- public synchronized void replace(KafkaCluster c, Metrics stats) {
+ public synchronized void replace(KafkaCluster c, Statistics stats) {
cache.put(c.getName(), stats);
}
@@ -91,7 +58,7 @@ public class MetricsCache {
);
}
- public Metrics get(KafkaCluster c) {
+ public Statistics get(KafkaCluster c) {
return Objects.requireNonNull(cache.get(c.getName()), "Unknown cluster metrics requested");
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
similarity index 76%
rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java
rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
index ac58ba8b77..9f0b938999 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
@@ -3,8 +3,10 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.model.Feature;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.ServerStatusDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
+import com.provectus.kafka.ui.model.Statistics;
+import com.provectus.kafka.ui.service.metrics.MetricsCollector;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
@@ -17,33 +19,33 @@ import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@Slf4j
-public class MetricsService {
+public class StatisticsService {
- private final JmxClusterUtil jmxClusterUtil;
+ private final MetricsCollector metricsClusterUtil;
private final AdminClientService adminClientService;
private final FeatureService featureService;
- private final MetricsCache cache;
+ private final StatisticsCache cache;
- public Mono updateCache(KafkaCluster c) {
- return getMetrics(c).doOnSuccess(m -> cache.replace(c, m));
+ public Mono<Statistics> updateCache(KafkaCluster c) {
+ return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
}
- private Mono getMetrics(KafkaCluster cluster) {
+ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster().flatMap(description ->
Mono.zip(
List.of(
- jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
+ metricsClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(cluster, ac),
featureService.getAvailableFeatures(cluster, description.getController()),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
- MetricsCache.Metrics.builder()
+ Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
- .jmxMetrics((JmxClusterUtil.JmxMetrics) results[0])
+ .metrics((Metrics) results[0])
.logDirInfo((InternalLogDirStats) results[1])
.features((List) results[2])
.topicConfigs((Map>) results[3])
@@ -53,7 +55,7 @@ public class MetricsService {
.doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(
- e -> Mono.just(MetricsCache.Metrics.empty().toBuilder().lastKafkaException(e).build()));
+ e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
}
private Mono getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
index 91e5ebd2e6..1790a3005c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
@@ -15,13 +15,14 @@ import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
+import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -51,7 +52,7 @@ public class TopicsService {
private final AdminClientService adminClientService;
private final DeserializationService deserializationService;
- private final MetricsCache metricsCache;
+ private final StatisticsCache statisticsCache;
@Value("${topic.recreate.maxRetries:15}")
private int recreateMaxRetries;
@Value("${topic.recreate.delay.seconds:1}")
@@ -69,15 +70,15 @@ public class TopicsService {
.flatMap(ac ->
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
(descriptions, configs) -> {
- metricsCache.update(c, descriptions, configs);
+ statisticsCache.update(c, descriptions, configs);
return getPartitionOffsets(descriptions, ac).map(offsets -> {
- var metrics = metricsCache.get(c);
+ var metrics = statisticsCache.get(c);
return createList(
topics,
descriptions,
configs,
offsets,
- metrics.getJmxMetrics(),
+ metrics.getMetrics(),
metrics.getLogDirInfo()
);
});
@@ -118,7 +119,7 @@ public class TopicsService {
Map descriptions,
Map> configs,
InternalPartitionsOffsets partitionsOffsets,
- JmxClusterUtil.JmxMetrics jmxMetrics,
+ Metrics metrics,
InternalLogDirStats logDirInfo) {
return orderedNames.stream()
.filter(descriptions::containsKey)
@@ -126,7 +127,7 @@ public class TopicsService {
descriptions.get(t),
configs.getOrDefault(t, List.of()),
partitionsOffsets,
- jmxMetrics,
+ metrics,
logDirInfo
))
.collect(toList());
@@ -249,7 +250,7 @@ public class TopicsService {
.flatMap(ac -> {
Integer actual = topic.getReplicationFactor();
Integer requested = replicationFactorChange.getTotalReplicationFactor();
- Integer brokersCount = metricsCache.get(cluster).getClusterDescription()
+ Integer brokersCount = statisticsCache.get(cluster).getClusterDescription()
.getNodes().size();
if (requested.equals(actual)) {
@@ -365,7 +366,7 @@ public class TopicsService {
private Map getBrokersMap(KafkaCluster cluster,
Map> currentAssignment) {
- Map result = metricsCache.get(cluster).getClusterDescription().getNodes()
+ Map result = statisticsCache.get(cluster).getClusterDescription().getNodes()
.stream()
.map(Node::id)
.collect(toMap(
@@ -413,9 +414,9 @@ public class TopicsService {
}
public Mono deleteTopic(KafkaCluster cluster, String topicName) {
- if (metricsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
+ if (statisticsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) {
return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName))
- .doOnSuccess(t -> metricsCache.onTopicDelete(cluster, topicName));
+ .doOnSuccess(t -> statisticsCache.onTopicDelete(cluster, topicName));
} else {
return Mono.error(new ValidationException("Topic deletion restricted"));
}
@@ -441,18 +442,18 @@ public class TopicsService {
}
public Mono> getTopicsForPagination(KafkaCluster cluster) {
- MetricsCache.Metrics metrics = metricsCache.get(cluster);
- return filterExisting(cluster, metrics.getTopicDescriptions().keySet())
- .map(lst -> lst.stream()
- .map(topicName ->
- InternalTopic.from(
- metrics.getTopicDescriptions().get(topicName),
- metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
- InternalPartitionsOffsets.empty(),
- metrics.getJmxMetrics(),
- metrics.getLogDirInfo()))
- .collect(toList())
- );
+ Statistics stats = statisticsCache.get(cluster);
+ return filterExisting(cluster, stats.getTopicDescriptions().keySet())
+ .map(lst -> lst.stream()
+ .map(topicName ->
+ InternalTopic.from(
+ stats.getTopicDescriptions().get(topicName),
+ stats.getTopicConfigs().getOrDefault(topicName, List.of()),
+ InternalPartitionsOffsets.empty(),
+ stats.getMetrics(),
+ stats.getLogDirInfo()))
+ .collect(toList())
+ );
}
private Mono> filterExisting(KafkaCluster cluster, Collection topics) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java
new file mode 100644
index 0000000000..4d3d31f50f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java
@@ -0,0 +1,85 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.management.MBeanAttributeInfo;
+import javax.management.ObjectName;
+
+/**
+ * Converts JMX metrics into the JmxExporter prometheus format.
+ */
+class JmxMetricsFormatter {
+
+ // copied from https://github.com/prometheus/jmx_exporter/blob/b6b811b4aae994e812e902b26dd41f29364c0e2b/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java#L15
+ private static final Pattern PROPERTY_PATTERN = Pattern.compile(
+ "([^,=:\\*\\?]+)=(\"(?:[^\\\\\"]*(?:\\\\.)?)*\"|[^,=:\"]*)");
+
+ static List<RawMetric> constructMetricsList(ObjectName jmxMetric,
+ MBeanAttributeInfo[] attributes,
+ Object[] attrValues) {
+ String domain = fixIllegalChars(jmxMetric.getDomain());
+ LinkedHashMap<String, String> labels = getLabelsMap(jmxMetric);
+ String firstLabel = labels.keySet().iterator().next();
+ String firstLabelValue = fixIllegalChars(labels.get(firstLabel));
+ labels.remove(firstLabel); // removing first label since its value will be in the name
+
+ List<RawMetric> result = new ArrayList<>(attributes.length);
+ for (int i = 0; i < attributes.length; i++) {
+ String attrName = fixIllegalChars(attributes[i].getName());
+ convertNumericValue(attrValues[i]).ifPresent(convertedValue -> {
+ String name = String.format("%s_%s_%s", domain, firstLabelValue, attrName);
+ var metric = RawMetric.create(name, labels, convertedValue);
+ result.add(metric);
+ });
+ }
+ return result;
+ }
+
+ private static String fixIllegalChars(String str) {
+ return str
+ .replace('.', '_')
+ .replace('-', '_');
+ }
+
+ private static Optional<BigDecimal> convertNumericValue(Object value) {
+ if (!(value instanceof Number)) {
+ return Optional.empty();
+ }
+ try {
+ if (value instanceof Long) {
+ return Optional.of(new BigDecimal((Long) value));
+ } else if (value instanceof Integer) {
+ return Optional.of(new BigDecimal((Integer) value));
+ }
+ return Optional.of(new BigDecimal(value.toString()));
+ } catch (NumberFormatException nfe) {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Converts MBean properties to a map, keeping order (copied from the jmx_exporter repo).
+ */
+ private static LinkedHashMap<String, String> getLabelsMap(ObjectName mbeanName) {
+ LinkedHashMap<String, String> keyProperties = new LinkedHashMap<>();
+ String properties = mbeanName.getKeyPropertyListString();
+ Matcher match = PROPERTY_PATTERN.matcher(properties);
+ while (match.lookingAt()) {
+ String labelName = fixIllegalChars(match.group(1)); // label names should be fixed
+ String labelValue = match.group(2);
+ keyProperties.put(labelName, labelValue);
+ properties = properties.substring(match.end());
+ if (properties.startsWith(",")) {
+ properties = properties.substring(1);
+ }
+ match.reset(properties);
+ }
+ return keyProperties;
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java
new file mode 100644
index 0000000000..8a6e10656d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java
@@ -0,0 +1,106 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.JmxConnectionInfo;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.util.JmxPoolFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.kafka.common.Node;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+@Service
+@Lazy
+@Slf4j
+class JmxMetricsRetriever implements MetricsRetriever, AutoCloseable {
+
+ private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
+ private static final String JMX_SERVICE_TYPE = "jmxrmi";
+ private static final String CANONICAL_NAME_PATTERN = "kafka.server*:*";
+
+ private final GenericKeyedObjectPool<JmxConnectionInfo, JMXConnector> pool;
+
+ public JmxMetricsRetriever() {
+ this.pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
+ GenericKeyedObjectPoolConfig<JMXConnector> poolConfig = new GenericKeyedObjectPoolConfig<>();
+ poolConfig.setMaxIdlePerKey(3);
+ poolConfig.setMaxTotalPerKey(3);
+ this.pool.setConfig(poolConfig);
+ }
+
+ @Override
+ public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
+ return Mono.fromSupplier(() -> retrieveSync(c, node))
+ .subscribeOn(Schedulers.boundedElastic())
+ .flatMapMany(Flux::fromIterable);
+ }
+
+ private List<RawMetric> retrieveSync(KafkaCluster c, Node node) {
+ String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE;
+ log.debug("Collection JMX metrics for {}", jmxUrl);
+ final var connectionInfo = JmxConnectionInfo.builder()
+ .url(jmxUrl)
+ .ssl(c.getMetricsConfig().isSsl())
+ .username(c.getMetricsConfig().getUsername())
+ .password(c.getMetricsConfig().getPassword())
+ .build();
+ JMXConnector srv;
+ try {
+ srv = pool.borrowObject(connectionInfo);
+ } catch (Exception e) {
+ log.error("Cannot get JMX connector for the pool due to: ", e);
+ return Collections.emptyList();
+ }
+ List<RawMetric> result = new ArrayList<>();
+ try {
+ MBeanServerConnection msc = srv.getMBeanServerConnection();
+ var jmxMetrics = msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null);
+ for (ObjectName jmxMetric : jmxMetrics) {
+ result.addAll(extractObjectMetrics(jmxMetric, msc));
+ }
+ pool.returnObject(connectionInfo, srv);
+ } catch (Exception e) {
+ log.error("Error getting jmx metrics from {}", jmxUrl, e);
+ closeConnectionExceptionally(jmxUrl, srv);
+ }
+ log.debug("{} metrics collected for {}", result.size(), jmxUrl);
+ return result;
+ }
+
+ private void closeConnectionExceptionally(String url, JMXConnector srv) {
+ try {
+ pool.invalidateObject(new JmxConnectionInfo(url), srv);
+ } catch (Exception e) {
+ log.error("Cannot invalidate object in pool, {}", url, e);
+ }
+ }
+
+ @SneakyThrows
+ private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerConnection msc) {
+ MBeanAttributeInfo[] attrNames = msc.getMBeanInfo(objectName).getAttributes();
+ Object[] attrValues = new Object[attrNames.length];
+ for (int i = 0; i < attrNames.length; i++) {
+ attrValues[i] = msc.getAttribute(objectName, attrNames[i].getName());
+ }
+ return JmxMetricsFormatter.constructMetricsList(objectName, attrNames, attrValues);
+ }
+
+ @Override
+ public void close() {
+ this.pool.close();
+ }
+}
+
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java
new file mode 100644
index 0000000000..bd0506bed8
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java
@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.model.MetricsConfig;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Node;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class MetricsCollector {
+
+ private final JmxMetricsRetriever jmxMetricsRetriever;
+ private final PrometheusMetricsRetriever prometheusMetricsRetriever;
+
+ public Mono<Metrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
+ return Flux.fromIterable(nodes)
+ .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst)))
+ .collectMap(Tuple2::getT1, Tuple2::getT2)
+ .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics))
+ .defaultIfEmpty(Metrics.empty());
+ }
+
+ private Mono<List<RawMetric>> getMetrics(KafkaCluster kafkaCluster, Node node) {
+ Flux<RawMetric> metricFlux = Flux.empty();
+ if (kafkaCluster.getMetricsConfig() != null) {
+ String type = kafkaCluster.getMetricsConfig().getType();
+ if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) {
+ metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node);
+ } else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) {
+ metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node);
+ }
+ }
+ return metricFlux.collectList();
+ }
+
+ public Metrics collectMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
+ Metrics.MetricsBuilder builder = Metrics.builder()
+ .perBrokerMetrics(
+ perBrokerMetrics.entrySet()
+ .stream()
+ .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue)));
+
+ populateWellknowMetrics(cluster, perBrokerMetrics)
+ .apply(builder);
+
+ return builder.build();
+ }
+
+ private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
+ WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
+ perBrokerMetrics.forEach((node, metrics) ->
+ metrics.forEach(metric ->
+ wellKnownMetrics.populate(cluster, node, metric)));
+ return wellKnownMetrics;
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java
new file mode 100644
index 0000000000..7e1e126fa0
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java
@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Flux;
+
+interface MetricsRetriever {
+ Flux<RawMetric> retrieve(KafkaCluster c, Node node);
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java
new file mode 100644
index 0000000000..1a51ca0afa
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java
@@ -0,0 +1,46 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+
+@Slf4j
+class PrometheusEndpointMetricsParser {
+
+ /**
+ * Matches openmetrics format. For example, string:
+ * kafka_server_BrokerTopicMetrics_FiveMinuteRate{name="BytesInPerSec",topic="__consumer_offsets",} 16.94886650744339
+ * will produce:
+ * name=kafka_server_BrokerTopicMetrics_FiveMinuteRate
+ * value=16.94886650744339
+ * labels={name="BytesInPerSec", topic="__consumer_offsets"}
+ */
+ private static final Pattern PATTERN = Pattern.compile(
+ "(?<metricName>^\\w+)([ \t]*\\{*(?<properties>.*)}*)[ \\t]+(?<value>[\\d]+\\.?[\\d]+)?");
+
+ static Optional<RawMetric> parse(String s) {
+ Matcher matcher = PATTERN.matcher(s);
+ if (matcher.matches()) {
+ String value = matcher.group("value");
+ String metricName = matcher.group("metricName");
+ if (metricName == null || !NumberUtils.isCreatable(value)) {
+ return Optional.empty();
+ }
+ var labels = Arrays.stream(matcher.group("properties").split(","))
+ .filter(str -> !"".equals(str))
+ .map(str -> str.split("="))
+ .filter(spit -> spit.length == 2)
+ .collect(Collectors.toUnmodifiableMap(
+ str -> str[0].trim(),
+ str -> str[1].trim().replace("\"", "")));
+
+ return Optional.of(RawMetric.create(metricName, labels, new BigDecimal(value)));
+ }
+ return Optional.empty();
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java
new file mode 100644
index 0000000000..8e9334b418
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java
@@ -0,0 +1,58 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Arrays;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Node;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+class PrometheusMetricsRetriever implements MetricsRetriever {
+
+ private static final String METRICS_ENDPOINT_PATH = "/metrics";
+ private static final int DEFAULT_EXPORTER_PORT = 11001;
+
+ private final WebClient webClient;
+
+ @Override
+ public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
+ log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), c.getMetricsConfig().getPort());
+ var metricsConfig = c.getMetricsConfig();
+ Integer port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
+ return retrieve(node.host(), port, metricsConfig.isSsl());
+ }
+
+ @VisibleForTesting
+ Flux<RawMetric> retrieve(String host, int port, boolean ssl) {
+ WebClient.ResponseSpec responseSpec = webClient.get()
+ .uri(UriComponentsBuilder.newInstance()
+ .scheme(ssl ? "https" : "http")
+ .host(host)
+ .port(port)
+ .path(METRICS_ENDPOINT_PATH).build().toUri())
+ .retrieve();
+
+ return responseSpec.bodyToMono(String.class)
+ .doOnError(e -> log.error("Error while getting metrics from {}", host, e))
+ .onErrorResume(th -> Mono.empty())
+ .flatMapMany(body ->
+ Flux.fromStream(
+ Arrays.stream(body.split("\\n"))
+ .filter(str -> !Strings.isNullOrEmpty(str) && !str.startsWith("#")) // skipping comment strings
+ .map(PrometheusEndpointMetricsParser::parse)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ )
+ );
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
new file mode 100644
index 0000000000..659212f23f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
@@ -0,0 +1,60 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.math.BigDecimal;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+public interface RawMetric {
+
+ String name();
+
+ Map<String, String> labels();
+
+ BigDecimal value();
+
+ // Key that can be used for metrics reductions
+ default Object identityKey() {
+ return name() + "_" + labels();
+ }
+
+ RawMetric copyWithValue(BigDecimal newValue);
+
+ //--------------------------------------------------
+
+ static RawMetric create(String name, Map<String, String> labels, BigDecimal value) {
+ return new SimpleMetric(name, labels, value);
+ }
+
+ @AllArgsConstructor
+ @EqualsAndHashCode
+ @ToString
+ class SimpleMetric implements RawMetric {
+
+ private final String name;
+ private final Map<String, String> labels;
+ private final BigDecimal value;
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Map<String, String> labels() {
+ return labels;
+ }
+
+ @Override
+ public BigDecimal value() {
+ return value;
+ }
+
+ @Override
+ public RawMetric copyWithValue(BigDecimal newValue) {
+ return new SimpleMetric(name, labels, newValue);
+ }
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java
new file mode 100644
index 0000000000..fe8a55da12
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java
@@ -0,0 +1,42 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+
+class WellKnownMetrics {
+
+ final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
+ final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
+
+ void populate(KafkaCluster cluster, Node node, RawMetric rawMetric) {
+ updateTopicsIOrates(rawMetric);
+ }
+
+ void apply(Metrics.MetricsBuilder metricsBuilder) {
+ metricsBuilder.bytesInPerSec(bytesInFifteenMinuteRate);
+ metricsBuilder.bytesOutPerSec(bytesOutFifteenMinuteRate);
+ }
+
+ private void updateTopicsIOrates(RawMetric rawMetric) {
+ String name = rawMetric.name();
+ String topic = rawMetric.labels().get("topic");
+ if (topic != null
+ && containsIgnoreCase(name, "BrokerTopicMetrics")
+ && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
+ String nameProperty = rawMetric.labels().get("name");
+ if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
+ bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+ } else if ("BytesOutPerSec".equalsIgnoreCase(nameProperty)) {
+ bytesOutFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+ }
+ }
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java
deleted file mode 100644
index 163ebb5e9e..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.reducing;
-import static java.util.stream.Collectors.toList;
-
-import com.provectus.kafka.ui.model.JmxBrokerMetrics;
-import com.provectus.kafka.ui.model.JmxConnectionInfo;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MetricDTO;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import lombok.Builder;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.Value;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.kafka.common.Node;
-import org.jetbrains.annotations.Nullable;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class JmxClusterUtil {
-
- private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
- private static final String JMX_SERVICE_TYPE = "jmxrmi";
- private static final String KAFKA_SERVER_PARAM = "kafka.server";
- private static final String NAME_METRIC_FIELD = "name";
- private final KeyedObjectPool pool;
-
- @Builder
- @Value
- public static class JmxMetrics {
- Map bytesInPerSec;
- Map bytesOutPerSec;
- Map internalBrokerMetrics;
- List metrics;
-
- public static JmxMetrics empty() {
- return JmxClusterUtil.JmxMetrics.builder()
- .bytesInPerSec(Map.of())
- .bytesOutPerSec(Map.of())
- .internalBrokerMetrics(Map.of())
- .metrics(List.of())
- .build();
- }
- }
-
- public Mono getBrokerMetrics(KafkaCluster cluster, Collection nodes) {
- return Flux.fromIterable(nodes)
- // jmx is a blocking api, so we trying to parallelize its execution on boundedElastic scheduler
- .parallel()
- .runOn(Schedulers.boundedElastic())
- .map(n -> Map.entry(n.id(),
- JmxBrokerMetrics.builder().metrics(getJmxMetric(cluster, n)).build()))
- .sequential()
- .collectMap(Map.Entry::getKey, Map.Entry::getValue)
- .map(this::collectMetrics);
- }
-
- private List getJmxMetric(KafkaCluster cluster, Node node) {
- return Optional.of(cluster)
- .filter(c -> c.getJmxPort() != null)
- .filter(c -> c.getJmxPort() > 0)
- .map(c -> getJmxMetrics(node.host(), c.getJmxPort(), c.isJmxSsl(),
- c.getJmxUsername(), c.getJmxPassword()))
- .orElse(Collections.emptyList());
- }
-
- @SneakyThrows
- private List getJmxMetrics(String host, int port, boolean jmxSsl,
- @Nullable String username, @Nullable String password) {
- String jmxUrl = JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE;
- final var connectionInfo = JmxConnectionInfo.builder()
- .url(jmxUrl)
- .ssl(jmxSsl)
- .username(username)
- .password(password)
- .build();
- JMXConnector srv;
- try {
- srv = pool.borrowObject(connectionInfo);
- } catch (Exception e) {
- log.error("Cannot get JMX connector for the pool due to: ", e);
- return Collections.emptyList();
- }
-
- List result = new ArrayList<>();
- try {
- MBeanServerConnection msc = srv.getMBeanServerConnection();
- var jmxMetrics = msc.queryNames(null, null).stream()
- .filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM))
- .collect(Collectors.toList());
- for (ObjectName jmxMetric : jmxMetrics) {
- final Hashtable params = jmxMetric.getKeyPropertyList();
- MetricDTO metric = new MetricDTO();
- metric.setName(params.get(NAME_METRIC_FIELD));
- metric.setCanonicalName(jmxMetric.getCanonicalName());
- metric.setParams(params);
- metric.setValue(getJmxMetrics(jmxMetric.getCanonicalName(), msc));
- result.add(metric);
- }
- pool.returnObject(connectionInfo, srv);
- } catch (Exception e) {
- log.error("Cannot get jmxMetricsNames, {}", jmxUrl, e);
- closeConnectionExceptionally(jmxUrl, srv);
- }
- return result;
- }
-
- @SneakyThrows
- private Map getJmxMetrics(String canonicalName, MBeanServerConnection msc) {
- Map resultAttr = new HashMap<>();
- ObjectName name = new ObjectName(canonicalName);
- var attrNames = msc.getMBeanInfo(name).getAttributes();
- for (MBeanAttributeInfo attrName : attrNames) {
- var value = msc.getAttribute(name, attrName.getName());
- if (NumberUtil.isNumeric(value)) {
- resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
- }
- }
- return resultAttr;
- }
-
- private JmxMetrics collectMetrics(Map perBrokerJmxMetrics) {
- final List metrics = perBrokerJmxMetrics.values()
- .stream()
- .flatMap(b -> b.getMetrics().stream())
- .collect(
- groupingBy(
- MetricDTO::getCanonicalName,
- reducing(this::reduceJmxMetrics)
- )
- ).values().stream()
- .filter(Optional::isPresent)
- .map(Optional::get)
- .collect(toList());
- return JmxMetrics.builder()
- .metrics(metrics)
- .internalBrokerMetrics(perBrokerJmxMetrics)
- .bytesInPerSec(findTopicMetrics(
- metrics, JmxMetricsName.BYTES_IN_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE))
- .bytesOutPerSec(findTopicMetrics(
- metrics, JmxMetricsName.BYTES_OUT_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE))
- .build();
- }
-
- private Map findTopicMetrics(List metrics,
- JmxMetricsName metricsName,
- JmxMetricsValueName valueName) {
- return metrics.stream().filter(m -> metricsName.getValue().equals(m.getName()))
- .filter(m -> m.getParams().containsKey("topic"))
- .filter(m -> m.getValue().containsKey(valueName.getValue()))
- .map(m -> Tuples.of(
- m.getParams().get("topic"),
- m.getValue().get(valueName.getValue())
- )).collect(groupingBy(
- Tuple2::getT1,
- reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
- ));
- }
-
- private void closeConnectionExceptionally(String url, JMXConnector srv) {
- try {
- pool.invalidateObject(new JmxConnectionInfo(url), srv);
- } catch (Exception e) {
- log.error("Cannot invalidate object in pool, {}", url);
- }
- }
-
- public MetricDTO reduceJmxMetrics(MetricDTO metric1, MetricDTO metric2) {
- var result = new MetricDTO();
- Map value = Stream.concat(
- metric1.getValue().entrySet().stream(),
- metric2.getValue().entrySet().stream()
- ).collect(Collectors.groupingBy(
- Map.Entry::getKey,
- Collectors.reducing(BigDecimal.ZERO, Map.Entry::getValue, BigDecimal::add)
- ));
- result.setName(metric1.getName());
- result.setCanonicalName(metric1.getCanonicalName());
- result.setParams(metric1.getParams());
- result.setValue(value);
- return result;
- }
-
- private boolean isWellKnownMetric(MetricDTO metric) {
- final Optional param =
- Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p ->
- Arrays.stream(JmxMetricsName.values()).map(JmxMetricsName::getValue)
- .anyMatch(n -> n.equals(p))
- );
- return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent();
- }
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java
deleted file mode 100644
index 2384fc8536..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-public enum JmxMetricsName {
- MESSAGES_IN_PER_SEC("MessagesInPerSec"),
- BYTES_IN_PER_SEC("BytesInPerSec"),
- REPLICATION_BYTES_IN_PER_SEC("ReplicationBytesInPerSec"),
- REQUESTS_PER_SEC("RequestsPerSec"),
- ERRORS_PER_SEC("ErrorsPerSec"),
- MESSAGE_CONVERSIONS_PER_SEC("MessageConversionsPerSec"),
- BYTES_OUT_PER_SEC("BytesOutPerSec"),
- REPLICATION_BYTES_OUT_PER_SEC("ReplicationBytesOutPerSec"),
- NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC("NoKeyCompactedTopicRecordsPerSec"),
- INVALID_MAGIC_NUMBER_RECORDS_PER_SEC("InvalidMagicNumberRecordsPerSec"),
- INVALID_MESSAGE_CRC_RECORDS_PER_SEC("InvalidMessageCrcRecordsPerSec"),
- INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC("InvalidOffsetOrSequenceRecordsPerSec"),
- UNCLEAN_LEADER_ELECTIONS_PER_SEC("UncleanLeaderElectionsPerSec"),
- ISR_SHRINKS_PER_SEC("IsrShrinksPerSec"),
- ISR_EXPANDS_PER_SEC("IsrExpandsPerSec"),
- REASSIGNMENT_BYTES_OUT_PER_SEC("ReassignmentBytesOutPerSec"),
- REASSIGNMENT_BYTES_IN_PER_SEC("ReassignmentBytesInPerSec"),
- PRODUCE_MESSAGE_CONVERSIONS_PER_SEC("ProduceMessageConversionsPerSec"),
- FAILED_FETCH_REQUESTS_PER_SEC("FailedFetchRequestsPerSec"),
- ZOOKEEPER_SYNC_CONNECTS_PER_SEC("ZooKeeperSyncConnectsPerSec"),
- BYTES_REJECTED_PER_SEC("BytesRejectedPerSec"),
- ZOO_KEEPER_AUTH_FAILURES_PER_SEC("ZooKeeperAuthFailuresPerSec"),
- TOTAL_FETCH_REQUESTS_PER_SEC("TotalFetchRequestsPerSec"),
- FAILED_ISR_UPDATES_PER_SEC("FailedIsrUpdatesPerSec"),
- INCREMENTAL_FETCH_SESSION_EVICTIONS_PER_SEC("IncrementalFetchSessionEvictionsPerSec"),
- FETCH_MESSAGE_CONVERSIONS_PER_SEC("FetchMessageConversionsPerSec"),
- FAILED_PRODUCE_REQUESTS_PER_SEC("FailedProduceRequestsPerSe");
-
- private final String value;
-
- JmxMetricsName(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java
deleted file mode 100644
index cbcc6cee07..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-public enum JmxMetricsValueName {
- COUNT("Count"),
- ONE_MINUTE_RATE("OneMinuteRate"),
- FIFTEEN_MINUTE_RATE("FifteenMinuteRate"),
- FIVE_MINUTE_RATE("FiveMinuteRate"),
- MEAN_RATE("MeanRate");
-
- private final String value;
-
- JmxMetricsValueName(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java
index 9ea8c037cc..7237f1b886 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.util;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.math.NumberUtils;
@Slf4j
public class NumberUtil {
@@ -9,9 +8,6 @@ public class NumberUtil {
private NumberUtil() {
}
- public static boolean isNumeric(Object value) {
- return value != null && NumberUtils.isCreatable(value.toString());
- }
public static float parserClusterVersion(String version) throws NumberFormatException {
log.trace("Parsing cluster version [{}]", version);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
index 0fd830b323..e8d475a65d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
@@ -93,11 +93,9 @@ public abstract class OffsetsSeek {
public static class WaitingOffsets {
private final Map endOffsets; // partition number -> offset
private final Map beginOffsets; // partition number -> offset
- private final String topic;
public WaitingOffsets(String topic, Consumer, ?> consumer,
Collection partitions) {
- this.topic = topic;
var allBeginningOffsets = consumer.beginningOffsets(partitions);
var allEndOffsets = consumer.endOffsets(partitions);
diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml
index 4f55ed49cc..fd14a57ea0 100644
--- a/kafka-ui-api/src/main/resources/application-local.yml
+++ b/kafka-ui-api/src/main/resources/application-local.yml
@@ -8,7 +8,9 @@ kafka:
kafkaConnect:
- name: first
address: http://localhost:8083
- jmxPort: 9997
+ metrics:
+ port: 9997
+ type: JMX
# -
# name: secondLocal
# bootstrapServers: localhost:9093
@@ -17,7 +19,9 @@ kafka:
# kafkaConnect:
# - name: first
# address: http://localhost:8083
- # jmxPort: 9998
+ # metrics:
+ # port: 9998
+ # type: JMX
# read-only: true
# -
# name: localUsingProtobufFile
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
index 5de2b93baa..afecccd107 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
@@ -8,7 +8,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java
index c892418843..eaceb9ef24 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java
@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.producer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
index 1ddfd2d9ec..4de939e033 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
@@ -129,9 +129,6 @@ public class SendAndReadTests extends AbstractIntegrationTest {
@Autowired
private ClustersStorage clustersStorage;
- @Autowired
- private ClustersMetricsScheduler clustersMetricsScheduler;
-
@BeforeEach
void init() {
targetCluster = clustersStorage.getClusterByName(LOCAL).get();
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
index 42e44b0897..cc057db0b0 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
@@ -14,11 +14,11 @@ import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
-import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -42,7 +42,7 @@ class TopicsServicePaginationTest {
private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
- private final TopicsController topicsController = new TopicsController(
+ private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper);
private void init(Map topicsInCache) {
@@ -66,7 +66,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -93,7 +93,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
init(internalTopics);
@@ -120,7 +120,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -139,7 +139,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -158,7 +158,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -179,7 +179,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -200,7 +200,7 @@ class TopicsServicePaginationTest {
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
@@ -222,7 +222,7 @@ class TopicsServicePaginationTest {
new TopicPartitionInfo(p, null, List.of(), List.of()))
.collect(Collectors.toList())))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
- JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+ Metrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
init(internalTopics);
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
new file mode 100644
index 0000000000..1a4ff5134e
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
@@ -0,0 +1,77 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import javax.management.MBeanAttributeInfo;
+import javax.management.ObjectName;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+class JmxMetricsFormatterTest {
+
+ /**
+ * Original format is here.
+ */
+ @Test
+ void convertsJmxMetricsAccordingToJmxExporterFormat() throws Exception {
+    List<RawMetric> metrics = JmxMetricsFormatter.constructMetricsList(
+ new ObjectName(
+ "kafka.server:type=Some.BrokerTopic-Metrics,name=BytesOutPer-Sec,topic=test,some-lbl=123"),
+ new MBeanAttributeInfo[] {
+ createMbeanInfo("FifteenMinuteRate"),
+ createMbeanInfo("Mean"),
+ createMbeanInfo("Calls-count"),
+ createMbeanInfo("SkipValue"),
+ },
+ new Object[] {
+ 123.0,
+ 100.0,
+ 10L,
+ "string values not supported"
+ }
+ );
+
+ assertThat(metrics).hasSize(3);
+
+ assertMetricsEqual(
+ RawMetric.create(
+ "kafka_server_Some_BrokerTopic_Metrics_FifteenMinuteRate",
+ Map.of("name", "BytesOutPer-Sec", "topic", "test", "some_lbl", "123"),
+ BigDecimal.valueOf(123.0)
+ ),
+ metrics.get(0)
+ );
+
+ assertMetricsEqual(
+ RawMetric.create(
+ "kafka_server_Some_BrokerTopic_Metrics_Mean",
+ Map.of("name", "BytesOutPer-Sec", "topic", "test", "some_lbl", "123"),
+ BigDecimal.valueOf(100.0)
+ ),
+ metrics.get(1)
+ );
+
+ assertMetricsEqual(
+ RawMetric.create(
+ "kafka_server_Some_BrokerTopic_Metrics_Calls_count",
+ Map.of("name", "BytesOutPer-Sec", "topic", "test", "some_lbl", "123"),
+ BigDecimal.valueOf(10)
+ ),
+ metrics.get(2)
+ );
+ }
+
+ private static MBeanAttributeInfo createMbeanInfo(String name) {
+ return new MBeanAttributeInfo(name, "sometype-notused", null, true, true, false, null);
+ }
+
+ private void assertMetricsEqual(RawMetric expected, RawMetric actual) {
+ assertThat(actual.name()).isEqualTo(expected.name());
+ assertThat(actual.labels()).isEqualTo(expected.labels());
+ assertThat(actual.value()).isCloseTo(expected.value(), Offset.offset(new BigDecimal("0.001")));
+ }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
new file mode 100644
index 0000000000..294215c8b1
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
@@ -0,0 +1,30 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+
+class PrometheusEndpointMetricsParserTest {
+
+ @Test
+ void test() {
+ String metricsString =
+ "kafka_server_BrokerTopicMetrics_FifteenMinuteRate"
+ + "{name=\"BytesOutPerSec\",topic=\"__confluent.support.metrics\",} 123.1234";
+
+    Optional<RawMetric> parsedOpt = PrometheusEndpointMetricsParser.parse(metricsString);
+
+ assertThat(parsedOpt).hasValueSatisfying(metric -> {
+ assertThat(metric.name()).isEqualTo("kafka_server_BrokerTopicMetrics_FifteenMinuteRate");
+ assertThat(metric.value()).isEqualTo("123.1234");
+ assertThat(metric.labels()).containsExactlyEntriesOf(
+ Map.of(
+ "name", "BytesOutPerSec",
+ "topic", "__confluent.support.metrics"
+ ));
+ });
+ }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
new file mode 100644
index 0000000000..36d6c87581
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
@@ -0,0 +1,64 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Map;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.test.StepVerifier;
+
+class PrometheusMetricsRetrieverTest {
+
+ private final PrometheusMetricsRetriever retriever = new PrometheusMetricsRetriever(WebClient.create());
+
+ private final MockWebServer mockWebServer = new MockWebServer();
+
+ @BeforeEach
+ void startMockServer() throws IOException {
+ mockWebServer.start();
+ }
+
+ @AfterEach
+ void stopMockServer() throws IOException {
+ mockWebServer.close();
+ }
+
+ @Test
+  void callsMetricsEndpointAndConvertsResponseToRawMetric() {
+ var url = mockWebServer.url("/metrics");
+ // body copied from real jmx exporter
+ MockResponse response = new MockResponse().setBody(
+ "# HELP kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate Attribute exposed for management \n"
+ + "# TYPE kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate untyped\n"
+ + "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate{name=\"RequestHandlerAvgIdlePercent\",} 0.898\n"
+ + "# HELP kafka_server_socket_server_metrics_request_size_avg The average size of requests sent. \n"
+ + "# TYPE kafka_server_socket_server_metrics_request_size_avg untyped\n"
+ + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN\",networkProcessor=\"1\",} 101.1\n"
+ + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN2\",networkProcessor=\"5\",} NaN"
+ );
+ mockWebServer.enqueue(response);
+
+ var firstMetric = RawMetric.create(
+ "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
+ Map.of("name", "RequestHandlerAvgIdlePercent"),
+ new BigDecimal("0.898")
+ );
+
+ var second = RawMetric.create(
+ "kafka_server_socket_server_metrics_request_size_avg",
+ Map.of("listener", "PLAIN", "networkProcessor", "1"),
+ new BigDecimal("101.1")
+ );
+
+ StepVerifier.create(retriever.retrieve(url.host(), url.port(), false))
+ .expectNext(firstMetric)
+ .expectNext(second)
+ // third metric should not be present, since it has "NaN" value
+ .verifyComplete();
+ }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
new file mode 100644
index 0000000000..0917b68466
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
@@ -0,0 +1,66 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Metrics;
+import java.math.BigDecimal;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+class WellKnownMetricsTest {
+
+ private final WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
+
+ @ParameterizedTest
+ @CsvSource({
+ //default jmx exporter format
+ //example: kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name="BytesInPerSec",topic="test-topic",} 222.0
+ //example: kafka_server_BrokerTopicMetrics_FifteenMinuteRate{name="BytesOutPerSec",topic="test-topic",} 111.0
+ "kafka_server_BrokerTopicMetrics_FifteenMinuteRate, BytesInPerSec, BytesOutPerSec",
+
+ //default jmx exporter format, but lower-cased
+ //example: kafka_server_brokertopicmetrics_fifteenminuterate{name="bytesinpersec",topic="test-topic",} 222.0
+ //example: kafka_server_brokertopicmetrics_fifteenminuterate{name="bytesoutpersec",topic="test-topic",} 111.0
+ "kafka_server_brokertopicmetrics_fifteenminuterate, bytesinpersec, bytesoutpersec",
+
+ //unknown prefix metric name
+ "some_unknown_prefix_brokertopicmetrics_fifteenminuterate, bytesinpersec, bytesoutpersec",
+ })
+ void bytesIoTopicMetricsPopulated(String metricName, String bytesInLabel, String bytesOutLabel) {
+ var clusterParam = KafkaCluster.builder().build();
+ var nodeParam = new Node(0, "host", 123);
+
+ var in = RawMetric.create(metricName, Map.of("name", bytesInLabel, "topic", "test-topic"), new BigDecimal("1.0"));
+ var out = RawMetric.create(metricName, Map.of("name", bytesOutLabel, "topic", "test-topic"), new BigDecimal("2.0"));
+
+ // feeding metrics
+ for (int i = 0; i < 3; i++) {
+ wellKnownMetrics.populate(clusterParam, nodeParam, in);
+ wellKnownMetrics.populate(clusterParam, nodeParam, out);
+ }
+
+ assertThat(wellKnownMetrics.bytesInFifteenMinuteRate)
+ .containsEntry("test-topic", new BigDecimal("3.0"));
+
+ assertThat(wellKnownMetrics.bytesOutFifteenMinuteRate)
+ .containsEntry("test-topic", new BigDecimal("6.0"));
+ }
+
+ @Test
+ void appliesInnerStateToMetricsBuilder() {
+ wellKnownMetrics.bytesInFifteenMinuteRate.put("topic", new BigDecimal(1));
+ wellKnownMetrics.bytesOutFifteenMinuteRate.put("topic", new BigDecimal(2));
+
+ Metrics.MetricsBuilder builder = Metrics.builder();
+ wellKnownMetrics.apply(builder);
+ var metrics = builder.build();
+
+ assertThat(metrics.getBytesInPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesInFifteenMinuteRate);
+ assertThat(metrics.getBytesOutPerSec()).containsExactlyEntriesOf(wellKnownMetrics.bytesOutFifteenMinuteRate);
+ }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/NumberUtilTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/NumberUtilTest.java
deleted file mode 100644
index 1c85d65535..0000000000
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/NumberUtilTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class NumberUtilTest {
-
- @Test
- void shouldReturnFalseWhenNonNumeric() {
- Assertions.assertFalse(NumberUtil.isNumeric(Double.POSITIVE_INFINITY));
- Assertions.assertFalse(NumberUtil.isNumeric(Double.NEGATIVE_INFINITY));
- Assertions.assertFalse(NumberUtil.isNumeric(Double.NaN));
- Assertions.assertFalse(NumberUtil.isNumeric(null));
- Assertions.assertFalse(NumberUtil.isNumeric(" "));
- Assertions.assertFalse(NumberUtil.isNumeric(new Object()));
- Assertions.assertFalse(NumberUtil.isNumeric("1231asd"));
- }
-
- @Test
- void shouldReturnTrueWhenNumeric() {
- Assertions.assertTrue(NumberUtil.isNumeric("123.45"));
- Assertions.assertTrue(NumberUtil.isNumeric(123.45));
- Assertions.assertTrue(NumberUtil.isNumeric(123));
- Assertions.assertTrue(NumberUtil.isNumeric(-123.45));
- Assertions.assertTrue(NumberUtil.isNumeric(-1e-10));
- Assertions.assertTrue(NumberUtil.isNumeric(1e-10));
- }
-}
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 86e939562a..68b8afa65a 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -2572,16 +2572,12 @@ components:
properties:
name:
type: string
- canonicalName:
- type: string
- params:
+ labels:
type: string
additionalProperties:
type: string
value:
- type: string
- additionalProperties:
- type: number
+ type: number
TopicLogdirs:
type: object
diff --git a/pom.xml b/pom.xml
index c8b8d5d5c3..c01fb507dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
7.0.1
2.11.1
1.17.1
+ 4.10.0
5.7.2
2.21.0
3.19.0