BugFix ActiveController not correct
Signed-off-by: daliu <liudaax@126.com>
This commit is contained in:
parent
77d4b5f48f
commit
b6818f6542
4 changed files with 28 additions and 5 deletions
|
@ -38,9 +38,12 @@ public class InternalClusterState {
|
|||
.orElse(null);
|
||||
topicCount = statistics.getTopicDescriptions().size();
|
||||
brokerCount = statistics.getClusterDescription().getNodes().size();
|
||||
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
|
||||
.map(Node::id)
|
||||
.orElse(null);
|
||||
activeControllers = statistics.getMetrics().getController();
|
||||
if (activeControllers == null || activeControllers < 0) {
|
||||
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
|
||||
.map(Node::id)
|
||||
.orElse(null);
|
||||
}
|
||||
version = statistics.getVersion();
|
||||
|
||||
if (statistics.getLogDirInfo() != null) {
|
||||
|
|
|
@ -33,6 +33,7 @@ public class Metrics {
|
|||
Map<String, BigDecimal> produceRequestsOneMinuteRate;
|
||||
Map<String, BigDecimal> produceRequestsFiveMinuteRate;
|
||||
Map<String, BigDecimal> produceRequestsFifteenMinuteRate;
|
||||
Integer controller;
|
||||
|
||||
public static Metrics empty() {
|
||||
return Metrics.builder()
|
||||
|
|
|
@ -37,6 +37,7 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable {
|
|||
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
|
||||
private static final String JMX_SERVICE_TYPE = "jmxrmi";
|
||||
private static final String CANONICAL_NAME_PATTERN = "kafka.server*:*";
|
||||
private static final String CONTROLLER_NAME_PATTERN = "kafka.controller*:*";
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
@ -118,6 +119,10 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable {
|
|||
for (ObjectName jmxMetric : jmxMetrics) {
|
||||
sink.addAll(extractObjectMetrics(jmxMetric, msc));
|
||||
}
|
||||
var controllerMetrics = msc.queryNames(new ObjectName(CONTROLLER_NAME_PATTERN), null);
|
||||
for (ObjectName jmxMetric : controllerMetrics) {
|
||||
sink.addAll(extractObjectMetrics(jmxMetric, msc));
|
||||
}
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
|
|
|
@ -36,9 +36,11 @@ class WellKnownMetrics {
|
|||
final Map<String, BigDecimal> produceRequestsOneMinuteRate = new HashMap<>();
|
||||
final Map<String, BigDecimal> produceRequestsFiveMinuteRate = new HashMap<>();
|
||||
final Map<String, BigDecimal> produceRequestsFifteenMinuteRate = new HashMap<>();
|
||||
|
||||
int controller = -1;
|
||||
|
||||
void populate(Node node, RawMetric rawMetric) {
|
||||
updateBrokerIOrates(node, rawMetric);
|
||||
updateController(node, rawMetric);
|
||||
updateTopicsIOrates(rawMetric);
|
||||
}
|
||||
|
||||
|
@ -59,6 +61,7 @@ class WellKnownMetrics {
|
|||
metricsBuilder.produceRequestsOneMinuteRate(produceRequestsOneMinuteRate);
|
||||
metricsBuilder.produceRequestsFiveMinuteRate(produceRequestsFiveMinuteRate);
|
||||
metricsBuilder.produceRequestsFifteenMinuteRate(produceRequestsFifteenMinuteRate);
|
||||
metricsBuilder.controller(controller);
|
||||
}
|
||||
|
||||
private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
|
||||
|
@ -140,5 +143,16 @@ class WellKnownMetrics {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void updateController(Node node, RawMetric rawMetric) {
|
||||
if (controller < 0 && containsIgnoreCase(rawMetric.name(), "KafkaController")) {
|
||||
String nameProperty = rawMetric.labels().get("name");
|
||||
if ("ActiveControllerCount".equals(nameProperty)) {
|
||||
BigDecimal value = rawMetric.value();
|
||||
if (value.intValue() == 1) {
|
||||
controller = node.id();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue