|
@@ -1,10 +1,8 @@
|
|
|
package com.provectus.kafka.ui.cluster.util;
|
|
|
|
|
|
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.pool2.KeyedObjectPool;
|
|
|
-import org.apache.kafka.common.Node;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.management.*;
|
|
@@ -19,10 +17,10 @@ import java.util.Map;
|
|
|
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
public class JmxClusterUtil {
|
|
|
|
|
|
- @Autowired
|
|
|
- private KeyedObjectPool pool;
|
|
|
+ private final KeyedObjectPool<String, JMXConnector> pool;
|
|
|
|
|
|
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
|
|
|
private static final String JMX_SERVICE_TYPE = "jmxrmi";
|
|
@@ -39,7 +37,7 @@ public class JmxClusterUtil {
|
|
|
Map<String, BigDecimal> result = new HashMap<>();
|
|
|
JMXConnector srv = null;
|
|
|
try {
|
|
|
- srv = (JMXConnector) pool.borrowObject(jmxUrl);
|
|
|
+ srv = pool.borrowObject(jmxUrl);
|
|
|
MBeanServerConnection msc = srv.getMBeanServerConnection();
|
|
|
ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
|
|
|
new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
|
|
@@ -64,13 +62,11 @@ public class JmxClusterUtil {
|
|
|
log.error("Error while retrieving connection {} from pool", jmxUrl);
|
|
|
closeConnectionExceptionally(jmxUrl, srv);
|
|
|
}
|
|
|
- finally {
|
|
|
- if (srv != null) {
|
|
|
- try {
|
|
|
- pool.returnObject(jmxUrl, srv);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Cannot return object to poll, {}", jmxUrl);
|
|
|
- }
|
|
|
+ if (srv != null) {
|
|
|
+ try {
|
|
|
+ pool.returnObject(jmxUrl, srv);
|
|
|
+ } catch(Exception e){
|
|
|
+ log.error("Cannot return object to poll, {}", jmxUrl);
|
|
|
}
|
|
|
}
|
|
|
return result;
|
|
@@ -86,14 +82,4 @@ public class JmxClusterUtil {
|
|
|
log.error("Cannot invalidate object in pool, {}", url);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- public void fillJmxPool(Node broker, KafkaCluster cluster) {
|
|
|
- String jmxUrl = JMX_URL + broker.host() + ":" + cluster.getJmxPort() + "/" + JMX_SERVICE_TYPE;
|
|
|
- try {
|
|
|
- pool.addObject(jmxUrl);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Cannot connect to {}", jmxUrl);
|
|
|
- }
|
|
|
- }
|
|
|
}
|