|
@@ -47,6 +47,9 @@ public class KafkaService {
|
|
var kafkaCluster1 = metrics.getKafkaCluster();
|
|
var kafkaCluster1 = metrics.getKafkaCluster();
|
|
kafkaCluster1.getCluster().setBytesInPerSec(metrics.getBytesInPerSec());
|
|
kafkaCluster1.getCluster().setBytesInPerSec(metrics.getBytesInPerSec());
|
|
kafkaCluster1.getCluster().setBytesOutPerSec(metrics.getBytesOutPerSec());
|
|
kafkaCluster1.getCluster().setBytesOutPerSec(metrics.getBytesOutPerSec());
|
|
|
|
+ kafkaCluster1.getCluster().setBrokerCount(metrics.getBrokerCount());
|
|
|
|
+ kafkaCluster1.getBrokersMetrics().setBrokerCount(metrics.getBrokerCount());
|
|
|
|
+ kafkaCluster1.getBrokersMetrics().setActiveControllers(metrics.getActiveControllers());
|
|
return clusterId;
|
|
return clusterId;
|
|
}).flatMap(this::updateTopicsData);
|
|
}).flatMap(this::updateTopicsData);
|
|
}
|
|
}
|
|
@@ -218,11 +221,10 @@ public class KafkaService {
|
|
metrics.setKafkaCluster(kafkaCluster);
|
|
metrics.setKafkaCluster(kafkaCluster);
|
|
return ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
|
|
return ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
|
|
var brokerCount = brokers.size();
|
|
var brokerCount = brokers.size();
|
|
- kafkaCluster.getCluster().setBrokerCount(brokerCount);
|
|
|
|
- kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
|
|
|
|
|
|
+ metrics.setBrokerCount(brokerCount);
|
|
return ClusterUtil.toMono(adminClient.describeCluster().controller());
|
|
return ClusterUtil.toMono(adminClient.describeCluster().controller());
|
|
}).map(c -> {
|
|
}).map(c -> {
|
|
- kafkaCluster.getBrokersMetrics().setActiveControllers(c != null ? 1 : 0);
|
|
|
|
|
|
+ metrics.setActiveControllers(c != null ? 1 : 0);
|
|
for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
|
|
for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
|
|
if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
|
|
if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
|
|
&& metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
|
|
&& metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
|