
Fix after previous PR comments - fixed result map, configured pool, removed redundant methods and code

Roman Nedzvetskiy, 5 years ago
parent commit 1e06548d8f

+ 1 - 1
kafka-ui-api/pom.xml

@@ -87,7 +87,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
-            <artifactId>commons-dbcp2</artifactId>
+            <artifactId>commons-pool2</artifactId>
             <version>${apache.commons.version}</version>
         </dependency>
 

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

@@ -20,6 +20,5 @@ public class ClustersProperties {
         String bootstrapServers;
         String zookeeper;
         int jmxPort;
-        String jmxHost;
     }
 }

+ 36 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java

@@ -0,0 +1,36 @@
+package com.provectus.kafka.ui.cluster.config;
+
+import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jmx.export.MBeanExporter;
+
+@Configuration
+public class Config {
+
+    @Bean
+    public KeyedObjectPool pool() {
+        GenericKeyedObjectPool pool =  new GenericKeyedObjectPool(new JmxPoolFactory());
+        pool.setConfig(poolConfig());
+        return pool;
+    }
+
+    private GenericKeyedObjectPoolConfig poolConfig() {
+        GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+        poolConfig.setMaxIdlePerKey(3);
+        poolConfig.setMaxTotalPerKey(3);
+        return poolConfig;
+    }
+
+    @Bean
+    public MBeanExporter exporter()
+    {
+        final MBeanExporter exporter = new MBeanExporter();
+        exporter.setAutodetect(true);
+        exporter.setExcludedBeans("pool");
+        return exporter;
+    }
+}
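For context: the exporter bean excludes the pool, presumably because GenericKeyedObjectPool already exposes itself over JMX and Spring's autodetection would otherwise try to export it a second time. The borrow/return/invalidate pattern this keyed pool is built for, and which JmxClusterUtil adopts below, looks roughly like the following sketch; the class name, URL and printed output are illustrative assumptions, not part of the commit.

package com.provectus.kafka.ui.cluster.util;

import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;

// Sketch only, not part of the commit.
public class PoolUsageSketch {

    public static void main(String[] args) {
        // Standalone pool for the sketch; in the app the Spring bean above (with its 3-per-key limits) is injected instead.
        KeyedObjectPool pool = new GenericKeyedObjectPool(new JmxPoolFactory());
        String url = "service:jmx:rmi:///jndi/rmi://localhost:9997/jmxrmi"; // assumed local broker JMX endpoint
        JMXConnector srv = null;
        try {
            srv = (JMXConnector) pool.borrowObject(url);             // one pooled connector per JMX URL key
            MBeanServerConnection msc = srv.getMBeanServerConnection();
            System.out.println("MBeans visible: " + msc.getMBeanCount());
        } catch (Exception e) {
            invalidate(pool, url, srv);                               // broken connectors are discarded, not reused
            srv = null;
        } finally {
            if (srv != null) {
                try {
                    pool.returnObject(url, srv);                      // healthy connectors go back to the pool
                } catch (Exception ignored) {
                }
            }
        }
    }

    private static void invalidate(KeyedObjectPool pool, String url, JMXConnector srv) {
        if (srv == null) {
            return;
        }
        try {
            pool.invalidateObject(url, srv);
        } catch (Exception ignored) {
        }
    }
}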

+ 0 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/KafkaJmxProperties.java

@@ -1,25 +0,0 @@
-package com.provectus.kafka.ui.cluster.config;
-
-import lombok.Data;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@Configuration
-@ConfigurationProperties("kafka")
-@Data
-public class KafkaJmxProperties {
-    List<Jmx> jmxParams = new ArrayList<>();
-
-    @Data
-    public static class Jmx {
-        private String clusterName;
-        private int brokerId;
-        private int port;
-        private String host;
-        private String url;
-        private String serviceType;
-    }
-}

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.model;
 import lombok.Builder;
 import lombok.Data;
 
+import java.math.BigDecimal;
 import java.util.Map;
 
 
@@ -18,8 +19,8 @@ public class InternalClusterMetrics {
     private final int offlinePartitionCount;
     private final int inSyncReplicasCount;
     private final int outOfSyncReplicasCount;
-    private final Map<String, String> bytesInPerSec;
-    private final Map<String, String> bytesOutPerSec;
+    private final Map<String, BigDecimal> bytesInPerSec;
+    private final Map<String, BigDecimal> bytesOutPerSec;
     private final int segmentCount;
     private final long segmentSize;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -11,7 +11,6 @@ import java.util.Map;
 public class KafkaCluster {
 
     private final String name;
-    private final String jmxHost;
     private final int jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;

+ 32 - 26
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,17 +1,16 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import lombok.SneakyThrows;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.kafka.common.Node;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import javax.management.*;
 import javax.management.remote.JMXConnector;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.MalformedURLException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -23,7 +22,7 @@ import java.util.Map;
 public class JmxClusterUtil {
 
     @Autowired
-    private ClustersStorage clustersStorage;
+    private KeyedObjectPool pool;
 
     private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
     private static final String JMX_SERVICE_TYPE = "jmxrmi";
@@ -35,11 +34,9 @@
 
     private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
 
-    private static KeyedObjectPool pool = new GenericKeyedObjectPool(new JmxPoolFactory());
-
-    public static Map<String, String> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
+    public Map<String, BigDecimal> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
-        Map<String, String> result = new HashMap<>();
+        Map<String, BigDecimal> result = new HashMap<>();
         JMXConnector srv = null;
         try {
             srv = (JMXConnector) pool.borrowObject(jmxUrl);
@@ -47,47 +44,56 @@ public class JmxClusterUtil {
             ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
                     new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
             for (String attrName : attrNames) {
-                result.put(attrName, msc.getAttribute(name, attrName).toString());
+                result.put(attrName, BigDecimal.valueOf((Double) msc.getAttribute(name, attrName)));
             }
         } catch (MalformedURLException url) {
             log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
+            closeConnectionExceptionally(jmxUrl, srv);
         } catch (IOException io) {
             log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
+            closeConnectionExceptionally(jmxUrl, srv);
         } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
             log.error("Cannot find attribute from");
             log.error(e.getMessage());
+            closeConnectionExceptionally(jmxUrl, srv);
         } catch (MalformedObjectNameException objectNameE) {
             log.error("Cannot create objectName");
             log.error(objectNameE.getMessage());
+            closeConnectionExceptionally(jmxUrl, srv);
         } catch (Exception e) {
             log.error("Error while retrieving connection {} from pool", jmxUrl);
-            try {
-                pool.invalidateObject(jmxUrl, srv);
-            } catch (Exception ie) {
-                log.error("Cannot invalidate object to pool, {}", jmxUrl);
-            }
+            closeConnectionExceptionally(jmxUrl, srv);
         }
         finally {
             if (srv != null) {
                 try {
                     pool.returnObject(jmxUrl, srv);
                 } catch (Exception e) {
-                    log.error("Cannot returl object to poll, {}", jmxUrl);
+                    log.error("Cannot return object to poll, {}", jmxUrl);
                 }
             }
         }
         return result;
     }
 
-    @PostConstruct
-    public void fillJmxPool() {
-        clustersStorage.getKafkaClusters().stream().forEach(c -> {
-            String jmxUrl = JMX_URL + c.getJmxHost() + ":" + c.getJmxPort() + "/" + JMX_SERVICE_TYPE;
-            try {
-                pool.addObject(jmxUrl);
-            } catch (Exception e) {
-                log.error("Cannot connect to {}", jmxUrl);
-            }
-        });
+    private void closeConnectionExceptionally(String url, JMXConnector srv) {
+        try {
+            pool.invalidateObject(url, srv);
+            srv.close();
+        } catch (IOException ioe) {
+            log.error("Cannot close connection with {}", url);
+        } catch (Exception e) {
+            log.error("Cannot invalidate object in pool, {}", url);
+        }
+    }
+
+
+    public void fillJmxPool(Node broker, KafkaCluster cluster) {
+        String jmxUrl = JMX_URL + broker.host() + ":" + cluster.getJmxPort() + "/" + JMX_SERVICE_TYPE;
+        try {
+            pool.addObject(jmxUrl);
+        } catch (Exception e) {
+            log.error("Cannot connect to {}", jmxUrl);
+        }
     }
 }
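For reference, with the constants above a broker advertised as localhost and a jmxPort of 9997 yield the service URL service:jmx:rmi:///jndi/rmi://localhost:9997/jmxrmi. A standalone way to check such an endpoint outside the pool is a direct connection; the sketch below assumes that host and port and uses Kafka's standard BytesInPerSec MBean name, so it is an illustration rather than project code.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch only: verifies a broker JMX endpoint without going through the pool.
public class JmxEndpointCheck {

    public static void main(String[] args) throws Exception {
        // Same URL shape as JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE above.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9997/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection msc = connector.getMBeanServerConnection();
            // Standard Kafka broker metric; the commit reads the same three rate attributes.
            ObjectName bytesIn = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
            for (String attr : new String[]{"OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate"}) {
                System.out.println(attr + " = " + msc.getAttribute(bytesIn, attr));
            }
        }
    }
}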

+ 7 - 33
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.kafka;
 
-import com.provectus.kafka.ui.cluster.config.KafkaJmxProperties;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
@@ -13,7 +12,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
@@ -21,8 +19,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -30,8 +26,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import javax.management.MBeanServer;
-import java.lang.management.ManagementFactory;
+import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -50,7 +45,7 @@ public class KafkaService {
     private final ZookeeperService zookeeperService;
     private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
     private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
-    private final KafkaJmxProperties kafkaJmxDto;
+    private final JmxClusterUtil jmxClusterUtil;
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
@@ -72,18 +67,6 @@ public class KafkaService {
         );
     }
 
-    public static Consumer<Long, String> createConsumer() {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        Consumer<Long, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Collections.singletonList("users"));
-        return consumer;
-    }
-
     private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalSegmentSizeDto segmentSizeDto) {
 
         var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
@@ -181,17 +164,17 @@ public class KafkaService {
                 .flatMap(brokers ->
                     ClusterUtil.toMono(client.describeCluster().controller()).map(
                         c -> {
+                            jmxClusterUtil.fillJmxPool(c, cluster);
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
-                            Map<String, String> bytesInPerSec;
-                            Map<String, String> bytesOutPerSec;
-                            bytesInPerSec = JmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), cluster.getJmxHost(), JmxClusterUtil.BYTES_IN_PER_SEC);
-                            bytesOutPerSec = JmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), cluster.getJmxHost(), JmxClusterUtil.BYTES_OUT_PER_SEC);
+                            Map<String, BigDecimal> bytesInPerSec;
+                            Map<String, BigDecimal> bytesOutPerSec;
+                            bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
+                            bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
                             metricsBuilder
                                     .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
                                     .bytesOutPerSec(bytesOutPerSec)
                                     .bytesInPerSec(bytesInPerSec);
-
                             return metricsBuilder.build();
                         }
                     )
@@ -223,17 +206,8 @@ public class KafkaService {
                 );
     }
 
-    @SneakyThrows
-    private Mono<String> getClusterId(AdminClient adminClient) {
-        return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
-    }
-
-
     @SneakyThrows
     public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
-        Consumer<Long, String> kek = createConsumer();
-        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
         return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
                 .switchIfEmpty(createAdminClient(cluster))
                 .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));

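With the metric maps switched to BigDecimal, each traffic metric handed to the builder is the three rate attributes keyed by name, and they serialize as JSON numbers, which is what the swagger change below reflects. A minimal sketch of that shape, with made-up values (the class name and printout are illustrative, not project code):

import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Sketch only: illustrative values, not real broker output.
public class MetricsShapeSketch {

    public static void main(String[] args) {
        Map<String, BigDecimal> bytesInPerSec = new HashMap<>();
        bytesInPerSec.put("OneMinuteRate", BigDecimal.valueOf(1523.4));
        bytesInPerSec.put("FiveMinuteRate", BigDecimal.valueOf(1480.9));
        bytesInPerSec.put("FifteenMinuteRate", BigDecimal.valueOf(1390.2));

        InternalClusterMetrics metrics = InternalClusterMetrics.builder()
                .bytesInPerSec(bytesInPerSec)
                .bytesOutPerSec(new HashMap<>())   // same structure for the outbound rates
                .build();

        System.out.println(metrics.getBytesInPerSec());
    }
}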
+ 0 - 3
kafka-ui-api/src/main/resources/application-local.yml

@@ -5,19 +5,16 @@ kafka:
       bootstrapServers: localhost:29091
       zookeeper: localhost:2183
       jmxPort: 9997
-      jmxHost: localhost
     -
       name: secondLocal
       bootstrapServers: localhost:29092
       zookeeper: localhost:2182
       jmxPort: 9998
-      jmxHost: localhost
     -
       name: localReplica
       bootstrapServers: localhost:29093
       zookeeper: localhost:2183
       jmxPort: 9997
-      jmxHost: localhost
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 2 - 2
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -305,11 +305,11 @@ components:
         bytesInPerSec:
           type: object
           additionalProperties:
-            type: string
+            type: number
         bytesOutPerSec:
           type: object
           additionalProperties:
-            type: string
+            type: number
       required:
         - id
         - name

+ 1 - 1
pom.xml

@@ -28,7 +28,7 @@
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
 		<kafka.version>2.4.1</kafka.version>
-		<apache.commons.version>2.7.0</apache.commons.version>
+		<apache.commons.version>2.2</apache.commons.version>
 	</properties>
 
 	<groupId>com.provectus</groupId>