JmxClusterUtil.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package com.provectus.kafka.ui.util;
  2. import com.provectus.kafka.ui.model.Metric;
  3. import java.io.IOException;
  4. import java.math.BigDecimal;
  5. import java.net.MalformedURLException;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.HashMap;
  9. import java.util.Hashtable;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.Optional;
  13. import java.util.stream.Collectors;
  14. import java.util.stream.Stream;
  15. import javax.management.AttributeNotFoundException;
  16. import javax.management.InstanceNotFoundException;
  17. import javax.management.MBeanAttributeInfo;
  18. import javax.management.MBeanException;
  19. import javax.management.MBeanServerConnection;
  20. import javax.management.MalformedObjectNameException;
  21. import javax.management.ObjectName;
  22. import javax.management.ReflectionException;
  23. import javax.management.remote.JMXConnector;
  24. import lombok.RequiredArgsConstructor;
  25. import lombok.extern.slf4j.Slf4j;
  26. import org.apache.commons.pool2.KeyedObjectPool;
  27. import org.springframework.stereotype.Component;
  28. @Component
  29. @Slf4j
  30. @RequiredArgsConstructor
  31. public class JmxClusterUtil {
  32. private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
  33. private static final String JMX_SERVICE_TYPE = "jmxrmi";
  34. private static final String KAFKA_SERVER_PARAM = "kafka.server";
  35. private static final String NAME_METRIC_FIELD = "name";
  36. private final KeyedObjectPool<String, JMXConnector> pool;
  37. public List<Metric> getJmxMetrics(int jmxPort, String jmxHost) {
  38. String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
  39. List<Metric> result = new ArrayList<>();
  40. JMXConnector srv = null;
  41. try {
  42. srv = pool.borrowObject(jmxUrl);
  43. MBeanServerConnection msc = srv.getMBeanServerConnection();
  44. var jmxMetrics = msc.queryNames(null, null).stream()
  45. .filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM))
  46. .collect(Collectors.toList());
  47. for (ObjectName jmxMetric : jmxMetrics) {
  48. final Hashtable<String, String> params = jmxMetric.getKeyPropertyList();
  49. Metric metric = new Metric();
  50. metric.setName(params.get(NAME_METRIC_FIELD));
  51. metric.setCanonicalName(jmxMetric.getCanonicalName());
  52. metric.setParams(params);
  53. metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl));
  54. result.add(metric);
  55. }
  56. pool.returnObject(jmxUrl, srv);
  57. } catch (IOException ioe) {
  58. log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
  59. closeConnectionExceptionally(jmxUrl, srv);
  60. } catch (Exception e) {
  61. log.error("Cannot get JmxConnection from pool, {}", jmxUrl, e);
  62. closeConnectionExceptionally(jmxUrl, srv);
  63. }
  64. return result;
  65. }
  66. private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc,
  67. JMXConnector srv, String jmxUrl) {
  68. Map<String, BigDecimal> resultAttr = new HashMap<>();
  69. try {
  70. ObjectName name = new ObjectName(canonicalName);
  71. var attrNames = msc.getMBeanInfo(name).getAttributes();
  72. for (MBeanAttributeInfo attrName : attrNames) {
  73. var value = msc.getAttribute(name, attrName.getName());
  74. if (value instanceof Number) {
  75. if (!(value instanceof Double) || !((Double) value).isInfinite()) {
  76. resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
  77. }
  78. }
  79. }
  80. } catch (MalformedURLException url) {
  81. log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
  82. closeConnectionExceptionally(jmxUrl, srv);
  83. } catch (IOException io) {
  84. log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
  85. closeConnectionExceptionally(jmxUrl, srv);
  86. } catch (MBeanException | AttributeNotFoundException
  87. | InstanceNotFoundException | ReflectionException e) {
  88. log.error("Cannot find attribute", e);
  89. closeConnectionExceptionally(jmxUrl, srv);
  90. } catch (MalformedObjectNameException objectNameE) {
  91. log.error("Cannot create objectName", objectNameE);
  92. closeConnectionExceptionally(jmxUrl, srv);
  93. } catch (Exception e) {
  94. log.error("Error while retrieving connection {} from pool", jmxUrl);
  95. closeConnectionExceptionally(jmxUrl, srv);
  96. }
  97. return resultAttr;
  98. }
  99. private void closeConnectionExceptionally(String url, JMXConnector srv) {
  100. try {
  101. pool.invalidateObject(url, srv);
  102. } catch (Exception e) {
  103. log.error("Cannot invalidate object in pool, {}", url);
  104. }
  105. }
  106. public Metric reduceJmxMetrics(Metric metric1, Metric metric2) {
  107. var result = new Metric();
  108. Map<String, BigDecimal> value = Stream.concat(
  109. metric1.getValue().entrySet().stream(),
  110. metric2.getValue().entrySet().stream()
  111. ).collect(Collectors.groupingBy(
  112. Map.Entry::getKey,
  113. Collectors.reducing(BigDecimal.ZERO, Map.Entry::getValue, BigDecimal::add)
  114. ));
  115. result.setName(metric1.getName());
  116. result.setCanonicalName(metric1.getCanonicalName());
  117. result.setParams(metric1.getParams());
  118. result.setValue(value);
  119. return result;
  120. }
  121. private boolean isWellKnownMetric(Metric metric) {
  122. final Optional<String> param =
  123. Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p ->
  124. Arrays.stream(JmxMetricsName.values()).map(Enum::name)
  125. .anyMatch(n -> n.equals(p))
  126. );
  127. return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent();
  128. }
  129. }