Переглянути джерело

Add io-metrics to topic and broker

Signed-off-by: daliu <liudaax@126.com>
daliu 1 рік тому
батько
коміт
dd23835893

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -371,6 +371,18 @@ public class TopicsController extends AbstractController implements TopicsApi {
         return Comparator.comparing(InternalTopic::getReplicationFactor);
       case SIZE:
         return Comparator.comparing(InternalTopic::getSegmentSize);
+      case BYTESIN_PERSEC:
+        return Comparator.comparing(InternalTopic::getBytesInPerSec);
+      case BYTESOUT_PERSEC:
+        return Comparator.comparing(InternalTopic::getBytesOutPerSec);
+      case MSG_RATE:
+      	return Comparator.comparing(InternalTopic::getMessageInMeanRate);
+      case MSG_5_RATE:
+      	return Comparator.comparing(InternalTopic::getMessageInFiveMinuteRate);
+      case FETCH_RATE:
+      	return Comparator.comparing(InternalTopic::getFetchRequestsMeanRate);
+      case FETCH_5_RATE:
+      	return Comparator.comparing(InternalTopic::getFetchRequestsFiveMinuteRate);
       case NAME:
       default:
         return defaultComparator;

+ 61 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.model;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -35,6 +36,18 @@ public class InternalTopic {
   // rates from metrics
   private final BigDecimal bytesInPerSec;
   private final BigDecimal bytesOutPerSec;
+  private final BigDecimal messageInMeanRate;
+  private final BigDecimal messageInOneMinuteRate;
+  private final BigDecimal messageInFiveMinuteRate;
+  private final BigDecimal messageInFifteenMinuteRate;
+  private final BigDecimal fetchRequestsMeanRate;
+  private final BigDecimal fetchRequestsOneMinuteRate;
+  private final BigDecimal fetchRequestsFiveMinuteRate;
+  private final BigDecimal fetchRequestsFifteenMinuteRate;
+  private final BigDecimal produceRequestsMeanRate;
+  private final BigDecimal produceRequestsOneMinuteRate;
+  private final BigDecimal produceRequestsFiveMinuteRate;
+  private final BigDecimal produceRequestsFifteenMinuteRate;
 
   // from log dir data
   private final long segmentSize;
@@ -114,8 +127,54 @@ public class InternalTopic {
       topic.segmentSize(segmentStats.getSegmentSize());
     }
 
-    topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
-    topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
+    topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO : metrics.getTopicBytesInPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()) == null 
+            ? BigDecimal.ZERO : metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
+        
+    topic.messageInMeanRate(metrics.getMessageInMeanRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getMessageInMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.messageInOneMinuteRate(metrics.getMessageInOneMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getMessageInOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.messageInFiveMinuteRate(metrics.getMessageInFiveMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getMessageInFiveMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.messageInFifteenMinuteRate(metrics.getMessageInFifteenMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getMessageInFifteenMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+        
+    topic.fetchRequestsMeanRate(metrics.getFetchRequestsMeanRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getFetchRequestsMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.fetchRequestsOneMinuteRate(metrics.getFetchRequestsOneMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getFetchRequestsOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.fetchRequestsFiveMinuteRate(metrics.getFetchRequestsFiveMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getFetchRequestsFiveMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.fetchRequestsFifteenMinuteRate(metrics.getFetchRequestsFifteenMinuteRate().get(
+            topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getFetchRequestsFifteenMinuteRate().get(topicDescription.name())
+                .setScale(2, RoundingMode.HALF_UP));
+            
+    topic.produceRequestsMeanRate(metrics.getProduceRequestsMeanRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getProduceRequestsMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.produceRequestsOneMinuteRate(metrics.getProduceRequestsOneMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getProduceRequestsOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
+    topic.produceRequestsFiveMinuteRate(metrics.getProduceRequestsFiveMinuteRate().get(topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getProduceRequestsFiveMinuteRate().get(topicDescription.name())
+                .setScale(2, RoundingMode.HALF_UP));
+    topic.produceRequestsFifteenMinuteRate(metrics.getProduceRequestsFifteenMinuteRate().get(
+            topicDescription.name()) == null
+            ? BigDecimal.ZERO 
+            : metrics.getProduceRequestsFifteenMinuteRate().get(topicDescription.name())
+                .setScale(2, RoundingMode.HALF_UP));
 
     topic.topicConfigs(
         configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -21,6 +21,18 @@ public class Metrics {
   Map<String, BigDecimal> topicBytesInPerSec;
   Map<String, BigDecimal> topicBytesOutPerSec;
   Map<Integer, List<RawMetric>> perBrokerMetrics;
+  Map<String, BigDecimal> messageInMeanRate;
+  Map<String, BigDecimal> messageInOneMinuteRate;
+  Map<String, BigDecimal> messageInFiveMinuteRate;
+  Map<String, BigDecimal> messageInFifteenMinuteRate;
+  Map<String, BigDecimal> fetchRequestsMeanRate;
+  Map<String, BigDecimal> fetchRequestsOneMinuteRate;
+  Map<String, BigDecimal> fetchRequestsFiveMinuteRate;
+  Map<String, BigDecimal> fetchRequestsFifteenMinuteRate;
+  Map<String, BigDecimal> produceRequestsMeanRate;
+  Map<String, BigDecimal> produceRequestsOneMinuteRate;
+  Map<String, BigDecimal> produceRequestsFiveMinuteRate;
+  Map<String, BigDecimal> produceRequestsFifteenMinuteRate;
 
   public static Metrics empty() {
     return Metrics.builder()
@@ -29,6 +41,18 @@ public class Metrics {
         .topicBytesInPerSec(Map.of())
         .topicBytesOutPerSec(Map.of())
         .perBrokerMetrics(Map.of())
+        .messageInMeanRate(Map.of())
+       .messageInOneMinuteRate(Map.of())
+       .messageInFiveMinuteRate(Map.of())
+       .messageInFifteenMinuteRate(Map.of())
+       .fetchRequestsMeanRate(Map.of())
+       .fetchRequestsOneMinuteRate(Map.of())
+       .fetchRequestsFiveMinuteRate(Map.of())
+       .fetchRequestsFifteenMinuteRate(Map.of())
+       .produceRequestsMeanRate(Map.of())
+       .produceRequestsOneMinuteRate(Map.of())
+       .produceRequestsFiveMinuteRate(Map.of())
+       .produceRequestsFifteenMinuteRate(Map.of())
         .build();
   }
 

+ 86 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java

@@ -12,6 +12,9 @@ import org.apache.kafka.common.Node;
 class WellKnownMetrics {
 
   private static final String BROKER_TOPIC_METRICS = "BrokerTopicMetrics";
+  private static final String MEAN_RATE = "MeanRate";
+  private static final String ONE_MINUTE_RATE = "OneMinuteRate";
+  private static final String FIVE_MINUTE_RATE = "FiveMinuteRate";
   private static final String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";
 
   // per broker
@@ -21,6 +24,18 @@ class WellKnownMetrics {
   // per topic
   final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
   final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> messageInMeanRate = new HashMap<>();
+  final Map<String, BigDecimal> messageInOneMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> messageInFiveMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> messageInFifteenMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> fetchRequestsMeanRate = new HashMap<>();
+  final Map<String, BigDecimal> fetchRequestsOneMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> fetchRequestsFiveMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> fetchRequestsFifteenMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> produceRequestsMeanRate = new HashMap<>();
+  final Map<String, BigDecimal> produceRequestsOneMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> produceRequestsFiveMinuteRate = new HashMap<>();
+  final Map<String, BigDecimal> produceRequestsFifteenMinuteRate = new HashMap<>();
 
   void populate(Node node, RawMetric rawMetric) {
     updateBrokerIOrates(node, rawMetric);
@@ -32,6 +47,18 @@ class WellKnownMetrics {
     metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate);
     metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate);
     metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate);
+    metricsBuilder.messageInMeanRate(messageInMeanRate);
+	metricsBuilder.messageInOneMinuteRate(messageInOneMinuteRate);
+	metricsBuilder.messageInFiveMinuteRate(messageInFiveMinuteRate);
+	metricsBuilder.messageInFifteenMinuteRate(messageInFifteenMinuteRate);
+	metricsBuilder.fetchRequestsMeanRate(fetchRequestsMeanRate);
+	metricsBuilder.fetchRequestsOneMinuteRate(fetchRequestsOneMinuteRate);
+	metricsBuilder.fetchRequestsFiveMinuteRate(fetchRequestsFiveMinuteRate);
+	metricsBuilder.fetchRequestsFifteenMinuteRate(fetchRequestsFifteenMinuteRate);
+	metricsBuilder.produceRequestsMeanRate(produceRequestsMeanRate);
+	metricsBuilder.produceRequestsOneMinuteRate(produceRequestsOneMinuteRate);
+	metricsBuilder.produceRequestsFiveMinuteRate(produceRequestsFiveMinuteRate);
+	metricsBuilder.produceRequestsFifteenMinuteRate(produceRequestsFifteenMinuteRate);
   }
 
   private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
@@ -53,18 +80,65 @@ class WellKnownMetrics {
   }
 
   private void updateTopicsIOrates(RawMetric rawMetric) {
-    String name = rawMetric.name();
-    String topic = rawMetric.labels().get("topic");
-    if (topic != null
-        && containsIgnoreCase(name, BROKER_TOPIC_METRICS)
-        && endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
-      String nameProperty = rawMetric.labels().get("name");
-      if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
-        bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
-      } else if ("BytesOutPerSec".equalsIgnoreCase(nameProperty)) {
-        bytesOutFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
-      }
-    }
+	String name = rawMetric.name();
+	String topic = rawMetric.labels().get("topic");
+	if (topic != null && containsIgnoreCase(name, BROKER_TOPIC_METRICS)) {
+	  if (endsWithIgnoreCase(name, MEAN_RATE)) {
+		String nameProperty = rawMetric.labels().get("name");
+		if ("MessagesInPerSec".equalsIgnoreCase(nameProperty)) {
+		  messageInMeanRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  fetchRequestsMeanRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  produceRequestsMeanRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		}
+	  }else if (endsWithIgnoreCase(name, ONE_MINUTE_RATE)) {
+		String nameProperty = rawMetric.labels().get("name");
+		if ("MessagesInPerSec".equalsIgnoreCase(nameProperty)) {
+		  messageInOneMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  fetchRequestsOneMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  produceRequestsOneMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		}
+	  }else if (endsWithIgnoreCase(name, FIVE_MINUTE_RATE)) {
+		String nameProperty = rawMetric.labels().get("name");
+		if ("MessagesInPerSec".equalsIgnoreCase(nameProperty)) {
+		  messageInFiveMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  fetchRequestsFiveMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  produceRequestsFiveMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		}
+	  }else if (endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
+		String nameProperty = rawMetric.labels().get("name");
+		if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
+		  bytesInFifteenMinuteRate.compute(topic,
+			    (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("BytesOutPerSec".equalsIgnoreCase(nameProperty)) {
+		  bytesOutFifteenMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("MessagesInPerSec".equalsIgnoreCase(nameProperty)) {
+		  messageInFifteenMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  fetchRequestsFifteenMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		} else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(nameProperty)) {
+		  produceRequestsFifteenMinuteRate.compute(topic,
+				(k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+		}
+	  }
+	}
   }
 
 }

+ 30 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2318,6 +2318,12 @@ components:
         - TOTAL_PARTITIONS
         - REPLICATION_FACTOR
         - SIZE
+        - BYTESIN_PERSEC
+        - BYTESOUT_PERSEC
+        - MSG_RATE
+        - MSG_5_RATE
+        - FETCH_RATE
+        - FETCH_5_RATE
 
     ConnectorColumnsToSort:
       type: string
@@ -2357,6 +2363,30 @@ components:
           type: number
         bytesOutPerSec:
           type: number
+        messageInMeanRate:
+          type: number
+        messageInOneMinuteRate:
+          type: number
+        messageInFiveMinuteRate:
+          type: number
+        messageInFifteenMinuteRate:
+          type: number
+        fetchRequestsMeanRate:
+          type: number
+        fetchRequestsOneMinuteRate:
+          type: number
+        fetchRequestsFiveMinuteRate:
+          type: number
+        fetchRequestsFifteenMinuteRate:
+          type: number
+        produceRequestsMeanRate:
+          type: number
+        produceRequestsOneMinuteRate:
+          type: number
+        produceRequestsFiveMinuteRate:
+          type: number
+        produceRequestsFifteenMinuteRate:
+          type: number
         underReplicatedPartitions:
           type: integer
         cleanUpPolicy:

+ 4 - 0
kafka-ui-react-app/src/components/Brokers/BrokersList/BrokersList.tsx

@@ -63,6 +63,8 @@ const BrokersList: React.FC = () => {
         partitionsSkew: broker?.partitionsSkew,
         leadersSkew: broker?.leadersSkew,
         inSyncPartitions: broker?.inSyncPartitions,
+        bytesInPerSec: broker?.bytesInPerSec,
+        bytesOutPerSec: broker?.bytesOutPerSec,
       };
     });
   }, [diskUsage, brokers]);
@@ -160,6 +162,8 @@ const BrokersList: React.FC = () => {
         header: 'Host',
         accessorKey: 'host',
       },
+      { header: 'IN /sec', accessorKey: 'bytesInPerSec',  cell: SizeCell},
+      { header: 'OUT /sec', accessorKey: 'bytesOutPerSec', cell: SizeCell},
     ],
     []
   );

+ 48 - 0
kafka-ui-react-app/src/components/Topics/List/TopicTable.tsx

@@ -65,6 +65,54 @@ const TopicTable: React.FC = () => {
         accessorKey: 'replicationFactor',
         enableSorting: false,
       },
+      {
+        id: TopicColumnsToSort.BYTESIN_PERSEC,
+        header: 'BytesIn /sec',
+        accessorKey: 'bytesInPerSec',
+        enableSorting: true,
+        cell: SizeCell,
+      },
+      {
+        id: TopicColumnsToSort.BYTESOUT_PERSEC,
+        header: 'BytesOut /sec',
+        accessorKey: 'bytesOutPerSec',
+        enableSorting: true,
+        cell: SizeCell,
+      },
+      {
+        id: TopicColumnsToSort.MSG_RATE,
+        header: 'Msg /sec',
+        accessorKey: 'messageInMeanRate',
+        enableSorting: true,
+      },
+      {
+        id: TopicColumnsToSort.MSG_5_RATE,
+        header: 'Msg-5min /sec',
+        accessorKey: 'messageInFiveMinuteRate',
+        enableSorting: true,
+      },
+      {
+        id: TopicColumnsToSort.FETCH_RATE,
+        header: 'Fetch /sec',
+        accessorKey: 'fetchRequestsMeanRate',
+        enableSorting: true,
+      },
+      {
+        id: TopicColumnsToSort.FETCH_5_RATE,
+        header: 'Fetch-5min /sec',
+        accessorKey: 'fetchRequestsFiveMinuteRate',
+        enableSorting: true,
+      },
+      {
+        header: 'Produce /sec',
+        accessorKey: 'produceRequestsMeanRate',
+        enableSorting: false,
+      },
+      {
+        header: 'Produce-5min /sec',
+        accessorKey: 'produceRequestsFiveMinuteRate',
+        enableSorting: false,
+      },
       {
         header: 'Number of messages',
         accessorKey: 'partitions',