Jelajahi Sumber

Merge branch 'feature/12-consumer-group-details' into backend-group-details

Roman Nedzvetskiy 5 tahun lalu
induk
melakukan
29493c738e

+ 14 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -2,14 +2,12 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.zookeeper.ZooKeeper;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
@@ -78,4 +76,15 @@ public class ClusterService {
         cluster.getAdminClient().describeTopics().all().get().get("").partitions().get(0).;
         cluster.getAdminClient().listConsumerGroupOffsets("").partitionsToOffsetAndMetadata().get().get("").offset();
     }
+
+    @SneakyThrows
+    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
+            var cluster = clustersStorage.getClusterByName(clusterName);
+            return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
+                    .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
+                            .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                    .map(s -> s.values().stream()
+                            .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
+                    .map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
+    }
 }

+ 34 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -0,0 +1,34 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ConsumerGroup;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.common.KafkaFuture;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClusterUtil {
+
+    public static <T> Mono<T> toMono(KafkaFuture<T> future){
+        return Mono.create(sink -> future.whenComplete((res, ex)->{
+            if (ex!=null) {
+                sink.error(ex);
+            } else {
+                sink.success(res);
+            }
+        }));
+    }
+
+    public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
+        ConsumerGroup consumerGroup = new ConsumerGroup();
+        consumerGroup.setClusterId(cluster.getCluster().getId());
+        consumerGroup.setConsumerGroupId(c.groupId());
+        consumerGroup.setNumConsumers(c.members().size());
+        Set<String> topics = new HashSet<>();
+        c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
+        consumerGroup.setNumTopics(topics.size());
+        return consumerGroup;
+    }
+}

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.api.ApiClustersApi;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -54,6 +55,11 @@ public class MetricsRestController implements ApiClustersApi {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
     }
 
+    @Override
+    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getConsumerGroup(clusterName);
+    }
+
     @Override
     public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String consumerGroupId, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(new ConsumerGroupDetails()));

+ 34 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -189,6 +189,28 @@ paths:
               schema:
                 $ref: '#/components/schemas/ConsumerGroupDetails'
 
+  /api/clusters/{clusterName}/consumerGroups:
+    get:
+      tags:
+        - /api/clusters
+      summary: getConsumerGroup
+      operationId: getConsumerGroup
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/ConsumerGroup'
+
 components:
   schemas:
     Cluster:
@@ -338,6 +360,18 @@ components:
           type: string
         partition:
           type: string
+
+    ConsumerGroup:
+      type: object
+      properties:
+        clusterId:
+          type: string
+        consumerGroupId:
+            type: string
+        numConsumers:
+            type: integer
+        numTopics:
+            type: integer
         messagesBehind:
           type: integer
         currentOffset:

+ 2 - 8
kafka-ui-react-app/mock/payload/brokerMetrics.json

@@ -6,8 +6,6 @@
     "zooKeeperStatus": 1,
     "activeControllers": 1,
     "uncleanLeaderElectionCount": 0,
-    "networkPoolUsage": 0.001970896739179595,
-    "requestPoolUsage": 0.00730438980248805,
     "onlinePartitionCount": 19,
     "underReplicatedPartitionCount": 9,
     "offlinePartitionCount": 3,
@@ -18,8 +16,7 @@
         "brokerId": 1,
         "segmentSize": 479900675
       }
-    ],
-    "diskUsageDistribution": "even"
+    ]
   },
   {
     "clusterName": "kafka-ui.cluster",
@@ -28,8 +25,6 @@
     "zooKeeperStatus": 1,
     "activeControllers": 1,
     "uncleanLeaderElectionCount": 0,
-    "networkPoolUsage": 0.004401004145400575,
-    "requestPoolUsage": 0.004089519725388984,
     "onlinePartitionCount": 70,
     "underReplicatedPartitionCount": 1,
     "offlinePartitionCount": 2,
@@ -40,7 +35,6 @@
         "brokerId": 1,
         "segmentSize": 968226532
       }
-    ],
-    "diskUsageDistribution": "even"
+    ]
   }
 ]

+ 0 - 38
kafka-ui-react-app/src/components/Brokers/Brokers.tsx

@@ -1,7 +1,6 @@
 import React from 'react';
 import { ClusterName, BrokerMetrics, ZooKeeperStatus } from 'redux/interfaces';
 import useInterval from 'lib/hooks/useInterval';
-import formatBytes from 'lib/utils/formatBytes';
 import cx from 'classnames';
 import MetricsWrapper from 'components/common/Dashboard/MetricsWrapper';
 import Indicator from 'components/common/Dashboard/Indicator';
@@ -10,8 +9,6 @@ import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
 interface Props extends BrokerMetrics {
   clusterName: ClusterName;
   isFetched: boolean;
-  minDiskUsage: number;
-  maxDiskUsage: number;
   fetchBrokers: (clusterName: ClusterName) => void;
   fetchBrokerMetrics: (clusterName: ClusterName) => void;
 }
@@ -27,11 +24,6 @@ const Topics: React.FC<Props> = ({
   inSyncReplicasCount,
   outOfSyncReplicasCount,
   underReplicatedPartitionCount,
-  diskUsageDistribution,
-  minDiskUsage,
-  maxDiskUsage,
-  networkPoolUsage,
-  requestPoolUsage,
   fetchBrokers,
   fetchBrokerMetrics,
 }) => {
@@ -45,9 +37,6 @@ const Topics: React.FC<Props> = ({
 
   useInterval(() => { fetchBrokerMetrics(clusterName); }, 5000);
 
-  const [minDiskUsageValue, minDiskUsageSize] = formatBytes(minDiskUsage);
-  const [maxDiskUsageValue, maxDiskUsageSize] = formatBytes(maxDiskUsage);
-
   const zkOnline = zooKeeperStatus === ZooKeeperStatus.online;
 
   return (
@@ -85,33 +74,6 @@ const Topics: React.FC<Props> = ({
           {outOfSyncReplicasCount}
         </Indicator>
       </MetricsWrapper>
-
-      <MetricsWrapper title="Disk">
-        <Indicator label="Max usage">
-          {maxDiskUsageValue}
-          <span className="subtitle has-text-weight-light"> {maxDiskUsageSize}</span>
-        </Indicator>
-        <Indicator label="Min usage">
-          {minDiskUsageValue}
-          <span className="subtitle has-text-weight-light"> {minDiskUsageSize}</span>
-        </Indicator>
-        <Indicator label="Distribution">
-          <span className="is-capitalized">
-            {diskUsageDistribution}
-          </span>
-        </Indicator>
-      </MetricsWrapper>
-
-      <MetricsWrapper title="System">
-        <Indicator label="Network pool usage">
-          {Math.round(networkPoolUsage * 10000) / 100}
-          <span className="subtitle has-text-weight-light">%</span>
-        </Indicator>
-        <Indicator label="Request pool usage">
-          {Math.round(requestPoolUsage * 10000) / 100}
-          <span className="subtitle has-text-weight-light">%</span>
-        </Indicator>
-      </MetricsWrapper>
     </div>
   );
 };

+ 1 - 6
kafka-ui-react-app/src/components/Brokers/BrokersContainer.ts

@@ -20,16 +20,11 @@ const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}
   brokerCount: brokerSelectors.getBrokerCount(state),
   zooKeeperStatus: brokerSelectors.getZooKeeperStatus(state),
   activeControllers: brokerSelectors.getActiveControllers(state),
-  networkPoolUsage: brokerSelectors.getNetworkPoolUsage(state),
-  requestPoolUsage: brokerSelectors.getRequestPoolUsage(state),
   onlinePartitionCount: brokerSelectors.getOnlinePartitionCount(state),
   offlinePartitionCount: brokerSelectors.getOfflinePartitionCount(state),
   inSyncReplicasCount: brokerSelectors.getInSyncReplicasCount(state),
   outOfSyncReplicasCount: brokerSelectors.getOutOfSyncReplicasCount(state),
-  underReplicatedPartitionCount: brokerSelectors.getUnderReplicatedPartitionCount(state),
-  diskUsageDistribution: brokerSelectors.getDiskUsageDistribution(state),
-  minDiskUsage: brokerSelectors.getMinDiskUsage(state),
-  maxDiskUsage: brokerSelectors.getMaxDiskUsage(state),
+  underReplicatedPartitionCount: brokerSelectors.getUnderReplicatedPartitionCount(state)
 });
 
 const mapDispatchToProps = {

+ 3 - 2
kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx

@@ -3,7 +3,7 @@ import { ClusterName } from 'redux/interfaces';
 import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
 import { clusterConsumerGroupsPath } from 'lib/paths';
 import { ConsumerGroupID, ConsumerGroup, ConsumerGroupDetails } from 'redux/interfaces/consumerGroup';
-import { Consumer } from '../../../redux/interfaces/consumerGroup';
+import { Consumer } from 'redux/interfaces/consumerGroup';
 import ListItem from './ListItem';
 
 interface Props extends ConsumerGroup, ConsumerGroupDetails {
@@ -55,7 +55,8 @@ const Details: React.FC<Props> = ({
               .map((consumer, index) => (
                 <ListItem
                   key={`consumers-list-item-key-${index}`}
-                  {...{clusterName, ...consumer}}
+                  clusterName={clusterName}
+                  {...consumer}
                 />
               ))}
           </tbody>

+ 1 - 1
kafka-ui-react-app/src/components/ConsumerGroups/Details/ListItem.tsx

@@ -1,7 +1,7 @@
 import React from 'react';
 import { Consumer } from 'redux/interfaces/consumerGroup';
 import { NavLink } from 'react-router-dom';
-import { ClusterName } from '../../../redux/interfaces/cluster';
+import { ClusterName } from 'redux/interfaces/cluster';
 
 
 interface Props extends Consumer {

+ 1 - 1
kafka-ui-react-app/src/redux/api/consumerGroups.ts

@@ -9,4 +9,4 @@ export const getConsumerGroups = (clusterName: ClusterName): Promise<ConsumerGro
 
 export const getConsumerGroupDetails = (clusterName: ClusterName, consumerGroupID: ConsumerGroupID): Promise<ConsumerGroupDetails> =>
   fetch(`${BASE_URL}/clusters/${clusterName}/consumer-groups/${consumerGroupID}`, { ...BASE_PARAMS })
-    .then(res => res.json());
+    .then(res => res.json());

+ 0 - 3
kafka-ui-react-app/src/redux/interfaces/broker.ts

@@ -19,14 +19,11 @@ export interface BrokerMetrics {
   brokerCount: number;
   zooKeeperStatus: ZooKeeperStatus;
   activeControllers: number;
-  networkPoolUsage: number;
-  requestPoolUsage: number;
   onlinePartitionCount: number;
   offlinePartitionCount: number;
   inSyncReplicasCount: number,
   outOfSyncReplicasCount: number,
   underReplicatedPartitionCount: number;
-  diskUsageDistribution?: string;
   diskUsage: BrokerDiskUsage[];
 }
 

+ 0 - 3
kafka-ui-react-app/src/redux/reducers/brokers/reducer.ts

@@ -13,14 +13,11 @@ export const initialState: BrokersState =  {
   brokerCount: 0,
   zooKeeperStatus: ZooKeeperStatus.offline,
   activeControllers: 0,
-  networkPoolUsage: 0,
-  requestPoolUsage: 0,
   onlinePartitionCount: 0,
   offlinePartitionCount: 0,
   inSyncReplicasCount: 0,
   outOfSyncReplicasCount: 0,
   underReplicatedPartitionCount: 0,
-  diskUsageDistribution: undefined,
   diskUsage: [],
 };
 

+ 0 - 23
kafka-ui-react-app/src/redux/reducers/brokers/selectors.ts

@@ -16,31 +16,8 @@ const getBrokerList = createSelector(brokersState, ({ items }) => items);
 export const getBrokerCount = createSelector(brokersState, ({ brokerCount }) => brokerCount);
 export const getZooKeeperStatus = createSelector(brokersState, ({ zooKeeperStatus }) => zooKeeperStatus);
 export const getActiveControllers = createSelector(brokersState, ({ activeControllers }) => activeControllers);
-export const getNetworkPoolUsage = createSelector(brokersState, ({ networkPoolUsage }) => networkPoolUsage);
-export const getRequestPoolUsage = createSelector(brokersState, ({ requestPoolUsage }) => requestPoolUsage);
 export const getOnlinePartitionCount = createSelector(brokersState, ({ onlinePartitionCount }) => onlinePartitionCount);
 export const getOfflinePartitionCount = createSelector(brokersState, ({ offlinePartitionCount }) => offlinePartitionCount);
 export const getInSyncReplicasCount = createSelector(brokersState, ({ inSyncReplicasCount }) => inSyncReplicasCount);
 export const getOutOfSyncReplicasCount = createSelector(brokersState, ({ outOfSyncReplicasCount }) => outOfSyncReplicasCount);
-export const getDiskUsageDistribution = createSelector(brokersState, ({ diskUsageDistribution }) => diskUsageDistribution);
 export const getUnderReplicatedPartitionCount = createSelector(brokersState, ({ underReplicatedPartitionCount }) => underReplicatedPartitionCount);
-
-export const getMinDiskUsage = createSelector(
-  getBrokerList,
-  (brokers) => {
-    if (brokers.length === 0) {
-      return 0;
-    }
-    return Math.min(...brokers.map(({ segmentSize }) => segmentSize));
-  },
-);
-
-export const getMaxDiskUsage = createSelector(
-  getBrokerList,
-  (brokers) => {
-    if (brokers.length === 0) {
-      return 0;
-    }
-    return Math.max(...brokers.map(({ segmentSize }) => segmentSize));
-  },
-);

+ 1 - 1
kafka-ui-react-app/src/redux/reducers/consumerGroups/selectors.ts

@@ -38,4 +38,4 @@ export const getConsumerGroupByID = createSelector(
   getConsumerGroupsMap,
   getConsumerGroupID,
   (consumerGroups, consumerGroupID) => consumerGroups[consumerGroupID],
-);
+);