Browse Source

Jmx connections moved to pool, methods moved to separate classes

Roman Nedzvetskiy 5 years ago
parent
commit
2bcd7e6a19

+ 6 - 0
kafka-ui-api/pom.xml

@@ -85,6 +85,12 @@
             <artifactId>reactor-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <version>${apache.commons.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

@@ -19,5 +19,7 @@ public class ClustersProperties {
         String name;
         String bootstrapServers;
         String zookeeper;
+        int jmxPort;
+        String jmxHost;
     }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -12,7 +12,7 @@ public class KafkaCluster {
 
     private final String name;
     private final String jmxHost;
-    private final String jmxPort;
+    private final int jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;
     private final ServerStatus status;

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,8 +8,6 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-
-import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;

+ 9 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -1,30 +1,26 @@
 package com.provectus.kafka.ui.cluster.service;
 
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.TopicMessage;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
-
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.kafka.KafkaService;
-import com.provectus.kafka.ui.model.TopicMessage;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.scheduler.Schedulers;
 
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
 @Service
 @Log4j2
 @RequiredArgsConstructor

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.kafka.KafkaService;
-import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.stereotype.Service;

+ 2 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -33,12 +33,9 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Slf4j
 public class ClusterUtil {
 
-    public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
-    public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
-    private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
-    private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
 
-    private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
+
+
 
     private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
 
@@ -236,31 +233,6 @@ public class ClusterUtil {
         return topic;
     }
 
-    public static Map<String, String> getJmxTrafficMetrics(String jmxUrl, String metricName) {
-        Map<String, String> result = new HashMap<>();
-        try {
-            JMXServiceURL url = new JMXServiceURL(jmxUrl);
-            JMXConnector srv = JMXConnectorFactory.connect(url);
-            MBeanServerConnection msc = srv.getMBeanServerConnection();
-            ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
-                    new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
-            for (String attrName : attrNames) {
-                result.put(attrName, msc.getAttribute(name, attrName).toString());
-            }
-        } catch (MalformedURLException url) {
-            log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
-        } catch (IOException io) {
-            log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
-        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
-            log.error("Cannot find attribute from");
-            log.error(e.getMessage());
-        } catch (MalformedObjectNameException objectNameE) {
-            log.error("Cannot create objectName");
-            log.error(objectNameE.getMessage());
-        }
-        return result;
-    }
-
     public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
         return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();

+ 77 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -0,0 +1,77 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.management.*;
+import javax.management.remote.JMXConnector;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+@Slf4j
+public class JmxClusterUtil {
+
+    @Autowired
+    private ClustersStorage clustersStorage;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
+    private static final String JMX_SERVICE_TYPE = "jmxrmi";
+
+    public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
+    public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
+    private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
+    private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
+
+    private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
+
+    private static KeyedObjectPool pool = new GenericKeyedObjectPool(new JmxPoolFactory());
+
+    public static Map<String, String> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
+        String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
+        Map<String, String> result = new HashMap<>();
+        try {
+            JMXConnector srv = (JMXConnector) pool.borrowObject(jmxUrl);
+            MBeanServerConnection msc = srv.getMBeanServerConnection();
+            ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
+                    new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
+            for (String attrName : attrNames) {
+                result.put(attrName, msc.getAttribute(name, attrName).toString());
+            }
+        } catch (MalformedURLException url) {
+            log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
+        } catch (IOException io) {
+            log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
+        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+            log.error("Cannot find attribute from");
+            log.error(e.getMessage());
+        } catch (MalformedObjectNameException objectNameE) {
+            log.error("Cannot create objectName");
+            log.error(objectNameE.getMessage());
+        } catch (Exception e) {
+            log.error("Error while retrieving connection {} from pool", jmxUrl);
+        }
+        return result;
+    }
+
+    @PostConstruct
+    public void fillJmxPool() {
+        clustersStorage.getKafkaClusters().stream().forEach(c -> {
+            String jmxUrl = JMX_URL + c.getJmxHost() + ":" + c.getJmxPort() + "/" + JMX_SERVICE_TYPE;
+            try {
+                pool.addObject(jmxUrl);
+            } catch (Exception e) {
+                log.error("Cannot connect to {}", jmxUrl);
+            }
+        });
+    }
+}

+ 22 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxPoolFactory.java

@@ -0,0 +1,22 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+public class JmxPoolFactory extends BaseKeyedPooledObjectFactory<String, JMXConnector> {
+
+    @Override
+    public JMXConnector create(String s) throws Exception {
+        return JMXConnectorFactory.connect(new JMXServiceURL(s));
+    }
+
+    @Override
+    public PooledObject<JMXConnector> wrap(JMXConnector jmxConnector) {
+        return new DefaultPooledObject<>(jmxConnector);
+    }
+}

+ 5 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.kafka;
 import com.provectus.kafka.ui.cluster.config.KafkaJmxProperties;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ServerStatus;
 import com.provectus.kafka.ui.model.Topic;
@@ -54,7 +55,7 @@ public class KafkaService {
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
         return getOrCreateAdminClient(cluster).flatMap(
-                ac -> getClusterMetrics(ac.getAdminClient())
+                ac -> getClusterMetrics(cluster, ac.getAdminClient())
 
                         .flatMap( clusterMetrics ->
                             getTopicsData(ac.getAdminClient()).flatMap( topics ->
@@ -175,7 +176,7 @@ public class KafkaService {
                     .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
     }
 
-    private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
+    private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
         return ClusterUtil.toMono(client.describeCluster().nodes())
                 .flatMap(brokers ->
                     ClusterUtil.toMono(client.describeCluster().controller()).map(
@@ -184,10 +185,8 @@ public class KafkaService {
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                             Map<String, String> bytesInPerSec;
                             Map<String, String> bytesOutPerSec;
-                            var jmx = kafkaJmxDto.getJmxParams().stream().filter(j -> j.getBrokerId() == c.id()).findFirst().orElseThrow();
-                            var jmxUrl = jmx.getUrl() + jmx.getHost() + ":" + jmx.getPort() + "/" + jmx.getServiceType();
-                            bytesInPerSec = ClusterUtil.getJmxTrafficMetrics(jmxUrl, ClusterUtil.BYTES_IN_PER_SEC);
-                            bytesOutPerSec = ClusterUtil.getJmxTrafficMetrics(jmxUrl, ClusterUtil.BYTES_OUT_PER_SEC);
+                            bytesInPerSec = JmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), cluster.getJmxHost(), JmxClusterUtil.BYTES_IN_PER_SEC);
+                            bytesOutPerSec = JmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), cluster.getJmxHost(), JmxClusterUtil.BYTES_OUT_PER_SEC);
                             metricsBuilder
                                     .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
                                     .bytesOutPerSec(bytesOutPerSec)

+ 6 - 8
kafka-ui-api/src/main/resources/application-local.yml

@@ -1,25 +1,23 @@
 kafka:
-  jmxParams:
-    -
-      clusterName: local
-      brokerid: 1
-      port: 9997
-      host: localhost
-      url: service:jmx:rmi:///jndi/rmi://
-      serviceType: jmxrmi
   clusters:
     -
       name: local
       bootstrapServers: localhost:29091
       zookeeper: localhost:2183
+      jmxPort: 9997
+      jmxHost: localhost
     -
       name: secondLocal
       bootstrapServers: localhost:29092
       zookeeper: localhost:2182
+      jmxPort: 9998
+      jmxHost: localhost
     -
       name: localReplica
       bootstrapServers: localhost:29093
       zookeeper: localhost:2183
+      jmxPort: 9997
+      jmxHost: localhost
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 6 - 2
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -303,9 +303,13 @@ components:
         topicCount:
           type: integer
         bytesInPerSec:
-          type: integer
+          type: object
+          additionalProperties:
+            type: string
         bytesOutPerSec:
-          type: integer
+          type: object
+          additionalProperties:
+            type: string
       required:
         - id
         - name

+ 1 - 0
pom.xml

@@ -28,6 +28,7 @@
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
 		<kafka.version>2.4.1</kafka.version>
+		<apache.commons.version>2.7.0</apache.commons.version>
 	</properties>
 
 	<groupId>com.provectus</groupId>