diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml
index 91b74cd327..6911054ceb 100644
--- a/docker/kafka-clusters-only.yaml
+++ b/docker/kafka-clusters-only.yaml
@@ -88,6 +88,32 @@ services:
ports:
- 8081:8081
+ kafka-connect0:
+ image: confluentinc/cp-kafka-connect:5.1.0
+ ports:
+ - 8083:8083
+ depends_on:
+ - kafka0
+ - schemaregistry0
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+ CONNECT_GROUP_ID: compose-connect-group
+ CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+ CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
+
kafka-init-topics:
image: confluentinc/cp-kafka:5.1.0
volumes:
@@ -98,4 +124,5 @@ services:
cub kafka-ready -b kafka1:29092 1 30 && \
kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"
diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml
index 780785920d..9740be15d1 100644
--- a/docker/kafka-ui.yaml
+++ b/docker/kafka-ui.yaml
@@ -13,17 +13,22 @@ services:
- kafka0
- kafka1
- schemaregistry0
+ - kafka-connect0
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
KAFKA_CLUSTERS_0_JMXPORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
+ KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
+ KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_1_NAME: secondLocal
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
KAFKA_CLUSTERS_1_JMXPORT: 9998
KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry0:8085
+ KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
+ KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
zookeeper0:
image: confluentinc/cp-zookeeper:5.1.0
@@ -91,6 +96,31 @@ services:
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+ kafka-connect0:
+ image: confluentinc/cp-kafka-connect:5.1.0
+ ports:
+ - 8083:8083
+ depends_on:
+ - kafka0
+ - schemaregistry0
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+ CONNECT_GROUP_ID: compose-connect-group
+ CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+ CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
kafka-init-topics:
image: confluentinc/cp-kafka:5.1.0
volumes:
@@ -101,4 +131,5 @@ services:
cub kafka-ready -b kafka1:29092 1 30 && \
kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index b5924a3ed1..3df0f29bb9 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -103,6 +103,10 @@
org.springframework.boot
spring-boot-starter-log4j2
+
+ io.projectreactor.addons
+ reactor-extra
+
org.springframework.boot
@@ -164,8 +168,8 @@
maven-compiler-plugin
${maven-compiler-plugin.version}
- 13
- 13
+ ${maven.compiler.source}
+ ${maven.compiler.target}
org.mapstruct
@@ -185,6 +189,16 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+
+ --illegal-access=permit
+
+
+
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java
new file mode 100644
index 0000000000..5c9ed96969
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java
@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.client;
+
+import com.provectus.kafka.ui.connect.api.ConnectApi;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class KafkaConnectClients {
+
+ private static final Map CACHE = new ConcurrentHashMap<>();
+
+ public static ConnectApi withBaseUrl(String basePath) {
+ return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new);
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java
new file mode 100644
index 0000000000..a11564e84a
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java
@@ -0,0 +1,87 @@
+package com.provectus.kafka.ui.cluster.client;
+
+import com.provectus.kafka.ui.cluster.exception.RebalanceInProgressException;
+import com.provectus.kafka.ui.cluster.exception.ValidationException;
+import com.provectus.kafka.ui.connect.ApiClient;
+import com.provectus.kafka.ui.connect.api.ConnectApi;
+import com.provectus.kafka.ui.connect.model.Connector;
+import com.provectus.kafka.ui.connect.model.NewConnector;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.retry.Retry;
+
+import java.util.List;
+import java.util.Map;
+
+@Log4j2
+public class RetryingKafkaConnectClient extends ConnectApi {
+ private static final int MAX_RETRIES = 5;
+
+ public RetryingKafkaConnectClient(String basePath) {
+ super(new RetryingApiClient().setBasePath(basePath));
+ }
+
+ @Override
+ public Mono createConnector(NewConnector newConnector) throws RestClientException {
+ return withBadRequestErrorHandling(
+ super.createConnector(newConnector)
+ );
+ }
+
+ @Override
+ public Mono setConnectorConfig(String connectorName, Map requestBody) throws RestClientException {
+ return withBadRequestErrorHandling(
+ super.setConnectorConfig(connectorName, requestBody)
+ );
+ }
+
+ private static class RetryingApiClient extends ApiClient {
+ @Override
+ public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException {
+ return withRetryOnConflict(
+ super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType)
+ );
+ }
+
+ @Override
+ public Flux invokeFluxAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException {
+ return withRetryOnConflict(
+ super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType)
+ );
+ }
+ }
+
+ private static Mono withRetryOnConflict(Mono publisher) {
+ return publisher.retryWhen(
+ Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
+ .retryMax(MAX_RETRIES)
+ )
+ .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException()))
+ .doOnError(log::error);
+ }
+
+ private static Flux withRetryOnConflict(Flux publisher) {
+ return publisher.retryWhen(
+ Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
+ .retryMax(MAX_RETRIES)
+ )
+ .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException()))
+ .doOnError(log::error);
+ }
+
+ private static Mono withBadRequestErrorHandling(Mono publisher) {
+ return publisher
+ .onErrorResume(WebClientResponseException.BadRequest.class, e ->
+ Mono.error(new ValidationException("Invalid configuration")))
+ .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
+ Mono.error(new ValidationException("Invalid configuration")));
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
index 093bfefef8..9db628c4f4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
@@ -24,7 +24,14 @@ public class ClustersProperties {
String schemaNameTemplate = "%s-value";
String protobufFile;
String protobufMessageName;
+ List kafkaConnect;
int jmxPort;
Properties properties;
}
+
+ @Data
+ public static class ConnectCluster {
+ String name;
+ String address;
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java
new file mode 100644
index 0000000000..a6fe7c4865
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java
@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class RebalanceInProgressException extends CustomBaseException {
+
+ public RebalanceInProgressException() {
+ super("Rebalance is in progress.");
+ }
+
+ @Override
+ public HttpStatus getResponseStatusCode() {
+ return HttpStatus.CONFLICT;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java
new file mode 100644
index 0000000000..ff2e9b72af
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java
@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class ValidationException extends CustomBaseException {
+ public ValidationException(String message) {
+ super(message);
+ }
+
+ @Override
+ public HttpStatus getResponseStatusCode() {
+ return HttpStatus.BAD_REQUEST;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
index 91cadf3c26..a9e48bd627 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
@@ -36,6 +36,7 @@ public interface ClusterMapper {
TopicDetails toTopicDetails(InternalTopic topic);
TopicConfig toTopicConfig(InternalTopicConfig topic);
Replica toReplica(InternalReplica replica);
+ Connect toKafkaConnect(KafkaConnectCluster connect);
@Mapping(target = "isCompatible", source = "compatible")
CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java
new file mode 100644
index 0000000000..4010e102dd
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java
@@ -0,0 +1,24 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
+import com.provectus.kafka.ui.connect.model.ConnectorTask;
+import com.provectus.kafka.ui.connect.model.NewConnector;
+import com.provectus.kafka.ui.model.*;
+import org.mapstruct.Mapper;
+
+@Mapper(componentModel = "spring")
+public interface KafkaConnectMapper {
+ NewConnector toClient(com.provectus.kafka.ui.model.NewConnector newConnector);
+
+ Connector fromClient(com.provectus.kafka.ui.connect.model.Connector connector);
+
+ ConnectorStatus fromClient(ConnectorStatusConnector connectorStatus);
+
+ Task fromClient(ConnectorTask connectorTask);
+
+ TaskStatus fromClient(com.provectus.kafka.ui.connect.model.TaskStatus taskStatus);
+
+ ConnectorPlugin fromClient(com.provectus.kafka.ui.connect.model.ConnectorPlugin connectorPlugin);
+
+ ConnectorPluginConfigValidationResponse fromClient(com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse);
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
index a4a833fddf..cf6e109627 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
@@ -1,6 +1,8 @@
package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.ServerStatus;
+
+import java.util.List;
import java.util.Properties;
import lombok.Builder;
import lombok.Data;
@@ -16,6 +18,7 @@ public class KafkaCluster {
private final String bootstrapServers;
private final String zookeeper;
private final String schemaRegistry;
+ private final List kafkaConnect;
private final String schemaNameTemplate;
private final ServerStatus status;
private final ServerStatus zookeeperStatus;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java
new file mode 100644
index 0000000000..352a8369ac
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java
@@ -0,0 +1,11 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class KafkaConnectCluster {
+ private final String name;
+ private final String address;
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java
new file mode 100644
index 0000000000..9362103ffa
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java
@@ -0,0 +1,183 @@
+package com.provectus.kafka.ui.cluster.service;
+
+import com.provectus.kafka.ui.cluster.client.KafkaConnectClients;
+import com.provectus.kafka.ui.cluster.exception.NotFoundException;
+import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
+import com.provectus.kafka.ui.cluster.mapper.KafkaConnectMapper;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.model.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Service
+@Log4j2
+@RequiredArgsConstructor
+public class KafkaConnectService {
+ private final ClustersStorage clustersStorage;
+ private final ClusterMapper clusterMapper;
+ private final KafkaConnectMapper kafkaConnectMapper;
+
+ public Mono> getConnects(String clusterName) {
+ return Mono.just(
+ Flux.fromIterable(clustersStorage.getClusterByName(clusterName)
+ .map(KafkaCluster::getKafkaConnect).stream()
+ .flatMap(Collection::stream)
+ .map(clusterMapper::toKafkaConnect)
+ .collect(Collectors.toList())
+ )
+ );
+ }
+
+
+ public Flux getConnectors(String clusterName, String connectName) {
+ return getConnectAddress(clusterName, connectName)
+ .flatMapMany(connect ->
+ KafkaConnectClients.withBaseUrl(connect).getConnectors()
+ .doOnError(log::error)
+ );
+ }
+
+ public Mono createConnector(String clusterName, String connectName, Mono connector) {
+ return getConnectAddress(clusterName, connectName)
+ .flatMap(connect ->
+ connector
+ .map(kafkaConnectMapper::toClient)
+ .flatMap(c ->
+ KafkaConnectClients.withBaseUrl(connect).createConnector(c)
+ )
+ .flatMap(c -> getConnector(clusterName, connectName, c.getName()))
+ );
+ }
+
+ public Mono getConnector(String clusterName, String connectName, String connectorName) {
+ return getConnectAddress(clusterName, connectName)
+ .flatMap(connect ->
+ KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName)
+ .map(kafkaConnectMapper::fromClient)
+ .flatMap(connector ->
+ KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
+ .map(connectorStatus -> {
+ var status = connectorStatus.getConnector();
+ connector.status(kafkaConnectMapper.fromClient(status));
+ return (Connector) new Connector()
+ .status(kafkaConnectMapper.fromClient(status))
+ .type(connector.getType())
+ .tasks(connector.getTasks())
+ .name(connector.getName())
+ .config(connector.getConfig());
+ })
+ )
+ );
+ }
+
+ public Mono
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+ provided
+
@@ -91,12 +97,50 @@
+
+ generate-connect-client
+
+ generate
+
+
+ ${project.basedir}/src/main/resources/swagger/kafka-connect-api.yaml
+
+
+ java
+ false
+ false
+
+
+ com.provectus.kafka.ui.connect.model
+ com.provectus.kafka.ui.connect.api
+ kafka-connect-client
+
+ true
+ webclient
+
+ true
+ java8
+
+
+
+
+ org.apache.maven.plugins
+ maven-clean-plugin
+ ${maven-clean-plugin.version}
+
+
+
+ ${basedir}/${frontend-generated-sources-directory}
+
+
+
+
org.apache.maven.plugins
maven-resources-plugin
- 3.1.0
+ ${maven-resources-plugin.version}
copy-resource-one
@@ -106,7 +150,7 @@
- ${basedir}/..//kafka-ui-react-app/src/generated-sources
+ ${basedir}/${frontend-generated-sources-directory}
${project.build.directory}/generated-sources/frontend/
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml
new file mode 100644
index 0000000000..0581c5f3bc
--- /dev/null
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml
@@ -0,0 +1,501 @@
+openapi: 3.0.0
+info:
+ description: Api Documentation
+ version: 0.1.0
+ title: Api Documentation
+ termsOfService: urn:tos
+ contact: {}
+ license:
+ name: Apache 2.0
+ url: http://www.apache.org/licenses/LICENSE-2.0
+tags:
+ - name: /connect
+servers:
+ - url: /localhost
+
+paths:
+ /connectors:
+ get:
+ tags:
+ - /connect
+ summary: get all connectors from Kafka Connect service
+ operationId: getConnectors
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ type: string
+ post:
+ tags:
+ - /connect
+ summary: create new connector
+ operationId: createConnector
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/NewConnector'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ 400:
+ description: Bad request
+ 409:
+ description: rebalance is in progress
+ 500:
+ description: Internal server error
+
+ /connectors/{connectorName}:
+ get:
+ tags:
+ - /connect
+ summary: get information about the connector
+ operationId: getConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ delete:
+ tags:
+ - /connect
+ summary: delete connector
+ operationId: deleteConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ 409:
+ description: rebalance is in progress
+
+ /connectors/{connectorName}/config:
+ get:
+ tags:
+ - /connect
+ summary: get connector configuration
+ operationId: getConnectorConfig
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ put:
+ tags:
+ - /connect
+ summary: update or create connector with provided config
+ operationId: setConnectorConfig
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ 400:
+ description: Bad request
+ 409:
+ description: rebalance is in progress
+ 500:
+ description: Internal server error
+
+
+ /connectors/{connectorName}/status:
+ get:
+ tags:
+ - /connect
+ summary: get connector status
+ operationId: getConnectorStatus
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorStatus'
+
+ /connectors/{connectorName}/restart:
+ post:
+ tags:
+ - /connect
+ summary: restart the connector
+ operationId: restartConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ 409:
+ description: rebalance is in progress
+
+ /connectors/{connectorName}/pause:
+ put:
+ tags:
+ - /connect
+ summary: pause the connector
+ operationId: pauseConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 202:
+ description: Accepted
+
+ /connectors/{connectorName}/resume:
+ put:
+ tags:
+ - /connect
+ summary: resume the connector
+ operationId: resumeConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 202:
+ description: Accepted
+
+ /connectors/{connectorName}/tasks:
+ get:
+ tags:
+ - /connect
+ summary: get connector tasks
+ operationId: getConnectorTasks
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/ConnectorTask'
+
+ /connectors/{connectorName}/tasks/{taskId}/status:
+ get:
+ tags:
+ - /connect
+ summary: get connector task status
+ operationId: getConnectorTaskStatus
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: taskId
+ in: path
+ required: true
+ schema:
+ type: integer
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskStatus'
+
+ /connectors/{connectorName}/tasks/{taskId}/restart:
+ post:
+ tags:
+ - /connect
+ summary: restart connector task
+ operationId: restartConnectorTask
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: taskId
+ in: path
+ required: true
+ schema:
+ type: integer
+ responses:
+ 200:
+ description: OK
+
+ /connector-plugins:
+ get:
+ tags:
+ - /connect
+ summary: get connector plugins
+ operationId: getConnectorPlugins
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/ConnectorPlugin'
+
+ /connector-plugins/{pluginName}/config/validate:
+ put:
+ tags:
+ - /connect
+ summary: validate connector plugin configuration
+ operationId: validateConnectorPluginConfig
+ parameters:
+ - name: pluginName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
+
+components:
+ schemas:
+ ConnectorConfig:
+ type: object
+ additionalProperties:
+ type: object
+
+ Task:
+ type: object
+ properties:
+ connector:
+ type: string
+ task:
+ type: integer
+
+ ConnectorTask:
+ type: object
+ properties:
+ id:
+ $ref: '#/components/schemas/Task'
+ config:
+ $ref: '#/components/schemas/ConnectorConfig'
+
+ NewConnector:
+ type: object
+ properties:
+ name:
+ type: string
+ config:
+ $ref: '#/components/schemas/ConnectorConfig'
+ required:
+ - name
+ - config
+
+ Connector:
+ allOf:
+ - $ref: '#/components/schemas/NewConnector'
+ - type: object
+ properties:
+ tasks:
+ type: array
+ items:
+ $ref: '#/components/schemas/Task'
+ type:
+ type: string
+ enum:
+ - source
+ - sink
+
+ TaskStatus:
+ type: object
+ properties:
+ id:
+ type: integer
+ state:
+ type: string
+ enum:
+ - RUNNING
+ - FAILED
+ - PAUSED
+ - UNASSIGNED
+ worker_id:
+ type: string
+
+ ConnectorStatus:
+ type: object
+ properties:
+ name:
+ type: string
+ connector:
+ type: object
+ properties:
+ state:
+ type: string
+ enum:
+ - RUNNING
+ - FAILED
+ - PAUSED
+ - UNASSIGNED
+ worker_id:
+ type: string
+ tasks:
+ type: array
+ items:
+ $ref: '#/components/schemas/TaskStatus'
+
+ ConnectorPlugin:
+ type: object
+ properties:
+ class:
+ type: string
+
+ ConnectorPluginConfigDefinition:
+ type: object
+ properties:
+ name:
+ type: string
+ type:
+ type: string
+ enum:
+ - BOOLEAN
+ - CLASS
+ - DOUBLE
+ - INT
+ - LIST
+ - LONG
+ - PASSWORD
+ - SHORT
+ - STRING
+ required:
+ type: boolean
+ default_value:
+ type: string
+ importance:
+ type: string
+ enum:
+ - LOW
+ - MEDIUM
+ - HIGH
+ documentation:
+ type: string
+ group:
+ type: string
+ width:
+ type: string
+ enum:
+ - SHORT
+ - MEDIUM
+ - LONG
+ - NONE
+ display_name:
+ type: string
+ dependents:
+ type: array
+ items:
+ type: string
+ order:
+ type: integer
+
+ ConnectorPluginConfigValue:
+ type: object
+ properties:
+ name:
+ type: string
+ value:
+ type: string
+ recommended_values:
+ type: array
+ items:
+ type: string
+ errors:
+ type: array
+ items:
+ type: string
+ visible:
+ type: boolean
+
+ ConnectorPluginConfig:
+ type: object
+ properties:
+ definition:
+ $ref: '#/components/schemas/ConnectorPluginConfigDefinition'
+ value:
+ $ref: '#/components/schemas/ConnectorPluginConfigValue'
+
+ ConnectorPluginConfigValidationResponse:
+ type: object
+ properties:
+ name:
+ type: string
+ error_count:
+ type: integer
+ groups:
+ type: array
+ items:
+ type: string
+ configs:
+ type: array
+ items:
+ $ref: '#/components/schemas/ConnectorPluginConfig'
+
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 51139ebd13..1a24dae624 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -10,6 +10,7 @@ info:
url: http://www.apache.org/licenses/LICENSE-2.0
tags:
- name: /api/clusters
+ - name: /api/clusters/connects
servers:
- url: /localhost
@@ -641,6 +642,364 @@ paths:
404:
description: Not Found
+ /api/clusters/{clusterName}/connects:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get all kafka connect instances
+ operationId: getConnects
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/Connect'
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get all connectors from Kafka Connect service
+ operationId: getConnectors
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ type: string
+ post:
+ tags:
+ - /api/clusters/connects
+ summary: create new connector
+ operationId: createConnector
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/NewConnector'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ 409:
+ description: rebalance is in progress
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get information about the connector
+ operationId: getConnector
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ delete:
+ tags:
+ - /api/clusters/connects
+ summary: delete connector
+ operationId: deleteConnector
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ 409:
+ description: rebalance is in progress
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/{action}:
+ post:
+ tags:
+ - /api/clusters/connects
+ summary: update connector state (restart, pause or resume)
+ operationId: updateConnectorState
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: action
+ in: path
+ required: true
+ schema:
+ $ref: '#/components/schemas/ConnectorAction'
+ responses:
+ 200:
+ description: OK
+ 409:
+ description: rebalance is in progress
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get connector configuration
+ operationId: getConnectorConfig
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ put:
+ tags:
+ - /api/clusters/connects
+ summary: update or create connector with provided config
+ operationId: setConnectorConfig
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Connector'
+ 409:
+ description: rebalance is in progress
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get connector tasks
+ operationId: getConnectorTasks
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/Task'
+
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks/{taskId}/action/restart:
+ post:
+ tags:
+ - /api/clusters/connects
+ summary: restart connector task
+ operationId: restartConnectorTask
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: taskId
+ in: path
+ required: true
+ schema:
+ type: integer
+ responses:
+ 200:
+ description: OK
+
+ /api/clusters/{clusterName}/connects/{connectName}/plugins:
+ get:
+ tags:
+ - /api/clusters/connects
+ summary: get connector plugins
+ operationId: getConnectorPlugins
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/ConnectorPlugin'
+
+ /api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate:
+ put:
+ tags:
+ - /api/clusters/connects
+ summary: validate connector plugin configuration
+ operationId: validateConnectorPluginConfig
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: pluginName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorConfig'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
+
components:
schemas:
Cluster:
@@ -1020,3 +1379,200 @@ components:
type: boolean
required:
- isCompatible
+
+ Connect:
+ type: object
+ properties:
+ name:
+ type: string
+ address:
+ type: string
+
+ ConnectorConfig:
+ type: object
+ additionalProperties:
+ type: object
+
+ TaskId:
+ type: object
+ properties:
+ connector:
+ type: string
+ task:
+ type: integer
+
+ Task:
+ type: object
+ properties:
+ id:
+ $ref: '#/components/schemas/TaskId'
+ status:
+ $ref: '#/components/schemas/TaskStatus'
+ config:
+ $ref: '#/components/schemas/ConnectorConfig'
+
+ NewConnector:
+ type: object
+ properties:
+ name:
+ type: string
+ config:
+ $ref: '#/components/schemas/ConnectorConfig'
+ required:
+ - name
+ - config
+
+ Connector:
+ allOf:
+ - $ref: '#/components/schemas/NewConnector'
+ - type: object
+ properties:
+ tasks:
+ type: array
+ items:
+ $ref: '#/components/schemas/TaskId'
+ type:
+ type: string
+ enum:
+ - source
+ - sink
+ status:
+ $ref: '#/components/schemas/ConnectorStatus'
+
+
+ TaskStatus:
+ type: object
+ properties:
+ id:
+ type: integer
+ state:
+ type: string
+ enum:
+ - RUNNING
+ - FAILED
+ - PAUSED
+ - UNASSIGNED
+ worker_id:
+ type: string
+ trace:
+ type: string
+
+ ConnectorStatus:
+ type: object
+ properties:
+ state:
+ type: string
+ enum:
+ - RUNNING
+ - FAILED
+ - PAUSED
+ - UNASSIGNED
+ worker_id:
+ type: string
+
+ ConnectorAction:
+ type: string
+ enum:
+ - restart
+ - pause
+ - resume
+
+ TaskAction:
+ type: string
+ enum:
+ - restart
+
+ ConnectorPlugin:
+ type: object
+ properties:
+ class:
+ type: string
+
+ ConnectorPluginConfigDefinition:
+ type: object
+ properties:
+ name:
+ type: string
+ type:
+ type: string
+ enum:
+ - BOOLEAN
+ - CLASS
+ - DOUBLE
+ - INT
+ - LIST
+ - LONG
+ - PASSWORD
+ - SHORT
+ - STRING
+ required:
+ type: boolean
+ default_value:
+ type: string
+ importance:
+ type: string
+ enum:
+ - LOW
+ - MEDIUM
+ - HIGH
+ documentation:
+ type: string
+ group:
+ type: string
+ width:
+ type: string
+ enum:
+ - SHORT
+ - MEDIUM
+ - LONG
+ - NONE
+ display_name:
+ type: string
+ dependents:
+ type: array
+ items:
+ type: string
+ order:
+ type: integer
+
+ ConnectorPluginConfigValue:
+ type: object
+ properties:
+ name:
+ type: string
+ value:
+ type: string
+ recommended_values:
+ type: array
+ items:
+ type: string
+ errors:
+ type: array
+ items:
+ type: string
+ visible:
+ type: boolean
+
+ ConnectorPluginConfig:
+ type: object
+ properties:
+ definition:
+ $ref: '#/components/schemas/ConnectorPluginConfigDefinition'
+ value:
+ $ref: '#/components/schemas/ConnectorPluginConfigValue'
+
+ ConnectorPluginConfigValidationResponse:
+ type: object
+ properties:
+ name:
+ type: string
+ error_count:
+ type: integer
+ groups:
+ type: array
+ items:
+ type: string
+ configs:
+ type: array
+ items:
+ $ref: '#/components/schemas/ConnectorPluginConfig'
diff --git a/pom.xml b/pom.xml
index 45a00f5aa3..62106bc7a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,6 +24,9 @@
1.4.10
1.8.0
3.5.1
+ 3.1.0
+ 3.1.0
+ 2.22.0
4.3.0
1.6.0
1.2.32
@@ -33,6 +36,8 @@
2.2
1.15.1
5.4.0
+
+ ..//kafka-ui-react-app/src/generated-sources