666: Adding new endpoint with corresponding service method. Also a… (#714)

* (#666): Adding new endpoint with corresponding service method. Also adding search box to kafka connect screen with consuming logic for added endopoint.

* Applying feedback: reusing same endpoint and removing '/filtered' version
This commit is contained in:
Victor Alfaro 2021-08-03 07:08:55 -06:00 committed by GitHub
parent 9770ad47af
commit 51646e786a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 175 additions and 13 deletions

View file

@ -4,7 +4,7 @@ services:
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:master
image: kafka-ui:local
ports:
- 8080:8080
depends_on:

View file

@ -63,12 +63,14 @@ public class KafkaConnectController implements KafkaConnectApi {
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<FullConnectorInfo>>> getAllConnectors(
String clusterName,
String search,
ServerWebExchange exchange
) {
return Mono.just(ResponseEntity.ok(kafkaConnectService.getAllConnectors(clusterName)));
return Mono.just(ResponseEntity.ok(kafkaConnectService.getAllConnectors(clusterName, search)));
}
@Override

View file

@ -23,10 +23,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@ -52,7 +55,7 @@ public class KafkaConnectService {
);
}
public Flux<FullConnectorInfo> getAllConnectors(String clusterName) {
public Flux<FullConnectorInfo> getAllConnectors(final String clusterName, final String search) {
return getConnects(clusterName)
.flatMapMany(Function.identity())
.flatMap(connect -> getConnectorNames(clusterName, connect))
@ -87,7 +90,25 @@ public class KafkaConnectService {
.build()
);
})
.map(kafkaConnectMapper::fullConnectorInfoFromTuple);
.map(kafkaConnectMapper::fullConnectorInfoFromTuple)
.filter(matchesSearchTerm(search));
}
private Predicate<FullConnectorInfo> matchesSearchTerm(final String search) {
return (connector) -> getSearchValues(connector)
.anyMatch(value -> value.contains(
StringUtils.defaultString(
search,
StringUtils.EMPTY)
.toUpperCase()));
}
private Stream<String> getSearchValues(FullConnectorInfo fullConnectorInfo) {
return Stream.of(
fullConnectorInfo.getName(),
fullConnectorInfo.getStatus().getState().getValue(),
fullConnectorInfo.getType().getValue())
.map(String::toUpperCase);
}
private Mono<ConnectorTopics> getConnectorTopics(String clusterName, String connectClusterName,
@ -118,7 +139,7 @@ public class KafkaConnectService {
public Flux<String> getConnectors(String clusterName, String connectName) {
return getConnectAddress(clusterName, connectName)
.flatMapMany(connect ->
KafkaConnectClients.withBaseUrl(connect).getConnectors()
KafkaConnectClients.withBaseUrl(connect).getConnectors(null)
.doOnError(log::error)
);
}

View file

@ -70,6 +70,73 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isOk();
}
@Test
public void shouldListAllConnectors() {
webTestClient.get()
.uri("/api/clusters/{clusterName}/connectors", LOCAL)
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.exists();
}
@Test
public void shouldFilterByNameConnectors() {
webTestClient.get()
.uri(
"/api/clusters/{clusterName}/connectors?search={search}",
LOCAL,
connectorName.split("-")[1])
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.exists();
}
@Test
public void shouldFilterByStatusConnectors() {
webTestClient.get()
.uri(
"/api/clusters/{clusterName}/connectors?search={search}",
LOCAL,
"running")
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.exists();
}
@Test
public void shouldFilterByTypeConnectors() {
webTestClient.get()
.uri(
"/api/clusters/{clusterName}/connectors?search={search}",
LOCAL,
"sink")
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.exists();
}
@Test
public void shouldNotFilterConnectors() {
webTestClient.get()
.uri(
"/api/clusters/{clusterName}/connectors?search={search}",
LOCAL,
"something-else")
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.doesNotExist();
}
@Test
public void shouldListConnectors() {
webTestClient.get()

View file

@ -20,6 +20,12 @@ paths:
- KafkaConnectClient
summary: get all connectors from Kafka Connect service
operationId: getConnectors
parameters:
- name: search
in: query
required: false
schema:
type: string
responses:
200:
description: OK

View file

@ -1040,7 +1040,7 @@ paths:
get:
tags:
- Kafka Connect
summary: get all kafka connectors
summary: get filtered kafka connectors
operationId: getAllConnectors
parameters:
- name: clusterName
@ -1048,6 +1048,11 @@ paths:
required: true
schema:
type: string
- name: search
in: query
required: false
schema:
type: string
responses:
200:
description: OK

View file

@ -1,12 +1,13 @@
import React from 'react';
import { Link, useParams } from 'react-router-dom';
import { Connect, FullConnectorInfo } from 'generated-sources';
import { ClusterName } from 'redux/interfaces';
import { ClusterName, ConnectorSearch } from 'redux/interfaces';
import { clusterConnectorNewPath } from 'lib/paths';
import ClusterContext from 'components/contexts/ClusterContext';
import Indicator from 'components/common/Dashboard/Indicator';
import MetricsWrapper from 'components/common/Dashboard/MetricsWrapper';
import PageLoader from 'components/common/PageLoader/PageLoader';
import Search from 'components/common/Search/Search';
import ListItem from './ListItem';
@ -17,6 +18,8 @@ export interface ListProps {
connects: Connect[];
fetchConnects(clusterName: ClusterName): void;
fetchConnectors(clusterName: ClusterName): void;
search: string;
setConnectorSearch(value: ConnectorSearch): void;
}
const List: React.FC<ListProps> = ({
@ -26,6 +29,8 @@ const List: React.FC<ListProps> = ({
areConnectorsFetching,
fetchConnects,
fetchConnectors,
search,
setConnectorSearch,
}) => {
const { isReadOnly } = React.useContext(ClusterContext);
const { clusterName } = useParams<{ clusterName: string }>();
@ -35,6 +40,12 @@ const List: React.FC<ListProps> = ({
fetchConnectors(clusterName);
}, [fetchConnects, fetchConnectors, clusterName]);
const handleSearch = (value: string) =>
setConnectorSearch({
clusterName,
search: value,
});
return (
<>
<MetricsWrapper>
@ -47,6 +58,14 @@ const List: React.FC<ListProps> = ({
{connects.length}
</Indicator>
<div className="column">
<Search
handleSearch={handleSearch}
placeholder="Search by Connect Name, Status or Type"
value={search}
/>
</div>
{!isReadOnly && (
<div className="level-item level-right">
<Link

View file

@ -3,12 +3,14 @@ import { RootState } from 'redux/interfaces';
import {
fetchConnects,
fetchConnectors,
setConnectorSearch,
} from 'redux/actions/thunks/connectors';
import {
getConnects,
getConnectors,
getAreConnectsFetching,
getAreConnectorsFetching,
getConnectorSearch,
} from 'redux/reducers/connect/selectors';
import List from 'components/Connect/List/List';
@ -17,11 +19,13 @@ const mapStateToProps = (state: RootState) => ({
areConnectorsFetching: getAreConnectorsFetching(state),
connects: getConnects(state),
connectors: getConnectors(state),
search: getConnectorSearch(state),
});
const mapDispatchToProps = {
fetchConnects,
fetchConnectors,
setConnectorSearch,
};
export default connect(mapStateToProps, mapDispatchToProps)(List);

View file

@ -31,6 +31,7 @@ describe('Connectors List', () => {
describe('View', () => {
const fetchConnects = jest.fn();
const fetchConnectors = jest.fn();
const setConnectorSearch = jest.fn();
const setupComponent = (
props: Partial<ListProps> = {},
contextValue: ContextProps = initialValue
@ -44,6 +45,8 @@ describe('Connectors List', () => {
connects={[]}
fetchConnects={fetchConnects}
fetchConnectors={fetchConnectors}
search=""
setConnectorSearch={setConnectorSearch}
{...props}
/>
</ClusterContext.Provider>

View file

@ -59,7 +59,8 @@ describe('Thunks', () => {
it('creates GET_CONNECTORS__SUCCESS when fetching connectors', async () => {
fetchMock.getOnce(
`/api/clusters/${clusterName}/connectors`,
connectorsServerPayload
connectorsServerPayload,
{ query: { search: '' } }
);
await store.dispatch(thunks.fetchConnectors(clusterName));
expect(store.getActions()).toEqual([
@ -71,9 +72,10 @@ describe('Thunks', () => {
it('creates GET_CONNECTORS__SUCCESS when fetching connectors in silent mode', async () => {
fetchMock.getOnce(
`/api/clusters/${clusterName}/connectors`,
connectorsServerPayload
connectorsServerPayload,
{ query: { search: '' } }
);
await store.dispatch(thunks.fetchConnectors(clusterName, true));
await store.dispatch(thunks.fetchConnectors(clusterName, '', true));
expect(store.getActions()).toEqual([
actions.fetchConnectorsAction.success({
...store.getState().connect,
@ -83,7 +85,9 @@ describe('Thunks', () => {
});
it('creates GET_CONNECTORS__FAILURE', async () => {
fetchMock.getOnce(`/api/clusters/${clusterName}/connectors`, 404);
fetchMock.getOnce(`/api/clusters/${clusterName}/connectors`, 404, {
query: { search: '' },
});
await store.dispatch(thunks.fetchConnectors(clusterName));
expect(store.getActions()).toEqual([
actions.fetchConnectorsAction.request(),

View file

@ -12,6 +12,7 @@ import {
ConnectName,
ConnectorConfig,
ConnectorName,
ConnectorSearch,
FailurePayload,
PromiseThunkResult,
} from 'redux/interfaces';
@ -39,12 +40,17 @@ export const fetchConnects =
};
export const fetchConnectors =
(clusterName: ClusterName, silent = false): PromiseThunkResult<void> =>
(
clusterName: ClusterName,
search = '',
silent = false
): PromiseThunkResult<void> =>
async (dispatch) => {
if (!silent) dispatch(actions.fetchConnectorsAction.request());
try {
const connectors = await kafkaConnectApiClient.getAllConnectors({
clusterName,
search,
});
dispatch(actions.fetchConnectorsAction.success({ connectors }));
} catch (error) {
@ -127,7 +133,7 @@ export const deleteConnector =
connectorName,
});
dispatch(actions.deleteConnectorAction.success({ connectorName }));
dispatch(fetchConnectors(clusterName, true));
dispatch(fetchConnectors(clusterName, '', true));
} catch (error) {
const response = await getResponse(error);
const alert: FailurePayload = {
@ -338,3 +344,14 @@ export const updateConnectorConfig =
}
return undefined;
};
export const setConnectorSearch = (
connectorSearch: ConnectorSearch,
silent = false
): PromiseThunkResult<void> => {
return fetchConnectors(
connectorSearch.clusterName,
connectorSearch.search,
silent
);
};

View file

@ -1,5 +1,7 @@
import { Connect, Connector, FullConnectorInfo, Task } from 'generated-sources';
import { ClusterName } from './cluster';
export type ConnectName = Connect['name'];
export type ConnectorName = Connector['name'];
export type ConnectorConfig = Connector['config'];
@ -12,4 +14,10 @@ export interface ConnectState {
tasks: Task[];
config: ConnectorConfig | null;
};
search: string;
}
export interface ConnectorSearch {
clusterName: ClusterName;
search: string;
}

View file

@ -12,6 +12,7 @@ export const initialState: ConnectState = {
tasks: [],
config: null,
},
search: '',
};
const reducer = (state = initialState, action: Action): ConnectState => {

View file

@ -119,3 +119,8 @@ export const getConnectorConfig = createSelector(
getCurrentConnector,
({ config }) => config
);
export const getConnectorSearch = createSelector(
connectState,
(state) => state.search
);