Backend jmx metrics (#64)

* Started endpoint for JMX metrics

* Added endpoint for getting JMX metrics per broker

* Added endpoint for summed cluster JMX metrics

* Added endpoints for cluster metrics and broker metrics

* Cleaned up some code

* Fixed JMX metrics names

* Changed metrics to include all values

* Removed redundant imports

* Renamed param constant

* Changed to calculate broker and cluster metrics in one place

* Removed redundant imports

* Fixed some mistakes

* Replaced multiple method calls with a single one

* Fixed duplicated call

* Removed cluster-level metrics; now only broker-level metrics in cluster

* Small fixes

* Removed redundant variable

* Renamed method for cluster-level metrics

* Applied fixes after PR review and added summing of numeric cluster metrics by the "num" and "persec" keywords in the canonical name

* Added MetricDto object

* Added list of metrics to enum

* Renames and optimizations

* Renamed JmxMetrics object's param to metrics

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
Roman Nedzvetskiy 2020-07-30 14:03:07 +03:00 committed by GitHub
parent 66afaa4971
commit efc35a9cfb
16 changed files with 260 additions and 118 deletions

@@ -23,39 +23,35 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
kafka01:
image: confluentinc/cp-kafka:5.1.0
depends_on:
- zookeeper0
ports:
- 29093:29093
- 9999:9999
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
#
# kafka01:
# image: confluentinc/cp-kafka:5.1.0
# depends_on:
# - zookeeper0
# ports:
# - 29093:29093
# - 9999:9999
# environment:
# KAFKA_BROKER_ID: 2
# KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
# JMX_PORT: 9997
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
kafka-init-topics0:
image: confluentinc/cp-kafka:5.1.0
depends_on:
- kafka0
- kafka1
command:
"kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \
This is message 1 && \
This is message 2 && \
This is message 3 && \
Message 4 && \
Message 5"
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka0:9092 1 20 && \
kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
environment:
KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -101,21 +97,21 @@ services:
networks:
- default
# schemaregistry0:
# image: confluentinc/cp-schema-registry:5.1.0
# depends_on:
# - zookeeper0
# - kafka0
# - kafka01
# ports:
# - 8085:8085
# environment:
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
# SCHEMA_REGISTRY_HOST_NAME: schemaregistry
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
#
# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper0
- kafka0
- kafka01
ports:
- 8085:8085
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.cluster.config;
import com.provectus.kafka.ui.cluster.util.JmxMetricsNames;
import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -9,6 +10,9 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.jmx.export.MBeanExporter;
import javax.management.remote.JMXConnector;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Configuration
public class Config {
@@ -35,4 +39,9 @@ public class Config {
exporter.setExcludedBeans("pool");
return exporter;
}
@Bean
public List<String> jmxMetricsNames() {
return Stream.of(JmxMetricsNames.values()).map(Enum::name).collect(Collectors.toList());
}
}
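The jmxMetricsNames bean exposes the JmxMetricsNames enum (added later in this commit) as a plain list of strings that JmxClusterUtil can inject. A minimal sketch of what the bean yields, assuming it compiles in the same package as the enum:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class JmxMetricsNamesDemo {
        public static void main(String[] args) {
            // Mirrors the bean body: enum constants become metric-name strings.
            List<String> names = Stream.of(JmxMetricsNames.values())
                    .map(Enum::name)
                    .collect(Collectors.toList());
            names.forEach(System.out::println); // BytesInPerSec, BytesOutPerSec, ...
        }
    }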

@@ -9,24 +9,16 @@ import com.provectus.kafka.ui.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import java.math.BigDecimal;
@Mapper(componentModel = "spring")
public interface ClusterMapper {
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@Mapping(target = "brokerCount", source = "metrics.brokerCount")
@Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
@Mapping(target = "topicCount", source = "metrics.topicCount")
@Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
@Mapping(target = "metrics", source = "metrics.metrics")
Cluster toCluster(KafkaCluster cluster);
default BigDecimal map (Number number) {
return new BigDecimal(number.toString());
}
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
Topic toTopic(InternalTopic topic);
TopicDetails toTopicDetails(InternalTopic topic);

@@ -1,10 +1,14 @@
package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.Metric;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder(toBuilder = true)
public class InternalBrokerMetrics {
private final Long segmentSize;
private final List<Metric> jmxMetrics;
}

@@ -1,9 +1,10 @@
package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.Metric;
import lombok.Builder;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@@ -24,5 +25,6 @@ public class InternalClusterMetrics {
private final int segmentCount;
private final long segmentSize;
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
private final List<Metric> metrics;
private final int zooKeeperStatus;
}

@@ -22,7 +22,6 @@ public class InternalTopic {
private final int inSyncReplicas;
private final int replicationFactor;
private final int underReplicatedPartitions;
//TODO: find way to fill
private final long segmentSize;
private final int segmentCount;
private final Map<TopicPartition, Long> partitionSegmentSize;

@@ -0,0 +1,14 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.math.BigDecimal;
@Getter
@AllArgsConstructor
public class MetricDto {
private String canonicalName;
private String metricName;
private BigDecimal value;
}
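MetricDto flattens a Metric, whose value is a map of attribute names to numbers, into one (canonicalName, metricName, value) row per attribute; convertToMetricDto in JmxClusterUtil below produces these rows so metrics can be grouped across brokers. A minimal sketch for a single Metric m, assuming the generated API model and java.util.stream imports:

    List<MetricDto> rows = m.getValue().entrySet().stream()
            .map(e -> new MetricDto(m.getCanonicalName(), e.getKey(), e.getValue()))
            .collect(Collectors.toList());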

@@ -38,10 +38,15 @@ public class ClusterService {
.collect(Collectors.toList());
}
public Optional<BrokersMetrics> getBrokersMetrics(String name) {
return clustersStorage.getClusterByName(name)
public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
.map(KafkaCluster::getMetrics)
.map(clusterMapper::toBrokerMetrics);
.map(s -> {
var brokerMetrics = clusterMapper.toBrokerMetrics(s);
brokerMetrics.setMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
return brokerMetrics;
}));
}
public List<Topic> getTopics(String name) {
@@ -127,6 +132,7 @@ public class ClusterService {
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.idString());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.flatMapMany(Flux::fromIterable);
@@ -154,6 +160,5 @@ public class ClusterService {
return clustersStorage.getClusterByName(clusterName)
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
.orElse(Flux.empty());
}
}
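getBrokersMetrics is now reactive and resolves the requested broker's JMX metrics from the cluster-wide map. A hedged usage sketch, where the cluster name "local" and broker id 1 are assumed values; an unknown id surfaces as an error, which the controller below converts into a 404:

    clusterService.getBrokersMetrics("local", 1)
            .subscribe(bm -> bm.getMetrics()
                    .forEach(m -> System.out.println(m.getCanonicalName())));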

@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.cluster.util;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
@@ -28,10 +28,6 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
@Slf4j
public class ClusterUtil {
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
@@ -56,7 +52,7 @@ public class ClusterUtil {
}));
}
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());

@@ -1,5 +1,8 @@
package com.provectus.kafka.ui.cluster.util;
import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
import com.provectus.kafka.ui.cluster.model.MetricDto;
import com.provectus.kafka.ui.model.Metric;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.KeyedObjectPool;
@@ -10,10 +13,12 @@ import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@Component
@Slf4j
@@ -21,31 +26,50 @@ import java.util.Map;
public class JmxClusterUtil {
private final KeyedObjectPool<String, JMXConnector> pool;
private final List<String> jmxMetricsNames;
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
private static final String JMX_SERVICE_TYPE = "jmxrmi";
private static final String KAFKA_SERVER_PARAM = "kafka.server";
private static final String NAME_METRIC_FIELD = "name=";
public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
public Map<String, Number> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
public List<Metric> getJmxMetrics(int jmxPort, String jmxHost) {
String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
Map<String, Number> result = new HashMap<>();
List<Metric> result = new ArrayList<>();
JMXConnector srv = null;
try {
srv = pool.borrowObject(jmxUrl);
MBeanServerConnection msc = srv.getMBeanServerConnection();
ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
for (String attrName : attrNames) {
Number value = (Number) msc.getAttribute(name, attrName);
result.put(attrName, value instanceof Double ? BigDecimal.valueOf((Double) value) : Integer.valueOf(value.toString()));
}
var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList());
for (ObjectName jmxMetric : jmxMetrics) {
Metric metric = new Metric();
metric.setCanonicalName(jmxMetric.getCanonicalName());
metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl));
result.add(metric);
};
pool.returnObject(jmxUrl, srv);
} catch (IOException ioe) {
log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
closeConnectionExceptionally(jmxUrl, srv);
} catch (Exception e) {
log.error("Cannot get JmxConnection from pool, {}", jmxUrl, e);
closeConnectionExceptionally(jmxUrl, srv);
}
return result;
}
private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc, JMXConnector srv, String jmxUrl) {
Map<String, BigDecimal> resultAttr = new HashMap<>();
try {
ObjectName name = new ObjectName(canonicalName);
var attrNames = msc.getMBeanInfo(name).getAttributes();
for (MBeanAttributeInfo attrName : attrNames) {
var value = msc.getAttribute(name, attrName.getName());
if (value instanceof Number) {
if (!(value instanceof Double) || !((Double) value).isInfinite())
resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
}
}
} catch (MalformedURLException url) {
log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
@@ -62,7 +86,7 @@ public class JmxClusterUtil {
log.error("Error while retrieving connection {} from pool", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
}
return result;
return resultAttr;
}
private void closeConnectionExceptionally(String url, JMXConnector srv) {
@@ -72,4 +96,35 @@ public class JmxClusterUtil {
log.error("Cannot invalidate object in pool, {}", url);
}
}
public List<MetricDto> convertToMetricDto(InternalClusterMetrics internalClusterMetrics) {
return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
.map(c ->
c.getJmxMetrics().stream()
.filter(j -> isSameMetric(j.getCanonicalName()))
.map(j -> j.getValue().entrySet().stream()
.map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))
.flatMap(Function.identity()).flatMap(Function.identity()).collect(Collectors.toList());
}
public Metric reduceJmxMetrics (Metric metric1, Metric metric2) {
var result = new Metric();
Map<String, BigDecimal> jmx1 = new HashMap<>(metric1.getValue());
Map<String, BigDecimal> jmx2 = new HashMap<>(metric2.getValue());
jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add));
result.setCanonicalName(metric1.getCanonicalName());
result.setValue(jmx2);
return result;
}
private boolean isSameMetric (String metric) {
if (metric.contains(NAME_METRIC_FIELD)) {
int beginIndex = metric.indexOf(NAME_METRIC_FIELD);
int endIndex = metric.indexOf(',', beginIndex);
endIndex = endIndex < 0 ? metric.length() - 1 : endIndex;
return jmxMetricsNames.contains(metric.substring(beginIndex + 5, endIndex));
} else {
return false;
}
}
}
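reduceJmxMetrics combines two samples of the same MBean by summing their attribute maps key by key. A self-contained sketch of that merge step with hypothetical one-minute rates:

    import java.math.BigDecimal;
    import java.util.HashMap;
    import java.util.Map;

    public class ReduceDemo {
        public static void main(String[] args) {
            // Attribute maps of the same MBean sampled on two brokers.
            Map<String, BigDecimal> jmx1 = new HashMap<>(Map.of("OneMinuteRate", new BigDecimal("10.5")));
            Map<String, BigDecimal> jmx2 = new HashMap<>(Map.of("OneMinuteRate", new BigDecimal("4.5")));
            // The per-attribute merge reduceJmxMetrics performs.
            jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add));
            System.out.println(jmx2); // {OneMinuteRate=15.0}
        }
    }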

@@ -0,0 +1,31 @@
package com.provectus.kafka.ui.cluster.util;
public enum JmxMetricsNames {
MessagesInPerSec,
BytesInPerSec,
ReplicationBytesInPerSec,
RequestsPerSec,
ErrorsPerSec,
MessageConversionsPerSec,
BytesOutPerSec,
ReplicationBytesOutPerSec,
NoKeyCompactedTopicRecordsPerSec,
InvalidMagicNumberRecordsPerSec,
InvalidMessageCrcRecordsPerSec,
InvalidOffsetOrSequenceRecordsPerSec,
UncleanLeaderElectionsPerSec,
IsrShrinksPerSec,
IsrExpandsPerSec,
ReassignmentBytesOutPerSec,
ReassignmentBytesInPerSec,
ProduceMessageConversionsPerSec,
FailedFetchRequestsPerSec,
ZooKeeperSyncConnectsPerSec,
BytesRejectedPerSec,
ZooKeeperAuthFailuresPerSec,
TotalFetchRequestsPerSec,
FailedIsrUpdatesPerSec,
IncrementalFetchSessionEvictionsPerSec,
FetchMessageConversionsPerSec,
FailedProduceRequestsPerSec
}
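isSameMetric in JmxClusterUtil keeps only MBeans whose name= attribute matches one of these constants. A sketch of that extraction over a hypothetical canonical name:

    String canonical = "kafka.server:name=BytesInPerSec,topic=users,type=BrokerTopicMetrics";
    int begin = canonical.indexOf("name=");
    int end = canonical.indexOf(',', begin);
    end = end < 0 ? canonical.length() - 1 : end;
    String name = canonical.substring(begin + 5, end); // "BytesInPerSec"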

@@ -10,11 +10,6 @@ public final class KafkaConstants {
private KafkaConstants() {
}
public static String IN_BYTE_PER_SEC_METRIC = "incoming-byte-rate";
public static String IN_BYTE_PER_SEC_METRIC_DESCRIPTION = "The number of bytes read off all sockets per second";
public static String OUT_BYTE_PER_SEC_METRIC = "outgoing-byte-rate";
public static String OUT_BYTE_PER_SEC_METRIC_DESCRIPTION = "The number of outgoing bytes sent to all servers per second";
public static Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE),
new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"),

@@ -42,12 +42,14 @@ public class KafkaService {
private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
private final JmxClusterUtil jmxClusterUtil;
private final ClustersStorage clustersStorage;
@SneakyThrows
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(
ac -> getClusterMetrics(cluster, ac.getAdminClient())
return getOrCreateAdminClient(cluster)
.flatMap(
ac -> getClusterMetrics(ac.getAdminClient())
.flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
.flatMap( clusterMetrics ->
getTopicsData(ac.getAdminClient()).flatMap( topics ->
loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
@@ -155,19 +157,13 @@ public class KafkaService {
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
}
private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
ClusterUtil.toMono(client.describeCluster().controller()).map(
c -> {
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
Map<String, Number> bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
Map<String, Number> bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
metricsBuilder
.internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
.bytesOutPerSec(bytesOutPerSec)
.bytesInPerSec(bytesInPerSec);
return metricsBuilder.build();
}
)
@@ -249,7 +245,7 @@ public class KafkaService {
.flatMap(s -> ClusterUtil.toMono(ac.getAdminClient()
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())));
.map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())));
}
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
@@ -332,7 +328,7 @@ public class KafkaService {
var brokerSegmentSize = log.get(e.getKey()).values().stream()
.mapToLong(v -> v.replicaInfos.values().stream()
.mapToLong(r -> r.size).sum()).sum();
InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
InternalBrokerMetrics tempBrokerMetrics = e.getValue().toBuilder().segmentSize(brokerSegmentSize).build();
return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
});
@@ -348,6 +344,39 @@ public class KafkaService {
);
}
public List<Metric> getJmxMetric(String clusterName, Node node) {
return clustersStorage.getClusterByName(clusterName)
.map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElse(Collections.emptyList());
}
private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
return fillBrokerMetrics(internalClusterMetrics, clusterName, ac).map(this::calculateClusterMetrics);
}
private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
return ClusterUtil.toMono(ac.describeCluster().nodes())
.flatMapIterable(nodes -> nodes)
.map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder().
jmxMetrics(getJmxMetric(clusterName, broker)).build()))
.collectList()
.map(s -> internalClusterMetrics.toBuilder().internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
}
private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics internalClusterMetrics) {
return internalClusterMetrics.toBuilder().metrics(
jmxClusterUtil.convertToMetricDto(internalClusterMetrics)
.stream().map(c -> {
Metric jmx = new Metric();
jmx.setCanonicalName(c.getCanonicalName());
jmx.setValue(Map.of(c.getMetricName(), c.getValue()));
return jmx;
}).collect(Collectors.groupingBy(Metric::getCanonicalName, Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)))
.values().stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList())).build();
}
public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
return getTopicPartitionOffset(cluster, topicPartitions);
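calculateClusterMetrics rebuilds Metric objects from the flattened MetricDto rows and sums duplicates across brokers with groupingBy plus reducing. The core pipeline, sketched over an assumed flat list perBrokerMetrics of single-attribute Metric objects:

    List<Metric> clusterMetrics = perBrokerMetrics.stream()
            .collect(Collectors.groupingBy(Metric::getCanonicalName,
                    Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)))
            .values().stream()
            .filter(Optional::isPresent) // reducing without identity yields Optionals
            .map(Optional::get)
            .collect(Collectors.toList());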

@@ -30,12 +30,10 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, ServerWebExchange exchange) {
return Mono.just(
clusterService.getBrokersMetrics(clusterName)
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
return clusterService.getBrokersMetrics(clusterName, id)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
@@ -98,6 +96,7 @@ public class MetricsRestController implements ApiClustersApi {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}
private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
return Mono.justOrEmpty(seekTo)
.defaultIfEmpty(Collections.emptyList())
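Since the broker id is now a path variable, the endpoint can be exercised per broker. A hedged integration-test sketch with Spring's WebTestClient, assuming cluster name "local" and broker id 1:

    webTestClient.get()
            .uri("/api/clusters/{clusterName}/metrics/broker/{id}", "local", 1)
            .exchange()
            .expectStatus().isOk()
            .expectBody(BrokersMetrics.class);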

@@ -1,6 +1,5 @@
package com.provectus.kafka.ui.zookeeper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@@ -52,7 +52,7 @@ paths:
items:
$ref: '#/components/schemas/Broker'
/api/clusters/{clusterName}/metrics/broker:
/api/clusters/{clusterName}/metrics/broker/{id}:
get:
tags:
- /api/clusters
@@ -64,6 +64,11 @@ paths:
required: true
schema:
type: string
- name: id
in: path
required: true
schema:
type: integer
responses:
200:
description: OK
@@ -307,14 +312,10 @@ components:
type: integer
topicCount:
type: integer
bytesInPerSec:
type: object
additionalProperties:
type: number
bytesOutPerSec:
type: object
additionalProperties:
type: number
metrics:
type: array
items:
$ref: '#/components/schemas/Metric'
required:
- id
- name
@@ -345,6 +346,10 @@ components:
type: integer
segmentZise:
type: integer
metrics:
type: array
items:
$ref: '#/components/schemas/Metric'
Topic:
type: object
@@ -433,6 +438,8 @@ components:
properties:
id:
type: string
host:
type: string
ConsumerGroup:
type: object
@@ -527,3 +534,13 @@ components:
type: array
items:
$ref: '#/components/schemas/ConsumerTopicPartitionDetail'
Metric:
type: object
properties:
canonicalName:
type: string
value:
type: object
additionalProperties:
type: number
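A Metric entry serialized under this schema might look like the following (hypothetical values); value is an open map of JMX attribute names to numbers, matching the Map<String, BigDecimal> used in the Java model above:

    {
      "canonicalName": "kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics",
      "value": { "OneMinuteRate": 15.0, "FiveMinuteRate": 12.3 }
    }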