Added bytesIn/OutPerSec metrics to the clusterMetrics object (#56)

* Added bytesIn/OutPerSec metrics to the clusterMetrics object

* Removed redundant TODOs and cleaned up imports

* Moved JMX connections into a pool; moved methods into separate classes

* Added methods for borrowing connections from the pool and returning them

* Addressed previous PR comments: fixed the result map, configured the pool, removed redundant methods and code

* Removed redundant imports and an empty initialization

* Removed the fill method

* Moved connection closing into the destroyObject method

* Fixed the try/catch block around returning an object to the pool

* Removed redundant logging and try/catch blocks

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
Roman Nedzvetskiy 2020-06-17 14:44:34 +03:00 committed by GitHub
parent f974febb5f
commit 32a09faa95
15 changed files with 195 additions and 36 deletions
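
The heart of the change: every JMX lookup now goes through a commons-pool2 keyed object pool instead of opening a fresh connection per request. A minimal sketch of the borrow/return/invalidate pattern the diffs below implement (withJmx is a hypothetical helper, not part of the commit):

import javax.management.remote.JMXConnector;
import org.apache.commons.pool2.KeyedObjectPool;

static void withJmx(KeyedObjectPool<String, JMXConnector> pool, String jmxUrl) throws Exception {
    JMXConnector connector = pool.borrowObject(jmxUrl); // reuse an idle connection, or have the factory create one
    try {
        // ... read MBean attributes via connector.getMBeanServerConnection() ...
        pool.returnObject(jmxUrl, connector);           // healthy connections go back for reuse
    } catch (Exception e) {
        pool.invalidateObject(jmxUrl, connector);       // broken connections are destroyed, not reused
    }
}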

@@ -100,6 +100,12 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${apache.commons.version}</version>
</dependency>
</dependencies>
<build>

@@ -21,5 +21,6 @@ public class ClustersProperties {
String zookeeper;
String schemaRegistry;
String schemaNameTemplate = "%s-value";
int jmxPort;
}
}

@@ -0,0 +1,38 @@
package com.provectus.kafka.ui.cluster.config;
import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jmx.export.MBeanExporter;
import javax.management.remote.JMXConnector;
@Configuration
public class Config {
@Bean
public KeyedObjectPool<String, JMXConnector> pool() {
GenericKeyedObjectPool<String, JMXConnector> pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
pool.setConfig(poolConfig());
return pool;
}
private GenericKeyedObjectPoolConfig poolConfig() {
GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
poolConfig.setMaxIdlePerKey(3);
poolConfig.setMaxTotalPerKey(3);
return poolConfig;
}
@Bean
public MBeanExporter exporter() {
final MBeanExporter exporter = new MBeanExporter();
exporter.setAutodetect(true);
exporter.setExcludedBeans("pool");
return exporter;
}
}
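
The exporter bean excludes "pool", presumably because GenericKeyedObjectPool registers its own JMX MBean by default and Spring's autodetection would otherwise try to export it a second time. If stale connections ever became a problem, the same config could additionally enable borrow-time validation and idle eviction; a hedged sketch of optional commons-pool2 settings, not part of this commit:

private GenericKeyedObjectPoolConfig poolConfig() {
    GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
    poolConfig.setMaxIdlePerKey(3);
    poolConfig.setMaxTotalPerKey(3);
    // Assumed hardening, not in this PR: check connections when borrowed
    // and periodically evict ones that have sat idle for five minutes.
    poolConfig.setTestOnBorrow(true);                    // needs validateObject in the factory
    poolConfig.setTimeBetweenEvictionRunsMillis(60_000);
    poolConfig.setMinEvictableIdleTimeMillis(300_000);
    return poolConfig;
}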

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Map;
@@ -18,11 +19,9 @@ public class InternalClusterMetrics {
private final int offlinePartitionCount;
private final int inSyncReplicasCount;
private final int outOfSyncReplicasCount;
//TODO: find way to fill
private final int bytesInPerSec;
private final int bytesOutPerSec;
private final Map<String, BigDecimal> bytesInPerSec;
private final Map<String, BigDecimal> bytesOutPerSec;
private final int segmentCount;
//TODO: find way to fill
private final long segmentSize;
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
private final int zooKeeperStatus;
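
With this change, bytesInPerSec/bytesOutPerSec go from a single int to a map keyed by the JMX Meter rate windows that JmxClusterUtil reads below. Roughly, with invented values:

// Assumed shape of the new traffic-metric maps (values are made up):
Map<String, BigDecimal> bytesInPerSec = new HashMap<>();
bytesInPerSec.put("OneMinuteRate", BigDecimal.valueOf(1024.5));
bytesInPerSec.put("FiveMinuteRate", BigDecimal.valueOf(980.2));
bytesInPerSec.put("FifteenMinuteRate", BigDecimal.valueOf(730.9));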

@@ -11,8 +11,7 @@ import java.util.Map;
public class KafkaCluster {
private final String name;
private final String jmxHost;
private final String jmxPort;
private final int jmxPort;
private final String bootstrapServers;
private final String zookeeper;
private final String schemaRegistry;

@@ -9,8 +9,6 @@ import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -20,7 +18,6 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -112,7 +112,7 @@ public class ConsumingService {
.collect(Collectors.toMap(
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
Map.Entry::getValue
));
));
consumer.offsetsForTimes(timestampsToSearch)
.forEach((topicPartition, offsetAndTimestamp) ->
consumer.seek(topicPartition, offsetAndTimestamp.offset())

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;

@@ -4,8 +4,6 @@ import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
import com.provectus.kafka.ui.model.TopicMessage;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -15,13 +13,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -32,13 +28,17 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
@Slf4j
public class ClusterUtil {
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
public static <T> Mono<T> toMono(KafkaFuture<T> future){
return Mono.create(sink -> future.whenComplete((res, ex)->{
if (ex!=null) {
public static <T> Mono<T> toMono(KafkaFuture<T> future) {
return Mono.create(sink -> future.whenComplete((res, ex) -> {
if (ex != null) {
sink.error(ex);
} else {
sink.success(res);
@@ -46,9 +46,9 @@ public class ClusterUtil {
}));
}
public static Mono<String> toMono(KafkaFuture<Void> future, String topicName){
return Mono.create(sink -> future.whenComplete((res, ex)->{
if (ex!=null) {
public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
return Mono.create(sink -> future.whenComplete((res, ex) -> {
if (ex != null) {
sink.error(ex);
} else {
sink.success(topicName);
@@ -111,7 +111,7 @@ public class ClusterUtil {
partitionDto.inSyncReplicasCount(partition.isr().size());
partitionDto.replicasCount(partition.replicas().size());
List<InternalReplica> replicas = partition.replicas().stream().map(
r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
r -> new InternalReplica(r.id(), partition.leader().id() != r.id(), partition.isr().contains(r)))
.collect(Collectors.toList());
partitionDto.replicas(replicas);
return partitionDto.build();
@@ -185,6 +185,7 @@ public class ClusterUtil {
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
}
}
public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.describeCluster().controller())
.map(Node::id)
@@ -210,7 +211,7 @@ public class ClusterUtil {
}
}
public static Topic convertToTopic (InternalTopic internalTopic) {
public static Topic convertToTopic(InternalTopic internalTopic) {
Topic topic = new Topic();
topic.setName(internalTopic.getName());
List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {

@@ -0,0 +1,74 @@
package com.provectus.kafka.ui.cluster.util;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.KeyedObjectPool;
import org.springframework.stereotype.Component;
import javax.management.*;
import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
@RequiredArgsConstructor
public class JmxClusterUtil {
private final KeyedObjectPool<String, JMXConnector> pool;
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
private static final String JMX_SERVICE_TYPE = "jmxrmi";
public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
public Map<String, BigDecimal> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
Map<String, BigDecimal> result = new HashMap<>();
JMXConnector srv = null;
try {
srv = pool.borrowObject(jmxUrl);
MBeanServerConnection msc = srv.getMBeanServerConnection();
ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
for (String attrName : attrNames) {
result.put(attrName, BigDecimal.valueOf((Double) msc.getAttribute(name, attrName)));
}
pool.returnObject(jmxUrl, srv);
} catch (MalformedURLException url) {
log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
} catch (IOException io) {
log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
log.error("Cannot find attribute", e);
closeConnectionExceptionally(jmxUrl, srv);
} catch (MalformedObjectNameException objectNameE) {
log.error("Cannot create objectName", objectNameE);
closeConnectionExceptionally(jmxUrl, srv);
} catch (Exception e) {
log.error("Error while retrieving connection {} from pool", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
}
return result;
}
private void closeConnectionExceptionally(String url, JMXConnector srv) {
try {
pool.invalidateObject(url, srv);
} catch (Exception e) {
log.error("Cannot invalidate object in pool, {}", url);
}
}
}
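
A likely call pattern, mirroring the KafkaService change below (host and port are illustrative). On any failure the method logs the error, invalidates the pooled connection, and returns an empty map:

// Hypothetical caller:
Map<String, BigDecimal> bytesIn =
        jmxClusterUtil.getJmxTrafficMetrics(9997, "localhost", JmxClusterUtil.BYTES_IN_PER_SEC);
BigDecimal oneMinuteRate = bytesIn.get("OneMinuteRate"); // null if the JMX lookup failed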

@@ -0,0 +1,34 @@
package com.provectus.kafka.ui.cluster.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
@Slf4j
public class JmxPoolFactory extends BaseKeyedPooledObjectFactory<String, JMXConnector> {
@Override
public JMXConnector create(String s) throws Exception {
return JMXConnectorFactory.connect(new JMXServiceURL(s));
}
@Override
public PooledObject<JMXConnector> wrap(JMXConnector jmxConnector) {
return new DefaultPooledObject<>(jmxConnector);
}
@Override
public void destroyObject(String key, PooledObject<JMXConnector> p) {
try {
p.getObject().close();
} catch (IOException e) {
log.error("Cannot close connection with {}", key);
}
}
}
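
BaseKeyedPooledObjectFactory's default validateObject always returns true. If the pool were configured with testOnBorrow (see the sketch after the Config class above), an override along these lines would let it weed out dead connections; this is an assumed extension, not part of the commit:

@Override
public boolean validateObject(String key, PooledObject<JMXConnector> p) {
    // Hypothetical addition: getConnectionId() throws IOException once the
    // connection is closed or broken, which marks the pooled object invalid.
    try {
        p.getObject().getConnectionId();
        return true;
    } catch (IOException e) {
        return false;
    }
}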

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.kafka;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.Topic;
@@ -17,14 +18,15 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.beans.factory.annotation.Value;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -43,11 +45,12 @@ public class KafkaService {
private final ZookeeperService zookeeperService;
private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
private final JmxClusterUtil jmxClusterUtil;
@SneakyThrows
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(
ac -> getClusterMetrics(ac.getAdminClient())
ac -> getClusterMetrics(cluster, ac.getAdminClient())
.flatMap( clusterMetrics ->
getTopicsData(ac.getAdminClient()).flatMap( topics ->
@@ -156,17 +159,19 @@ public class KafkaService {
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
}
private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
ClusterUtil.toMono(client.describeCluster().controller()).map(
c -> {
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
// TODO: fill bytes in/out metrics
Map<String, BigDecimal> bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
Map<String, BigDecimal> bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
metricsBuilder
.internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
.internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
.bytesOutPerSec(bytesOutPerSec)
.bytesInPerSec(bytesInPerSec);
return metricsBuilder.build();
}
)
@@ -199,11 +204,6 @@ public class KafkaService {
}
@SneakyThrows
private Mono<String> getClusterId(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
}
public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
.switchIfEmpty(createAdminClient(cluster))

@@ -6,14 +6,20 @@ kafka:
zookeeper: localhost:2183
schemaRegistry: http://localhost:8085
# schemaNameTemplate: "%s-value"
jmxPort: 9997
-
name: secondLocal
bootstrapServers: localhost:29092
zookeeper: localhost:2182
jmxPort: 9998
-
name: localReplica
bootstrapServers: localhost:29093
zookeeper: localhost:2183
jmxPort: 9997
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000
spring:
jmx:
enabled: true

@@ -304,9 +304,13 @@ components:
topicCount:
type: integer
bytesInPerSec:
type: integer
type: object
additionalProperties:
type: number
bytesOutPerSec:
type: integer
type: object
additionalProperties:
type: number
required:
- id
- name

@@ -30,6 +30,7 @@
<kafka.version>2.4.1</kafka.version>
<avro.version>1.9.2</avro.version>
<confluent.version>5.5.0</confluent.version>
<apache.commons.version>2.2</apache.commons.version>
</properties>
<repositories>