Merge branch 'master' into feature/14-add-custom-params-for-topics-creation

This commit is contained in:
Azat Gataullin 2020-04-10 17:00:34 +03:00 committed by GitHub
commit e4dbebbc20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 447 additions and 32 deletions

View file

@ -2,9 +2,12 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@ -58,4 +61,15 @@ public class ClusterService {
if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
}
@SneakyThrows
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
var cluster = clustersStorage.getClusterByName(clusterName);
return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
.map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
}
}

View file

@ -0,0 +1,34 @@
package com.provectus.kafka.ui.cluster.util;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.ConsumerGroup;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;
import java.util.HashSet;
import java.util.Set;
public class ClusterUtil {
public static <T> Mono<T> toMono(KafkaFuture<T> future){
return Mono.create(sink -> future.whenComplete((res, ex)->{
if (ex!=null) {
sink.error(ex);
} else {
sink.success(res);
}
}));
}
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getCluster().getId());
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());
Set<String> topics = new HashSet<>();
c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
consumerGroup.setNumTopics(topics.size());
return consumerGroup;
}
}

View file

@ -4,6 +4,7 @@ import com.provectus.kafka.ui.api.ApiClustersApi;
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@ -53,4 +54,9 @@ public class MetricsRestController implements ApiClustersApi {
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
}
@Override
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
return clusterService.getConsumerGroup(clusterName);
}
}

View file

@ -169,6 +169,28 @@ paths:
items:
$ref: '#/components/schemas/TopicConfig'
/api/clusters/{clusterName}/consumerGroups:
get:
tags:
- /api/clusters
summary: getConsumerGroup
operationId: getConsumerGroup
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ConsumerGroup'
components:
schemas:
Cluster:
@ -307,4 +329,16 @@ components:
type: object
properties:
id:
type: string
type: string
ConsumerGroup:
type: object
properties:
clusterId:
type: string
consumerGroupId:
type: string
numConsumers:
type: integer
numTopics:
type: integer

View file

@ -5,6 +5,7 @@ const brokerMetrics = require('./payload/brokerMetrics.json');
const topics = require('./payload/topics.json');
const topicDetails = require('./payload/topicDetails.json');
const topicConfigs = require('./payload/topicConfigs.json');
const consumerGroups = require('./payload/consumerGroups.json');
const db = {
clusters,
@ -13,6 +14,7 @@ const db = {
topics: topics.map((topic) => ({...topic, id: topic.name})),
topicDetails,
topicConfigs,
consumerGroups: consumerGroups.map((group) => ({...group, id: group.consumerGroupId}))
};
const server = jsonServer.create();
const router = jsonServer.router(db);

View file

@ -0,0 +1,39 @@
[
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_1",
"numConsumers": 1,
"numTopics": 11
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_2",
"numConsumers": 2,
"numTopics": 22
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_3",
"numConsumers": 3,
"numTopics": 33
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_1",
"numConsumers": 4,
"numTopics": 44
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_2",
"numConsumers": 5,
"numTopics": 55
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_3",
"numConsumers": 6,
"numTopics": 66
}
]

View file

@ -10478,6 +10478,11 @@
"json-parse-better-errors": "^1.0.1"
}
},
"parse-ms": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/parse-ms/-/parse-ms-2.1.0.tgz",
"integrity": "sha512-kHt7kzLoS9VBZfUsiKjv43mr91ea+U05EyKkEtqp7vNbHxmaVuEqN7XxeEVnGrMtYOAxGrDElSi96K7EgO1zCA=="
},
"parse5": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/parse5/-/parse5-4.0.0.tgz",
@ -11686,6 +11691,14 @@
"react-is": "^16.8.4"
}
},
"pretty-ms": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/pretty-ms/-/pretty-ms-6.0.1.tgz",
"integrity": "sha512-ke4njoVmlotekHlHyCZ3wI/c5AMT8peuHs8rKJqekj/oR5G8lND2dVpicFlUz5cbZgE290vvkMuDwfj/OcW1kw==",
"requires": {
"parse-ms": "^2.1.0"
}
},
"private": {
"version": "0.1.8",
"resolved": "https://registry.npmjs.org/private/-/private-0.1.8.tgz",

View file

@ -22,6 +22,7 @@
"json-server": "^0.15.1",
"lodash": "^4.17.15",
"node-sass": "^4.13.1",
"pretty-ms": "^6.0.1",
"react": "^16.12.0",
"react-dom": "^16.12.0",
"react-hook-form": "^4.5.5",

View file

@ -10,6 +10,7 @@ import TopicsContainer from './Topics/TopicsContainer';
import NavConatiner from './Nav/NavConatiner';
import PageLoader from './common/PageLoader/PageLoader';
import Dashboard from './Dashboard/Dashboard';
import ConsumersGroupsContainer from './ConsumerGroups/ConsumersGroupsContainer';
interface AppProps {
isClusterListFetched: boolean;
@ -39,6 +40,7 @@ const App: React.FC<AppProps> = ({
<Route exact path="/clusters" component={Dashboard} />
<Route path="/clusters/:clusterName/topics" component={TopicsContainer} />
<Route path="/clusters/:clusterName/brokers" component={BrokersContainer} />
<Route path="/clusters/:clusterName/consumer-groups" component={ConsumersGroupsContainer} />
<Redirect from="/clusters/:clusterName" to="/clusters/:clusterName/brokers" />
</Switch>
) : (

View file

@ -0,0 +1,34 @@
import React from 'react';
import { ClusterName } from 'redux/interfaces';
import {
Switch,
Route,
} from 'react-router-dom';
import ListContainer from './List/ListContainer';
import PageLoader from 'components/common/PageLoader/PageLoader';
interface Props {
clusterName: ClusterName;
isFetched: boolean;
fetchConsumerGroupsList: (clusterName: ClusterName) => void;
}
const ConsumerGroups: React.FC<Props> = ({
clusterName,
isFetched,
fetchConsumerGroupsList,
}) => {
React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]);
if (isFetched) {
return (
<Switch>
<Route exact path="/clusters/:clusterName/consumer-groups" component={ListContainer} />
</Switch>
);
}
return (<PageLoader />);
};
export default ConsumerGroups;

View file

@ -0,0 +1,24 @@
import { connect } from 'react-redux';
import { fetchConsumerGroupsList } from 'redux/actions';
import { RootState, ClusterName } from 'redux/interfaces';
import { RouteComponentProps } from 'react-router-dom';
import ConsumerGroups from './ConsumerGroups';
import { getIsConsumerGroupsListFetched } from '../../redux/reducers/consumerGroups/selectors';
interface RouteProps {
clusterName: ClusterName;
}
interface OwnProps extends RouteComponentProps<RouteProps> { }
const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}: OwnProps) => ({
isFetched: getIsConsumerGroupsListFetched(state),
clusterName,
});
const mapDispatchToProps = {
fetchConsumerGroupsList: (clusterName: ClusterName) => fetchConsumerGroupsList(clusterName),
};
export default connect(mapStateToProps, mapDispatchToProps)(ConsumerGroups);

View file

@ -0,0 +1,64 @@
import React, { ChangeEvent } from 'react';
import { ConsumerGroup, ClusterName } from 'redux/interfaces';
import ListItem from './ListItem';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
interface Props {
clusterName: ClusterName;
consumerGroups: (ConsumerGroup)[];
}
const List: React.FC<Props> = ({
consumerGroups,
}) => {
const [searchText, setSearchText] = React.useState<string>('');
const handleInputChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setSearchText(event.target.value);
};
const items = consumerGroups;
return (
<div className="section">
<Breadcrumb>All Consumer Groups</Breadcrumb>
<div className="box">
<div className="columns">
<div className="column is-half is-offset-half">
<input id="searchText"
type="text"
name="searchText"
className="input"
placeholder="Search"
value={searchText}
onChange={handleInputChange}
/>
</div>
</div>
<table className="table is-striped is-fullwidth">
<thead>
<tr>
<th>Consumer group ID</th>
<th>Num of consumers</th>
<th>Num of topics</th>
</tr>
</thead>
<tbody>
{items
.filter( (consumerGroup) => !searchText || consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0)
.map((consumerGroup, index) => (
<ListItem
key={`consumer-group-list-item-key-${index}`}
{...consumerGroup}
/>
))}
</tbody>
</table>
</div>
</div>
);
};
export default List;

View file

@ -0,0 +1,20 @@
import { connect } from 'react-redux';
import {ClusterName, RootState} from 'redux/interfaces';
import { getConsumerGroupsList } from 'redux/reducers/consumerGroups/selectors';
import List from './List';
import { withRouter, RouteComponentProps } from 'react-router-dom';
interface RouteProps {
clusterName: ClusterName;
}
interface OwnProps extends RouteComponentProps<RouteProps> { }
const mapStateToProps = (state: RootState, { match: { params: { clusterName } } }: OwnProps) => ({
clusterName,
consumerGroups: getConsumerGroupsList(state)
});
export default withRouter(
connect(mapStateToProps)(List)
);

View file

@ -0,0 +1,24 @@
import React from 'react';
import { NavLink } from 'react-router-dom';
import { ConsumerGroup } from 'redux/interfaces';
const ListItem: React.FC<ConsumerGroup> = ({
consumerGroupId,
numConsumers,
numTopics,
}) => {
return (
<tr>
{/* <td>
<NavLink exact to={`consumer-groups/${consumerGroupId}`} activeClassName="is-active" className="title is-6">
{consumerGroupId}
</NavLink>
</td> */}
<td>{consumerGroupId}</td>
<td>{numConsumers}</td>
<td>{numTopics}</td>
</tr>
);
}
export default ListItem;

View file

@ -1,7 +1,7 @@
import React, { CSSProperties } from 'react';
import { Cluster } from 'redux/interfaces';
import { NavLink } from 'react-router-dom';
import { clusterBrokersPath, clusterTopicsPath } from 'lib/paths';
import { clusterBrokersPath, clusterTopicsPath, clusterConsumerGroupsPath } from 'lib/paths';
interface Props extends Cluster {}
@ -37,6 +37,9 @@ const ClusterMenu: React.FC<Props> = ({
<NavLink to={clusterTopicsPath(name)} activeClassName="is-active" title="Topics">
Topics
</NavLink>
<NavLink to={clusterConsumerGroupsPath(name)} activeClassName="is-active" title="Consumers">
Consumers
</NavLink>
</ul>
</li>
</ul>

View file

@ -1,15 +1,17 @@
import React from 'react';
import { ClusterName, CleanupPolicy, TopicFormData, TopicName } from 'redux/interfaces';
import { useForm, FormContext, ErrorMessage } from 'react-hook-form';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
import CustomParamsContainer from "./CustomParams/CustomParamsContainer";
import TimeToRetain from './TimeToRetain';
import { clusterTopicsPath } from 'lib/paths';
import { useForm, FormContext, ErrorMessage } from 'react-hook-form';
import {
TOPIC_NAME_VALIDATION_PATTERN,
MILLISECONDS_IN_DAY,
BYTES_IN_GB,
} from 'lib/constants';
interface Props {
clusterName: ClusterName;
isTopicCreated: boolean;
@ -166,33 +168,7 @@ const New: React.FC<Props> = ({
</div>
<div className="column is-one-third">
<label className="label">
Time to retain data
</label>
<div className="select is-block">
<select
defaultValue={MILLISECONDS_IN_DAY * 7}
name="retentionMs"
ref={methods.register}
disabled={isSubmitting}
>
<option value={MILLISECONDS_IN_DAY / 2}>
12 hours
</option>
<option value={MILLISECONDS_IN_DAY}>
1 day
</option>
<option value={MILLISECONDS_IN_DAY * 2}>
2 days
</option>
<option value={MILLISECONDS_IN_DAY * 7}>
1 week
</option>
<option value={MILLISECONDS_IN_DAY * 7 * 4}>
4 weeks
</option>
</select>
</div>
<TimeToRetain isSubmitting={isSubmitting} />
</div>
<div className="column is-one-third">

View file

@ -0,0 +1,53 @@
import React from 'react';
import prettyMilliseconds from 'pretty-ms';
import { useFormContext, ErrorMessage } from 'react-hook-form';
import { MILLISECONDS_IN_WEEK } from 'lib/constants';
const MILLISECONDS_IN_SECOND = 1000;
interface Props {
isSubmitting: boolean;
}
const TimeToRetain: React.FC<Props> = ({
isSubmitting,
}) => {
const { register, errors, watch } = useFormContext();
const defaultValue = MILLISECONDS_IN_WEEK;
const name: string = 'retentionMs';
const watchedValue: any = watch(name, defaultValue.toString());
const valueHint = React.useMemo(() => {
const value = parseInt(watchedValue, 10);
return value >= MILLISECONDS_IN_SECOND ? prettyMilliseconds(value) : false;
}, [watchedValue])
return (
<>
<label className="label">
Time to retain data (in ms)
</label>
<input
className="input"
type="number"
defaultValue={defaultValue}
name={name}
ref={register(
{ min: { value: -1, message: 'must be greater than or equal to -1' }}
)}
disabled={isSubmitting}
/>
<p className="help is-danger">
<ErrorMessage errors={errors} name={name}/>
</p>
{
valueHint &&
<p className="help is-info">
{valueHint}
</p>
}
</>
);
}
export default TimeToRetain;

View file

@ -10,6 +10,6 @@ export const BASE_URL = process.env.REACT_APP_API_URL;
export const TOPIC_NAME_VALIDATION_PATTERN = RegExp(/^[.,A-Za-z0-9_-]+$/);
export const MILLISECONDS_IN_DAY = 86_400_000;
export const MILLISECONDS_IN_WEEK = 604_800_000;
export const BYTES_IN_GB = 1_073_741_824;

View file

@ -13,3 +13,5 @@ export const clusterTopicNewPath = (clusterName: ClusterName) => `${clusterPath(
export const clusterTopicPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}`;
export const clusterTopicSettingsPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/settings`;
export const clusterTopicMessagesPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/messages`;
export const clusterConsumerGroupsPath = (clusterName: ClusterName) => `${clusterPath(clusterName)}/consumer-groups`;

View file

@ -26,4 +26,8 @@ export enum ActionType {
POST_TOPIC__REQUEST = 'POST_TOPIC__REQUEST',
POST_TOPIC__SUCCESS = 'POST_TOPIC__SUCCESS',
POST_TOPIC__FAILURE = 'POST_TOPIC__FAILURE',
GET_CONSUMER_GROUPS__REQUEST = 'GET_CONSUMER_GROUPS__REQUEST',
GET_CONSUMER_GROUPS__SUCCESS = 'GET_CONSUMER_GROUPS__SUCCESS',
GET_CONSUMER_GROUPS__FAILURE = 'GET_CONSUMER_GROUPS__FAILURE',
};

View file

@ -1,5 +1,6 @@
import { createAsyncAction } from 'typesafe-actions';
import { ActionType } from 'redux/actionType';
import { ConsumerGroup } from '../interfaces/consumerGroup';
import {
Broker,
BrokerMetrics,
@ -51,3 +52,9 @@ export const createTopicAction = createAsyncAction(
ActionType.POST_TOPIC__SUCCESS,
ActionType.POST_TOPIC__FAILURE,
)<undefined, Topic, undefined>();
export const fetchConsumerGroupsAction = createAsyncAction(
ActionType.GET_CONSUMER_GROUPS__REQUEST,
ActionType.GET_CONSUMER_GROUPS__SUCCESS,
ActionType.GET_CONSUMER_GROUPS__FAILURE,
)<undefined, ConsumerGroup[], undefined>();

View file

@ -77,3 +77,13 @@ export const createTopic = (clusterName: ClusterName, form: TopicFormData): Prom
dispatch(actions.createTopicAction.failure());
}
};
export const fetchConsumerGroupsList = (clusterName: ClusterName): PromiseThunk<void> => async (dispatch) => {
dispatch(actions.fetchConsumerGroupsAction.request());
try {
const consumerGroups = await api.getConsumerGroups(clusterName);
dispatch(actions.fetchConsumerGroupsAction.success(consumerGroups));
} catch (e) {
dispatch(actions.fetchConsumerGroupsAction.failure());
}
};

View file

@ -0,0 +1,8 @@
import { ClusterName } from '../interfaces/cluster';
import { ConsumerGroup } from '../interfaces/consumerGroup';
import { BASE_PARAMS, BASE_URL } from '../../lib/constants';
export const getConsumerGroups = (clusterName: ClusterName): Promise<ConsumerGroup[]> =>
fetch(`${BASE_URL}/clusters/${clusterName}/consumerGroups`, { ...BASE_PARAMS })
.then(res => res.json());

View file

@ -1,3 +1,4 @@
export * from './topics';
export * from './clusters';
export * from './brokers';
export * from './consumerGroups';

View file

@ -0,0 +1,5 @@
export interface ConsumerGroup {
consumerGroupId: string;
numConsumers: number;
numTopics: number;
}

View file

@ -8,10 +8,12 @@ import { TopicsState } from './topic';
import { Cluster } from './cluster';
import { BrokersState } from './broker';
import { LoaderState } from './loader';
import { ConsumerGroup } from './consumerGroup';
export * from './topic';
export * from './cluster';
export * from './broker';
export * from './consumerGroup';
export * from './loader';
export enum FetchStatus {
@ -25,6 +27,7 @@ export interface RootState {
topics: TopicsState;
clusters: Cluster[];
brokers: BrokersState;
consumerGroups: ConsumerGroup[];
loader: LoaderState;
}

View file

@ -0,0 +1,15 @@
import { Action, ConsumerGroup } from 'redux/interfaces';
import { ActionType } from 'redux/actionType';
export const initialState: ConsumerGroup[] = [];
const reducer = (state = initialState, action: Action): ConsumerGroup[] => {
switch (action.type) {
case ActionType.GET_CONSUMER_GROUPS__SUCCESS:
return action.payload;
default:
return state;
}
};
export default reducer;

View file

@ -0,0 +1,15 @@
import { createSelector } from 'reselect';
import { ConsumerGroup, RootState, FetchStatus } from 'redux/interfaces';
import { createFetchingSelector } from 'redux/reducers/loader/selectors';
const consumerGroupsState = ({ consumerGroups }: RootState): ConsumerGroup[] => consumerGroups;
const getConsumerGroupsListFetchingStatus = createFetchingSelector('GET_CONSUMER_GROUPS');
export const getIsConsumerGroupsListFetched = createSelector(
getConsumerGroupsListFetchingStatus,
(status) => status === FetchStatus.fetched,
);
export const getConsumerGroupsList = createSelector(consumerGroupsState, (consumerGroups) => consumerGroups);

View file

@ -2,6 +2,7 @@ import { combineReducers } from 'redux';
import topics from './topics/reducer';
import clusters from './clusters/reducer';
import brokers from './brokers/reducer';
import consumerGroups from './consumerGroups/reducer';
import loader from './loader/reducer';
import { RootState } from 'redux/interfaces';
@ -9,5 +10,6 @@ export default combineReducers<RootState>({
topics,
clusters,
brokers,
consumerGroups,
loader,
});