Merge branch 'master' of github.com:provectus/kafka-ui into feature/schema_registry_views
Commit e629a25161
24 changed files with 140 additions and 191 deletions
@@ -4,12 +4,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Message;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

@@ -41,14 +43,17 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
    this.cluster = cluster;
    this.objectMapper = objectMapper;

    this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
        new CachedSchemaRegistryClient(
            Collections.singletonList(e),
            CLIENT_IDENTITY_MAP_CAPACITY,
            Collections.singletonList(new AvroSchemaProvider()),
            Collections.emptyMap()
        )
    ).orElse(null);
    this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry())
        .map(schemaRegistryUrl -> {
          List<SchemaProvider> schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
          return new CachedSchemaRegistryClient(
              Collections.singletonList(schemaRegistryUrl),
              CLIENT_IDENTITY_MAP_CAPACITY,
              schemaProviders,
              Collections.emptyMap()
          );
        }
        ).orElse(null);

    this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
        .map(KafkaAvroDeserializer::new)
@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject;
import java.util.Formatter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;

@@ -51,9 +52,12 @@ public class SchemaRegistryService {
        .map(cluster -> webClient.get()
            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
            .bodyToFlux(Integer.class))
        .orElse(Flux.error(new NotFoundException("No such cluster")));
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(
                    new NotFoundException(formatted("No such schema %s", schemaName))
                )
            ).bodyToFlux(Integer.class)
        ).orElse(Flux.error(new NotFoundException("No such cluster")));
  }

  public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {

@@ -70,8 +74,12 @@ public class SchemaRegistryService {
            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
            .bodyToMono(SchemaSubject.class)
                resp -> Mono.error(
                    new NotFoundException(
                        formatted("No such schema %s with version %s", schemaName, version)
                    )
                )
            ).bodyToMono(SchemaSubject.class)
            .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
            .map(tuple -> {
                SchemaSubject schema = tuple.getT1();

@@ -97,9 +105,13 @@ public class SchemaRegistryService {
            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
            .toBodilessEntity())
        .orElse(Mono.error(new NotFoundException("No such cluster")));
                resp -> Mono.error(
                    new NotFoundException(
                        formatted("No such schema %s with version %s", schemaName, version)
                    )
                )
            ).toBodilessEntity()
        ).orElse(Mono.error(new NotFoundException("No such cluster")));
  }

  public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {

@@ -107,7 +119,13 @@ public class SchemaRegistryService {
        .map(cluster -> webClient.delete()
            .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(
                    new NotFoundException(
                        formatted("No such schema %s", schemaName)
                    )
                )
            )
            .toBodilessEntity())
        .orElse(Mono.error(new NotFoundException("No such cluster")));
  }

@@ -120,7 +138,9 @@ public class SchemaRegistryService {
            .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
                resp -> Mono.error(
                    new NotFoundException(formatted("No such schema %s", schemaName)))
            )
            .toEntity(SchemaSubject.class)
            .log())
        .orElse(Mono.error(new NotFoundException("No such cluster")));

@@ -142,7 +162,7 @@ public class SchemaRegistryService {
            .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
                resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
            .bodyToMono(Void.class);
        }).orElse(Mono.error(new NotFoundException("No such cluster")));
  }

@@ -181,10 +201,14 @@ public class SchemaRegistryService {
            .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
            .retrieve()
            .onStatus(HttpStatus.NOT_FOUND::equals,
                resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
                resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
            .bodyToMono(InternalCompatibilityCheck.class)
            .map(mapper::toCompatibilityCheckResponse)
            .log()
        ).orElse(Mono.error(new NotFoundException("No such cluster")));
  }

  public String formatted(String str, Object... args) {
    return new Formatter().format(str, args).toString();
  }
}

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.zookeeper;

import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.I0Itec.zkclient.ZkClient;

@@ -14,7 +15,7 @@ import java.util.Map;
@Log4j2
public class ZookeeperService {

  private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
  private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();

  public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
    var isConnected = false;

@@ -33,7 +34,10 @@ public class ZookeeperService {

  private ZkClient getOrCreateZkClient(KafkaCluster cluster) {
    try {
      return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
      return cachedZkClient.computeIfAbsent(
          cluster.getName(),
          (n) -> new ZkClient(cluster.getZookeeper(), 1000)
      );
    } catch (Exception e) {
      log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
      return null;

@@ -1,8 +1,6 @@
import React from 'react';
import { TopicMessage } from 'generated-sources';
import CustomParamButton, {
  CustomParamButtonType,
} from '../../shared/Form/CustomParams/CustomParamButton';
import CustomParamButton from 'components/Topics/shared/Form/CustomParams/CustomParamButton';
import MessageItem from './MessageItem';

interface MessagesTableProp {

@@ -44,7 +42,7 @@ const MessagesTable: React.FC<MessagesTableProp> = ({ messages, onNext }) => {
      <div className="column is-full">
        <CustomParamButton
          className="is-link is-pulled-right"
          type={CustomParamButtonType.chevronRight}
          type="fa-chevron-right"
          onClick={onNext}
          btnText="Next"
        />

@@ -1,5 +1,5 @@
import React from 'react';
import CustomParamButton, { CustomParamButtonType } from './CustomParamButton';
import CustomParamButton from './CustomParamButton';

interface Props {
  index: string;

@@ -11,7 +11,7 @@ const CustomParamAction: React.FC<Props> = ({ index, onRemove }) => (
    <label className="label"> </label>
    <CustomParamButton
      className="is-danger"
      type={CustomParamButtonType.minus}
      type="fa-minus"
      onClick={() => onRemove(index)}
    />
  </>

@@ -1,15 +1,9 @@
import React from 'react';

export enum CustomParamButtonType {
  plus = 'fa-plus',
  minus = 'fa-minus',
  chevronRight = 'fa-chevron-right',
}

interface Props {
  onClick: (event: React.MouseEvent<HTMLButtonElement>) => void;
  className: string;
  type: CustomParamButtonType;
  type: 'fa-plus' | 'fa-minus' | 'fa-chevron-right';
  btnText?: string;
}

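The string-literal union above replaces the removed CustomParamButtonType enum. A minimal standalone sketch of how the union constrains the prop at compile time (the ButtonProps name and the sample values are illustrative, not part of the commit):

    // A string-literal union gives the same compile-time safety as the removed enum.
    type CustomParamButtonType = 'fa-plus' | 'fa-minus' | 'fa-chevron-right';

    interface ButtonProps {
      className: string;
      type: CustomParamButtonType;
      btnText?: string;
    }

    const addButton: ButtonProps = {
      className: 'is-success',
      type: 'fa-plus', // accepted: member of the union
      // type: 'fa-times', // rejected by the compiler: not in the union
      btnText: 'Add Custom Parameter',
    };
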
@@ -7,7 +7,7 @@ import {
  TopicConfigByName,
  TopicConfigParams,
} from 'redux/interfaces';
import CustomParamButton, { CustomParamButtonType } from './CustomParamButton';
import CustomParamButton from './CustomParamButton';
import CustomParamField from './CustomParamField';

export const INDEX_PREFIX = 'customParams';

@@ -79,7 +79,7 @@ const CustomParams: React.FC<Props> = ({ isSubmitting, config }) => {
      <div className="column">
        <CustomParamButton
          className="is-success"
          type={CustomParamButtonType.plus}
          type="fa-plus"
          onClick={onAdd}
          btnText="Add Custom Parameter"
        />

@@ -1,59 +0,0 @@
enum ActionType {
  GET_CLUSTERS__REQUEST = 'GET_CLUSTERS__REQUEST',
  GET_CLUSTERS__SUCCESS = 'GET_CLUSTERS__SUCCESS',
  GET_CLUSTERS__FAILURE = 'GET_CLUSTERS__FAILURE',

  GET_CLUSTER_STATS__REQUEST = 'GET_CLUSTER_STATUS__REQUEST',
  GET_CLUSTER_STATS__SUCCESS = 'GET_CLUSTER_STATUS__SUCCESS',
  GET_CLUSTER_STATS__FAILURE = 'GET_CLUSTER_STATUS__FAILURE',

  GET_CLUSTER_METRICS__REQUEST = 'GET_CLUSTER_METRICS__REQUEST',
  GET_CLUSTER_METRICS__SUCCESS = 'GET_CLUSTER_METRICS__SUCCESS',
  GET_CLUSTER_METRICS__FAILURE = 'GET_CLUSTER_METRICS__FAILURE',

  GET_BROKERS__REQUEST = 'GET_BROKERS__REQUEST',
  GET_BROKERS__SUCCESS = 'GET_BROKERS__SUCCESS',
  GET_BROKERS__FAILURE = 'GET_BROKERS__FAILURE',

  GET_BROKER_METRICS__REQUEST = 'GET_BROKER_METRICS__REQUEST',
  GET_BROKER_METRICS__SUCCESS = 'GET_BROKER_METRICS__SUCCESS',
  GET_BROKER_METRICS__FAILURE = 'GET_BROKER_METRICS__FAILURE',

  GET_TOPICS__REQUEST = 'GET_TOPICS__REQUEST',
  GET_TOPICS__SUCCESS = 'GET_TOPICS__SUCCESS',
  GET_TOPICS__FAILURE = 'GET_TOPICS__FAILURE',

  GET_TOPIC_MESSAGES__REQUEST = 'GET_TOPIC_MESSAGES__REQUEST',
  GET_TOPIC_MESSAGES__SUCCESS = 'GET_TOPIC_MESSAGES__SUCCESS',
  GET_TOPIC_MESSAGES__FAILURE = 'GET_TOPIC_MESSAGES__FAILURE',

  GET_TOPIC_DETAILS__REQUEST = 'GET_TOPIC_DETAILS__REQUEST',
  GET_TOPIC_DETAILS__SUCCESS = 'GET_TOPIC_DETAILS__SUCCESS',
  GET_TOPIC_DETAILS__FAILURE = 'GET_TOPIC_DETAILS__FAILURE',

  GET_TOPIC_CONFIG__REQUEST = 'GET_TOPIC_CONFIG__REQUEST',
  GET_TOPIC_CONFIG__SUCCESS = 'GET_TOPIC_CONFIG__SUCCESS',
  GET_TOPIC_CONFIG__FAILURE = 'GET_TOPIC_CONFIG__FAILURE',

  POST_TOPIC__REQUEST = 'POST_TOPIC__REQUEST',
  POST_TOPIC__SUCCESS = 'POST_TOPIC__SUCCESS',
  POST_TOPIC__FAILURE = 'POST_TOPIC__FAILURE',

  PATCH_TOPIC__REQUEST = 'PATCH_TOPIC__REQUEST',
  PATCH_TOPIC__SUCCESS = 'PATCH_TOPIC__SUCCESS',
  PATCH_TOPIC__FAILURE = 'PATCH_TOPIC__FAILURE',

  GET_CONSUMER_GROUPS__REQUEST = 'GET_CONSUMER_GROUPS__REQUEST',
  GET_CONSUMER_GROUPS__SUCCESS = 'GET_CONSUMER_GROUPS__SUCCESS',
  GET_CONSUMER_GROUPS__FAILURE = 'GET_CONSUMER_GROUPS__FAILURE',

  GET_CONSUMER_GROUP_DETAILS__REQUEST = 'GET_CONSUMER_GROUP_DETAILS__REQUEST',
  GET_CONSUMER_GROUP_DETAILS__SUCCESS = 'GET_CONSUMER_GROUP_DETAILS__SUCCESS',
  GET_CONSUMER_GROUP_DETAILS__FAILURE = 'GET_CONSUMER_GROUP_DETAILS__FAILURE',

  GET_CLUSTER_SCHEMAS__REQUEST = 'GET_CLUSTER_SCHEMAS__REQUEST',
  GET_CLUSTER_SCHEMAS__SUCCESS = 'GET_CLUSTER_SCHEMAS__SUCCESS',
  GET_CLUSTER_SCHEMAS__FAILURE = 'GET_CLUSTER_SCHEMAS__FAILURE',
}

export default ActionType;

@@ -1,5 +1,4 @@
import { createAsyncAction } from 'typesafe-actions';
import ActionType from 'redux/actionType';
import { TopicName, ConsumerGroupID } from 'redux/interfaces';

import {

@@ -18,81 +17,81 @@ import {
} from 'generated-sources';

export const fetchClusterStatsAction = createAsyncAction(
  ActionType.GET_CLUSTER_STATS__REQUEST,
  ActionType.GET_CLUSTER_STATS__SUCCESS,
  ActionType.GET_CLUSTER_STATS__FAILURE
  'GET_CLUSTER_STATUS__REQUEST',
  'GET_CLUSTER_STATUS__SUCCESS',
  'GET_CLUSTER_STATUS__FAILURE'
)<undefined, ClusterStats, undefined>();

export const fetchClusterMetricsAction = createAsyncAction(
  ActionType.GET_CLUSTER_METRICS__REQUEST,
  ActionType.GET_CLUSTER_METRICS__SUCCESS,
  ActionType.GET_CLUSTER_METRICS__FAILURE
  'GET_CLUSTER_METRICS__REQUEST',
  'GET_CLUSTER_METRICS__SUCCESS',
  'GET_CLUSTER_METRICS__FAILURE'
)<undefined, ClusterMetrics, undefined>();

export const fetchBrokersAction = createAsyncAction(
  ActionType.GET_BROKERS__REQUEST,
  ActionType.GET_BROKERS__SUCCESS,
  ActionType.GET_BROKERS__FAILURE
  'GET_BROKERS__REQUEST',
  'GET_BROKERS__SUCCESS',
  'GET_BROKERS__FAILURE'
)<undefined, Broker[], undefined>();

export const fetchBrokerMetricsAction = createAsyncAction(
  ActionType.GET_BROKER_METRICS__REQUEST,
  ActionType.GET_BROKER_METRICS__SUCCESS,
  ActionType.GET_BROKER_METRICS__FAILURE
  'GET_BROKER_METRICS__REQUEST',
  'GET_BROKER_METRICS__SUCCESS',
  'GET_BROKER_METRICS__FAILURE'
)<undefined, BrokerMetrics, undefined>();

export const fetchClusterListAction = createAsyncAction(
  ActionType.GET_CLUSTERS__REQUEST,
  ActionType.GET_CLUSTERS__SUCCESS,
  ActionType.GET_CLUSTERS__FAILURE
  'GET_CLUSTERS__REQUEST',
  'GET_CLUSTERS__SUCCESS',
  'GET_CLUSTERS__FAILURE'
)<undefined, Cluster[], undefined>();

export const fetchTopicsListAction = createAsyncAction(
  ActionType.GET_TOPICS__REQUEST,
  ActionType.GET_TOPICS__SUCCESS,
  ActionType.GET_TOPICS__FAILURE
  'GET_TOPICS__REQUEST',
  'GET_TOPICS__SUCCESS',
  'GET_TOPICS__FAILURE'
)<undefined, Topic[], undefined>();

export const fetchTopicMessagesAction = createAsyncAction(
  ActionType.GET_TOPIC_MESSAGES__REQUEST,
  ActionType.GET_TOPIC_MESSAGES__SUCCESS,
  ActionType.GET_TOPIC_MESSAGES__FAILURE
  'GET_TOPIC_MESSAGES__REQUEST',
  'GET_TOPIC_MESSAGES__SUCCESS',
  'GET_TOPIC_MESSAGES__FAILURE'
)<undefined, TopicMessage[], undefined>();

export const fetchTopicDetailsAction = createAsyncAction(
  ActionType.GET_TOPIC_DETAILS__REQUEST,
  ActionType.GET_TOPIC_DETAILS__SUCCESS,
  ActionType.GET_TOPIC_DETAILS__FAILURE
  'GET_TOPIC_DETAILS__REQUEST',
  'GET_TOPIC_DETAILS__SUCCESS',
  'GET_TOPIC_DETAILS__FAILURE'
)<undefined, { topicName: TopicName; details: TopicDetails }, undefined>();

export const fetchTopicConfigAction = createAsyncAction(
  ActionType.GET_TOPIC_CONFIG__REQUEST,
  ActionType.GET_TOPIC_CONFIG__SUCCESS,
  ActionType.GET_TOPIC_CONFIG__FAILURE
  'GET_TOPIC_CONFIG__REQUEST',
  'GET_TOPIC_CONFIG__SUCCESS',
  'GET_TOPIC_CONFIG__FAILURE'
)<undefined, { topicName: TopicName; config: TopicConfig[] }, undefined>();

export const createTopicAction = createAsyncAction(
  ActionType.POST_TOPIC__REQUEST,
  ActionType.POST_TOPIC__SUCCESS,
  ActionType.POST_TOPIC__FAILURE
  'POST_TOPIC__REQUEST',
  'POST_TOPIC__SUCCESS',
  'POST_TOPIC__FAILURE'
)<undefined, Topic, undefined>();

export const updateTopicAction = createAsyncAction(
  ActionType.PATCH_TOPIC__REQUEST,
  ActionType.PATCH_TOPIC__SUCCESS,
  ActionType.PATCH_TOPIC__FAILURE
  'PATCH_TOPIC__REQUEST',
  'PATCH_TOPIC__SUCCESS',
  'PATCH_TOPIC__FAILURE'
)<undefined, Topic, undefined>();

export const fetchConsumerGroupsAction = createAsyncAction(
  ActionType.GET_CONSUMER_GROUPS__REQUEST,
  ActionType.GET_CONSUMER_GROUPS__SUCCESS,
  ActionType.GET_CONSUMER_GROUPS__FAILURE
  'GET_CONSUMER_GROUPS__REQUEST',
  'GET_CONSUMER_GROUPS__SUCCESS',
  'GET_CONSUMER_GROUPS__FAILURE'
)<undefined, ConsumerGroup[], undefined>();

export const fetchConsumerGroupDetailsAction = createAsyncAction(
  ActionType.GET_CONSUMER_GROUP_DETAILS__REQUEST,
  ActionType.GET_CONSUMER_GROUP_DETAILS__SUCCESS,
  ActionType.GET_CONSUMER_GROUP_DETAILS__FAILURE
  'GET_CONSUMER_GROUP_DETAILS__REQUEST',
  'GET_CONSUMER_GROUP_DETAILS__SUCCESS',
  'GET_CONSUMER_GROUP_DETAILS__FAILURE'
)<
  undefined,
  { consumerGroupID: ConsumerGroupID; details: ConsumerGroupDetails },

@@ -100,7 +99,7 @@ export const fetchConsumerGroupDetailsAction = createAsyncAction(
>();

export const fetchSchemasByClusterNameAction = createAsyncAction(
  ActionType.GET_CLUSTER_SCHEMAS__REQUEST,
  ActionType.GET_CLUSTER_SCHEMAS__SUCCESS,
  ActionType.GET_CLUSTER_SCHEMAS__FAILURE
  'GET_CLUSTER_SCHEMAS__REQUEST',
  'GET_CLUSTER_SCHEMAS__SUCCESS',
  'GET_CLUSTER_SCHEMAS__FAILURE'
)<undefined, SchemaSubject[], undefined>();

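With redux/actionType deleted, the literal strings themselves become the action types, and typesafe-actions still derives a discriminated union from them. A minimal standalone sketch under that assumption (the local Cluster interface stands in for the generated model; the reducer is illustrative):

    import { createAsyncAction, ActionType } from 'typesafe-actions';

    // Simplified stand-in for the Cluster model from generated-sources.
    interface Cluster {
      name: string;
    }

    // Inline literals replace the former ActionType enum members.
    export const fetchClusterListAction = createAsyncAction(
      'GET_CLUSTERS__REQUEST',
      'GET_CLUSTERS__SUCCESS',
      'GET_CLUSTERS__FAILURE'
    )<undefined, Cluster[], undefined>();

    type Action = ActionType<typeof fetchClusterListAction>;

    // Reducers keep switching on the literal; TypeScript still narrows the payload.
    export const reducer = (state: Cluster[] = [], action: Action): Cluster[] => {
      switch (action.type) {
        case 'GET_CLUSTERS__SUCCESS':
          return action.payload;
        default:
          return state;
      }
    };
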
@@ -1,4 +1,3 @@
import { flatten } from 'lodash';
import {
  ApiClustersApi,
  Configuration,

@@ -261,13 +260,13 @@ export const fetchSchemasByClusterName = (
    const schemaNames = await apiClient.getSchemas({ clusterName });

    // TODO: Remove me after API refactoring
    const schemas: SchemaSubject[][] = await Promise.all(
    const schemas: SchemaSubject[] = await Promise.all(
      schemaNames.map((schemaName) =>
        apiClient.getLatestSchema({ clusterName, schemaName })
      )
    );

    dispatch(actions.fetchSchemasByClusterNameAction.success(flatten(schemas)));
    dispatch(actions.fetchSchemasByClusterNameAction.success(schemas));
  } catch (e) {
    dispatch(actions.fetchSchemasByClusterNameAction.failure());
  }

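The hunk above shows only part of the thunk. A minimal sketch of the overall shape it fits into, assuming getSchemas resolves to the subject names and getLatestSchema to a single SchemaSubject, so each element of the Promise.all result is already flat (the SchemasApi interface and the dispatch typing are illustrative stand-ins for the generated client and redux-thunk):

    interface SchemaSubject {
      subject: string;
      version: string;
      schema: string;
    }

    interface SchemasApi {
      getSchemas(req: { clusterName: string }): Promise<string[]>;
      getLatestSchema(req: { clusterName: string; schemaName: string }): Promise<SchemaSubject>;
    }

    // Fetch the latest version of every subject in parallel; no flatten() step needed.
    export const fetchSchemasByClusterName =
      (apiClient: SchemasApi, clusterName: string) =>
      async (dispatch: (action: unknown) => void): Promise<void> => {
        dispatch({ type: 'GET_CLUSTER_SCHEMAS__REQUEST' });
        try {
          const schemaNames = await apiClient.getSchemas({ clusterName });
          const schemas: SchemaSubject[] = await Promise.all(
            schemaNames.map((schemaName) =>
              apiClient.getLatestSchema({ clusterName, schemaName })
            )
          );
          dispatch({ type: 'GET_CLUSTER_SCHEMAS__SUCCESS', payload: schemas });
        } catch (e) {
          dispatch({ type: 'GET_CLUSTER_SCHEMAS__FAILURE' });
        }
      };
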
@@ -18,13 +18,6 @@ export * from './consumerGroup';
export * from './schema';
export * from './loader';

export enum FetchStatus {
  notFetched = 'notFetched',
  fetching = 'fetching',
  fetched = 'fetched',
  errorFetching = 'errorFetching',
}

export interface RootState {
  topics: TopicsState;
  clusters: ClusterState;

@@ -1,5 +1,3 @@
import { FetchStatus } from 'redux/interfaces/index';

export interface LoaderState {
  [key: string]: FetchStatus;
  [key: string]: 'notFetched' | 'fetching' | 'fetched' | 'errorFetching';
}

@@ -1,6 +1,5 @@
import { Action, BrokersState, ZooKeeperStatus } from 'redux/interfaces';
import { ClusterStats } from 'generated-sources';
import ActionType from 'redux/actionType';

export const initialState: BrokersState = {
  items: [],

@@ -36,14 +35,14 @@ const updateBrokerSegmentSize = (

const reducer = (state = initialState, action: Action): BrokersState => {
  switch (action.type) {
    case ActionType.GET_BROKERS__REQUEST:
    case 'GET_BROKERS__REQUEST':
      return initialState;
    case ActionType.GET_BROKERS__SUCCESS:
    case 'GET_BROKERS__SUCCESS':
      return {
        ...state,
        items: action.payload,
      };
    case ActionType.GET_CLUSTER_STATS__SUCCESS:
    case 'GET_CLUSTER_STATUS__SUCCESS':
      return updateBrokerSegmentSize(state, action.payload);
    default:
      return state;

@@ -1,5 +1,5 @@
import { createSelector } from 'reselect';
import { RootState, FetchStatus, BrokersState } from 'redux/interfaces';
import { RootState, BrokersState } from 'redux/interfaces';
import { createFetchingSelector } from 'redux/reducers/loader/selectors';

const brokersState = ({ brokers }: RootState): BrokersState => brokers;

@@ -8,7 +8,7 @@ const getBrokerListFetchingStatus = createFetchingSelector('GET_BROKERS');

export const getIsBrokerListFetched = createSelector(
  getBrokerListFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getBrokerCount = createSelector(

@@ -1,12 +1,11 @@
import { Action } from 'redux/interfaces';
import { Cluster } from 'generated-sources';
import ActionType from 'redux/actionType';

export const initialState: Cluster[] = [];

const reducer = (state = initialState, action: Action): Cluster[] => {
  switch (action.type) {
    case ActionType.GET_CLUSTERS__SUCCESS:
    case 'GET_CLUSTERS__SUCCESS':
      return action.payload;
    default:
      return state;

@@ -1,5 +1,5 @@
import { createSelector } from 'reselect';
import { RootState, FetchStatus } from 'redux/interfaces';
import { RootState } from 'redux/interfaces';
import { createFetchingSelector } from 'redux/reducers/loader/selectors';
import { Cluster, ServerStatus } from 'generated-sources';

@@ -9,7 +9,7 @@ const getClusterListFetchingStatus = createFetchingSelector('GET_CLUSTERS');

export const getIsClusterListFetched = createSelector(
  getClusterListFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getClusterList = createSelector(

@@ -1,6 +1,5 @@
import { Action, ConsumerGroupsState } from 'redux/interfaces';
import { ConsumerGroup } from 'generated-sources';
import ActionType from 'redux/actionType';

export const initialState: ConsumerGroupsState = {
  byID: {},

@@ -34,9 +33,9 @@ const updateConsumerGroupsList = (

const reducer = (state = initialState, action: Action): ConsumerGroupsState => {
  switch (action.type) {
    case ActionType.GET_CONSUMER_GROUPS__SUCCESS:
    case 'GET_CONSUMER_GROUPS__SUCCESS':
      return updateConsumerGroupsList(state, action.payload);
    case ActionType.GET_CONSUMER_GROUP_DETAILS__SUCCESS:
    case 'GET_CONSUMER_GROUP_DETAILS__SUCCESS':
      return {
        ...state,
        byID: {

@@ -1,5 +1,5 @@
import { createSelector } from 'reselect';
import { RootState, FetchStatus } from 'redux/interfaces';
import { RootState } from 'redux/interfaces';
import { createFetchingSelector } from 'redux/reducers/loader/selectors';
import {
  ConsumerGroupID,

@@ -24,12 +24,12 @@ const getConsumerGroupDetailsFetchingStatus = createFetchingSelector(

export const getIsConsumerGroupsListFetched = createSelector(
  getConsumerGroupsListFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getIsConsumerGroupDetailsFetched = createSelector(
  getConsumerGroupDetailsFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getConsumerGroupsList = createSelector(

@@ -1,4 +1,4 @@
import { FetchStatus, Action, LoaderState } from 'redux/interfaces';
import { Action, LoaderState } from 'redux/interfaces';

export const initialState: LoaderState = {};

@@ -15,17 +15,17 @@ const reducer = (state = initialState, action: Action): LoaderState => {
    case 'REQUEST':
      return {
        ...state,
        [requestName]: FetchStatus.fetching,
        [requestName]: 'fetching',
      };
    case 'SUCCESS':
      return {
        ...state,
        [requestName]: FetchStatus.fetched,
        [requestName]: 'fetched',
      };
    case 'FAILURE':
      return {
        ...state,
        [requestName]: FetchStatus.errorFetching,
        [requestName]: 'errorFetching',
      };
    default:
      return state;

@@ -1,4 +1,4 @@
import { RootState, FetchStatus } from 'redux/interfaces';
import { RootState } from 'redux/interfaces';

export const createFetchingSelector = (action: string) => (state: RootState) =>
  state.loader[action] || FetchStatus.notFetched;
  state.loader[action] || 'notFetched';

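Together with the loader reducer above, this selector completes the switch from the FetchStatus enum to plain string literals. A minimal standalone sketch of the flow, assuming action types split into a request name and a REQUEST/SUCCESS/FAILURE suffix (the regex split, the trackRequest helper, and the sample state are illustrative, not taken from the hunks):

    type FetchStatus = 'notFetched' | 'fetching' | 'fetched' | 'errorFetching';

    interface LoaderState {
      [key: string]: FetchStatus;
    }

    interface RootState {
      loader: LoaderState;
    }

    // Assumed convention: 'GET_BROKERS__SUCCESS' splits into 'GET_BROKERS' + 'SUCCESS'.
    const trackRequest = (state: LoaderState, actionType: string): LoaderState => {
      const match = /^(.*)__(REQUEST|SUCCESS|FAILURE)$/.exec(actionType);
      if (!match) return state;
      const [, requestName, requestState] = match;
      const statusByState: Record<string, FetchStatus> = {
        REQUEST: 'fetching',
        SUCCESS: 'fetched',
        FAILURE: 'errorFetching',
      };
      return { ...state, [requestName]: statusByState[requestState] };
    };

    const createFetchingSelector = (action: string) => (state: RootState) =>
      state.loader[action] || 'notFetched';

    // Usage: a boolean "fetched" flag derived from the literal status.
    const state: RootState = { loader: trackRequest({}, 'GET_BROKERS__SUCCESS') };
    const isBrokerListFetched = createFetchingSelector('GET_BROKERS')(state) === 'fetched';
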
@@ -1,5 +1,4 @@
import { SchemaSubject } from 'generated-sources';
import ActionType from 'redux/actionType';
import { Action, SchemasState } from 'redux/interfaces';

export const initialState: SchemasState = {

@@ -34,7 +33,7 @@ const updateSchemaList = (

const reducer = (state = initialState, action: Action): SchemasState => {
  switch (action.type) {
    case ActionType.GET_CLUSTER_SCHEMAS__SUCCESS:
    case 'GET_CLUSTER_SCHEMAS__SUCCESS':
      return updateSchemaList(state, action.payload);
    default:
      return state;

@@ -1,5 +1,5 @@
import { createSelector } from 'reselect';
import { FetchStatus, RootState, SchemasState } from 'redux/interfaces';
import { RootState, SchemasState } from 'redux/interfaces';
import { createFetchingSelector } from 'redux/reducers/loader/selectors';

const schemasState = ({ schemas }: RootState): SchemasState => schemas;

@@ -13,7 +13,7 @@ const getSchemaListFetchingStatus = createFetchingSelector(

export const getIsSchemaListFetched = createSelector(
  getSchemaListFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getSchemaList = createSelector(

@@ -1,7 +1,6 @@
import { v4 } from 'uuid';
import { Topic, TopicMessage } from 'generated-sources';
import { Action, TopicsState } from 'redux/interfaces';
import ActionType from 'redux/actionType';

export const initialState: TopicsState = {
  byName: {},

@@ -68,9 +67,9 @@ const transformTopicMessages = (

const reducer = (state = initialState, action: Action): TopicsState => {
  switch (action.type) {
    case ActionType.GET_TOPICS__SUCCESS:
    case 'GET_TOPICS__SUCCESS':
      return updateTopicList(state, action.payload);
    case ActionType.GET_TOPIC_DETAILS__SUCCESS:
    case 'GET_TOPIC_DETAILS__SUCCESS':
      return {
        ...state,
        byName: {

@@ -81,9 +80,9 @@ const reducer = (state = initialState, action: Action): TopicsState => {
        },
      },
    };
    case ActionType.GET_TOPIC_MESSAGES__SUCCESS:
    case 'GET_TOPIC_MESSAGES__SUCCESS':
      return transformTopicMessages(state, action.payload);
    case ActionType.GET_TOPIC_CONFIG__SUCCESS:
    case 'GET_TOPIC_CONFIG__SUCCESS':
      return {
        ...state,
        byName: {

@@ -97,7 +96,7 @@ const reducer = (state = initialState, action: Action): TopicsState => {
        },
      },
    };
    case ActionType.POST_TOPIC__SUCCESS:
    case 'POST_TOPIC__SUCCESS':
      return addToTopicList(state, action.payload);
    default:
      return state;

@@ -2,7 +2,6 @@ import { createSelector } from 'reselect';
import {
  RootState,
  TopicName,
  FetchStatus,
  TopicsState,
  TopicConfigByName,
} from 'redux/interfaces';

@@ -29,32 +28,32 @@ const getTopicUpdateStatus = createFetchingSelector('PATCH_TOPIC');

export const getIsTopicListFetched = createSelector(
  getTopicListFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getIsTopicDetailsFetched = createSelector(
  getTopicDetailsFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getIsTopicMessagesFetched = createSelector(
  getTopicMessagesFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getTopicConfigFetched = createSelector(
  getTopicConfigFetchingStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getTopicCreated = createSelector(
  getTopicCreationStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getTopicUpdated = createSelector(
  getTopicUpdateStatus,
  (status) => status === FetchStatus.fetched
  (status) => status === 'fetched'
);

export const getTopicList = createSelector(