diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index c97c23104b..23b54c1087 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -100,6 +100,12 @@
reactor-test
test
+
+ org.apache.commons
+ commons-pool2
+ ${apache.commons.version}
+
+
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
index f77ee01b15..851714c4b0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
@@ -21,5 +21,6 @@ public class ClustersProperties {
String zookeeper;
String schemaRegistry;
String schemaNameTemplate = "%s-value";
+ int jmxPort;
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java
new file mode 100644
index 0000000000..83c86553c9
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java
@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.cluster.config;
+
+import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jmx.export.MBeanExporter;
+
+import javax.management.remote.JMXConnector;
+
+@Configuration
+public class Config {
+
+ @Bean
+ public KeyedObjectPool pool() {
+ GenericKeyedObjectPool pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
+ pool.setConfig(poolConfig());
+ return pool;
+ }
+
+ private GenericKeyedObjectPoolConfig poolConfig() {
+ GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+ poolConfig.setMaxIdlePerKey(3);
+ poolConfig.setMaxTotalPerKey(3);
+ return poolConfig;
+ }
+
+ @Bean
+ public MBeanExporter exporter()
+ {
+ final MBeanExporter exporter = new MBeanExporter();
+ exporter.setAutodetect(true);
+ exporter.setExcludedBeans("pool");
+ return exporter;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
index 3fa72b61db..35671ea7b1 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
+import java.math.BigDecimal;
import java.util.Map;
@@ -18,11 +19,9 @@ public class InternalClusterMetrics {
private final int offlinePartitionCount;
private final int inSyncReplicasCount;
private final int outOfSyncReplicasCount;
- //TODO: find way to fill
- private final int bytesInPerSec;
- private final int bytesOutPerSec;
+ private final Map bytesInPerSec;
+ private final Map bytesOutPerSec;
private final int segmentCount;
- //TODO: find way to fill
private final long segmentSize;
private final Map internalBrokerMetrics;
private final int zooKeeperStatus;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
index c4592f42d7..daa1da98f5 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
@@ -11,8 +11,7 @@ import java.util.Map;
public class KafkaCluster {
private final String name;
- private final String jmxHost;
- private final String jmxPort;
+ private final int jmxPort;
private final String bootstrapServers;
private final String zookeeper;
private final String schemaRegistry;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
index bad9625f12..d4e3258bbb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
@@ -9,8 +9,6 @@ import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-
-import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -20,7 +18,6 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
index a85b628a04..90b8166e68 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
@@ -112,7 +112,7 @@ public class ConsumingService {
.collect(Collectors.toMap(
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
Map.Entry::getValue
- ));
+ ));
consumer.offsetsForTimes(timestampsToSearch)
.forEach((topicPartition, offsetAndTimestamp) ->
consumer.seek(topicPartition, offsetAndTimestamp.offset())
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
index 783f17e30b..8860468e20 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.kafka.KafkaService;
-import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
index 817fd1bf12..1b014544bf 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
@@ -4,8 +4,6 @@ import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
-import com.provectus.kafka.ui.model.TopicMessage;
-
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -15,13 +13,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
-
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
-import java.util.HashMap;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -32,13 +28,17 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
@Slf4j
public class ClusterUtil {
+
+
+
+
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
- public static Mono toMono(KafkaFuture future){
- return Mono.create(sink -> future.whenComplete((res, ex)->{
- if (ex!=null) {
+ public static Mono toMono(KafkaFuture future) {
+ return Mono.create(sink -> future.whenComplete((res, ex) -> {
+ if (ex != null) {
sink.error(ex);
} else {
sink.success(res);
@@ -46,9 +46,9 @@ public class ClusterUtil {
}));
}
- public static Mono toMono(KafkaFuture future, String topicName){
- return Mono.create(sink -> future.whenComplete((res, ex)->{
- if (ex!=null) {
+ public static Mono toMono(KafkaFuture future, String topicName) {
+ return Mono.create(sink -> future.whenComplete((res, ex) -> {
+ if (ex != null) {
sink.error(ex);
} else {
sink.success(topicName);
@@ -111,7 +111,7 @@ public class ClusterUtil {
partitionDto.inSyncReplicasCount(partition.isr().size());
partitionDto.replicasCount(partition.replicas().size());
List replicas = partition.replicas().stream().map(
- r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
+ r -> new InternalReplica(r.id(), partition.leader().id() != r.id(), partition.isr().contains(r)))
.collect(Collectors.toList());
partitionDto.replicas(replicas);
return partitionDto.build();
@@ -185,6 +185,7 @@ public class ClusterUtil {
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
}
}
+
public static Mono> getSupportedFeatures(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.describeCluster().controller())
.map(Node::id)
@@ -210,7 +211,7 @@ public class ClusterUtil {
}
}
- public static Topic convertToTopic (InternalTopic internalTopic) {
+ public static Topic convertToTopic(InternalTopic internalTopic) {
Topic topic = new Topic();
topic.setName(internalTopic.getName());
List partitions = internalTopic.getPartitions().stream().flatMap(s -> {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java
new file mode 100644
index 0000000000..02e3c28e99
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java
@@ -0,0 +1,74 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.springframework.stereotype.Component;
+
+import javax.management.*;
+import javax.management.remote.JMXConnector;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class JmxClusterUtil {
+
+ private final KeyedObjectPool pool;
+
+ private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
+ private static final String JMX_SERVICE_TYPE = "jmxrmi";
+
+ public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
+ public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
+ private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
+ private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
+
+ private static final List attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
+
+ public Map getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
+ String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
+ Map result = new HashMap<>();
+ JMXConnector srv = null;
+ try {
+ srv = pool.borrowObject(jmxUrl);
+ MBeanServerConnection msc = srv.getMBeanServerConnection();
+ ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
+ new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
+ for (String attrName : attrNames) {
+ result.put(attrName, BigDecimal.valueOf((Double) msc.getAttribute(name, attrName)));
+ }
+ pool.returnObject(jmxUrl, srv);
+ } catch (MalformedURLException url) {
+ log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
+ closeConnectionExceptionally(jmxUrl, srv);
+ } catch (IOException io) {
+ log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
+ closeConnectionExceptionally(jmxUrl, srv);
+ } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+ log.error("Cannot find attribute", e);
+ closeConnectionExceptionally(jmxUrl, srv);
+ } catch (MalformedObjectNameException objectNameE) {
+ log.error("Cannot create objectName", objectNameE);
+ closeConnectionExceptionally(jmxUrl, srv);
+ } catch (Exception e) {
+ log.error("Error while retrieving connection {} from pool", jmxUrl);
+ closeConnectionExceptionally(jmxUrl, srv);
+ }
+ return result;
+ }
+
+ private void closeConnectionExceptionally(String url, JMXConnector srv) {
+ try {
+ pool.invalidateObject(url, srv);
+ } catch (Exception e) {
+ log.error("Cannot invalidate object in pool, {}", url);
+ }
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxPoolFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxPoolFactory.java
new file mode 100644
index 0000000000..89ce72c60f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxPoolFactory.java
@@ -0,0 +1,34 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+
+@Slf4j
+public class JmxPoolFactory extends BaseKeyedPooledObjectFactory {
+
+ @Override
+ public JMXConnector create(String s) throws Exception {
+ return JMXConnectorFactory.connect(new JMXServiceURL(s));
+ }
+
+ @Override
+ public PooledObject wrap(JMXConnector jmxConnector) {
+ return new DefaultPooledObject<>(jmxConnector);
+ }
+
+ @Override
+ public void destroyObject(String key, PooledObject p) {
+ try {
+ p.getObject().close();
+ } catch (IOException e) {
+ log.error("Cannot close connection with {}", key);
+ }
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
index aaadef4486..0f6eb011a6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.kafka;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.Topic;
@@ -17,14 +18,15 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
-import org.springframework.beans.factory.annotation.Value;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
+import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -43,11 +45,12 @@ public class KafkaService {
private final ZookeeperService zookeeperService;
private final Map adminClientCache = new ConcurrentHashMap<>();
private final Map> leadersCache = new ConcurrentHashMap<>();
+ private final JmxClusterUtil jmxClusterUtil;
@SneakyThrows
public Mono getUpdatedCluster(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(
- ac -> getClusterMetrics(ac.getAdminClient())
+ ac -> getClusterMetrics(cluster, ac.getAdminClient())
.flatMap( clusterMetrics ->
getTopicsData(ac.getAdminClient()).flatMap( topics ->
@@ -156,17 +159,19 @@ public class KafkaService {
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
}
- private Mono getClusterMetrics(AdminClient client) {
+ private Mono getClusterMetrics(KafkaCluster cluster, AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
ClusterUtil.toMono(client.describeCluster().controller()).map(
c -> {
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
- // TODO: fill bytes in/out metrics
+ Map bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
+ Map bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
metricsBuilder
- .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
-
+ .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
+ .bytesOutPerSec(bytesOutPerSec)
+ .bytesInPerSec(bytesInPerSec);
return metricsBuilder.build();
}
)
@@ -199,11 +204,6 @@ public class KafkaService {
}
@SneakyThrows
- private Mono getClusterId(AdminClient adminClient) {
- return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
- }
-
-
public Mono getOrCreateAdminClient(KafkaCluster cluster) {
return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
.switchIfEmpty(createAdminClient(cluster))
diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml
index 553bfe61a6..16e8107e58 100644
--- a/kafka-ui-api/src/main/resources/application-local.yml
+++ b/kafka-ui-api/src/main/resources/application-local.yml
@@ -6,14 +6,20 @@ kafka:
zookeeper: localhost:2183
schemaRegistry: http://localhost:8085
# schemaNameTemplate: "%s-value"
+ jmxPort: 9997
-
name: secondLocal
bootstrapServers: localhost:29092
zookeeper: localhost:2182
+ jmxPort: 9998
-
name: localReplica
bootstrapServers: localhost:29093
zookeeper: localhost:2183
+ jmxPort: 9997
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000
+spring:
+ jmx:
+ enabled: true
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index e21372e118..4e515c6c85 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -304,9 +304,13 @@ components:
topicCount:
type: integer
bytesInPerSec:
- type: integer
+ type: object
+ additionalProperties:
+ type: number
bytesOutPerSec:
- type: integer
+ type: object
+ additionalProperties:
+ type: number
required:
- id
- name
diff --git a/pom.xml b/pom.xml
index 15288234fc..67f8d113cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
2.4.1
1.9.2
5.5.0
+ 2.2