Переглянути джерело

Merge branch 'master' into issue-200-update-schema-subject-object

Ildar Almakaev 4 роки тому
батько
коміт
85dffa7dae
61 змінених файлів з 2889 додано та 306 видалено
  1. 27 0
      docker/kafka-clusters-only.yaml
  2. 32 1
      docker/kafka-ui.yaml
  3. 18 4
      kafka-ui-api/pom.xml
  4. 15 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java
  5. 87 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java
  6. 8 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
  7. 15 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ReadOnlyException.java
  8. 15 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java
  9. 14 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java
  10. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
  11. 24 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java
  12. 4 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
  13. 11 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java
  14. 183 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java
  15. 92 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java
  16. 49 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/config/ReadOnlyModeFilter.java
  17. 7 1
      kafka-ui-api/src/main/resources/application-local.yml
  18. 49 15
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
  19. 42 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java
  20. 303 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java
  21. 97 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java
  22. 11 11
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java
  23. 46 2
      kafka-ui-contract/pom.xml
  24. 501 0
      kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml
  25. 558 0
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
  26. 3 3
      kafka-ui-react-app/package-lock.json
  27. 1 1
      kafka-ui-react-app/package.json
  28. 0 8
      kafka-ui-react-app/src/components/Schemas/Details/Details.tsx
  29. 0 32
      kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap
  30. 27 4
      kafka-ui-react-app/src/components/Schemas/List/List.tsx
  31. 1 2
      kafka-ui-react-app/src/components/Schemas/List/ListContainer.tsx
  32. 9 12
      kafka-ui-react-app/src/components/Schemas/List/__test__/List.spec.tsx
  33. 0 102
      kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/List.spec.tsx.snap
  34. 2 2
      kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts
  35. 115 0
      kafka-ui-react-app/src/components/Schemas/New/New.tsx
  36. 15 0
      kafka-ui-react-app/src/components/Schemas/New/NewContainer.ts
  37. 37 0
      kafka-ui-react-app/src/components/Schemas/New/__test__/New.spec.tsx
  38. 189 0
      kafka-ui-react-app/src/components/Schemas/New/__test__/__snapshots__/New.spec.tsx.snap
  39. 27 21
      kafka-ui-react-app/src/components/Schemas/Schemas.tsx
  40. 5 22
      kafka-ui-react-app/src/components/Schemas/SchemasContainer.tsx
  41. 11 26
      kafka-ui-react-app/src/components/Schemas/__test__/Schemas.spec.tsx
  42. 0 18
      kafka-ui-react-app/src/components/Schemas/__test__/__snapshots__/Schemas.spec.tsx.snap
  43. 1 0
      kafka-ui-react-app/src/lib/constants.ts
  44. 8 1
      kafka-ui-react-app/src/lib/paths.ts
  45. 26 3
      kafka-ui-react-app/src/redux/actions/__test__/actions.spec.ts
  46. 6 1
      kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts
  47. 32 0
      kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts
  48. 6 0
      kafka-ui-react-app/src/redux/actions/actions.ts
  49. 21 0
      kafka-ui-react-app/src/redux/actions/thunks.ts
  50. 5 1
      kafka-ui-react-app/src/redux/interfaces/schema.ts
  51. 5 3
      kafka-ui-react-app/src/redux/reducers/brokers/reducer.ts
  52. 3 1
      kafka-ui-react-app/src/redux/reducers/clusters/reducer.ts
  53. 4 2
      kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts
  54. 18 0
      kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap
  55. 44 0
      kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts
  56. 10 0
      kafka-ui-react-app/src/redux/reducers/schemas/__test__/reducer.spec.ts
  57. 11 2
      kafka-ui-react-app/src/redux/reducers/schemas/__test__/selectors.spec.ts
  58. 14 0
      kafka-ui-react-app/src/redux/reducers/schemas/reducer.ts
  59. 12 0
      kafka-ui-react-app/src/redux/reducers/schemas/selectors.ts
  60. 7 5
      kafka-ui-react-app/src/redux/reducers/topics/reducer.ts
  61. 5 0
      pom.xml

+ 27 - 0
docker/kafka-clusters-only.yaml

@@ -88,6 +88,32 @@ services:
     ports:
     - 8081:8081
 
+  kafka-connect0:
+    image: confluentinc/cp-kafka-connect:5.1.0
+    ports:
+      - 8083:8083
+    depends_on:
+      - kafka0
+      - schemaregistry0
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
+
   kafka-init-topics:
     image: confluentinc/cp-kafka:5.1.0
     volumes:
@@ -98,4 +124,5 @@ services:
                cub kafka-ready -b kafka1:29092 1 30 && \
                kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
                kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+               kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
                kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"

+ 32 - 1
docker/kafka-ui.yaml

@@ -4,7 +4,7 @@ services:
 
   kafka-ui:
     container_name: kafka-ui
-    image: kafka-ui:latest
+    image: provectuslabs/kafka-ui:latest
     ports:
       - 8080:8080
     depends_on:
@@ -13,17 +13,22 @@ services:
       - kafka0
       - kafka1
       - schemaregistry0
+      - kafka-connect0
     environment:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
       KAFKA_CLUSTERS_0_JMXPORT: 9997
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
       KAFKA_CLUSTERS_1_NAME: secondLocal
       KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
       KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
       KAFKA_CLUSTERS_1_JMXPORT: 9998
       KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry0:8085
+      KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
+      KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
 
   zookeeper0:
     image: confluentinc/cp-zookeeper:5.1.0
@@ -91,6 +96,31 @@ services:
       SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
       SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
 
+  kafka-connect0:
+    image: confluentinc/cp-kafka-connect:5.1.0
+    ports:
+      - 8083:8083
+    depends_on:
+      - kafka0
+      - schemaregistry0
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
   kafka-init-topics:
     image: confluentinc/cp-kafka:5.1.0
     volumes:
@@ -101,4 +131,5 @@ services:
                cub kafka-ready -b kafka1:29092 1 30 && \
                kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
                kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+               kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
                kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"

+ 18 - 4
kafka-ui-api/pom.xml

@@ -103,6 +103,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-log4j2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -164,8 +168,8 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>${maven-compiler-plugin.version}</version>
                 <configuration>
-                    <source>13</source>
-                    <target>13</target>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
                     <annotationProcessorPaths>
                         <path>
                             <groupId>org.mapstruct</groupId>
@@ -185,6 +189,16 @@
                     </annotationProcessorPaths>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>
+                        --illegal-access=permit
+                    </argLine>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
@@ -266,7 +280,7 @@
                                 </goals>
                                 <configuration>
                                     <tag>${git.revision}</tag>
-                                    <repository>kafka-ui</repository>
+									                  <repository>provectuslabs/kafka-ui</repository>
                                     <buildArgs>
                                         <JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
                                         <JAR_NAME>${project.artifactId}.jar</JAR_NAME>
@@ -281,4 +295,4 @@
     </profiles>
 
 
-</project>
+</project>

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.client;
+
+import com.provectus.kafka.ui.connect.api.ConnectApi;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class KafkaConnectClients {
+
+    private static final Map<String, ConnectApi> CACHE = new ConcurrentHashMap<>();
+
+    public static ConnectApi withBaseUrl(String basePath) {
+        return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new);
+    }
+}

+ 87 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java

@@ -0,0 +1,87 @@
+package com.provectus.kafka.ui.cluster.client;
+
+import com.provectus.kafka.ui.cluster.exception.RebalanceInProgressException;
+import com.provectus.kafka.ui.cluster.exception.ValidationException;
+import com.provectus.kafka.ui.connect.ApiClient;
+import com.provectus.kafka.ui.connect.api.ConnectApi;
+import com.provectus.kafka.ui.connect.model.Connector;
+import com.provectus.kafka.ui.connect.model.NewConnector;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.retry.Retry;
+
+import java.util.List;
+import java.util.Map;
+
+@Log4j2
+public class RetryingKafkaConnectClient extends ConnectApi {
+    private static final int MAX_RETRIES = 5;
+
+    public RetryingKafkaConnectClient(String basePath) {
+        super(new RetryingApiClient().setBasePath(basePath));
+    }
+
+    @Override
+    public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
+        return withBadRequestErrorHandling(
+                super.createConnector(newConnector)
+        );
+    }
+
+    @Override
+    public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody) throws RestClientException {
+        return withBadRequestErrorHandling(
+                super.setConnectorConfig(connectorName, requestBody)
+        );
+    }
+
+    private static class RetryingApiClient extends ApiClient {
+        @Override
+        public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams, MultiValueMap<String, String> queryParams, Object body, HttpHeaders headerParams, MultiValueMap<String, String> cookieParams, MultiValueMap<String, Object> formParams, List<MediaType> accept, MediaType contentType, String[] authNames, ParameterizedTypeReference<T> returnType) throws RestClientException {
+            return withRetryOnConflict(
+                    super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType)
+            );
+        }
+
+        @Override
+        public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams, MultiValueMap<String, String> queryParams, Object body, HttpHeaders headerParams, MultiValueMap<String, String> cookieParams, MultiValueMap<String, Object> formParams, List<MediaType> accept, MediaType contentType, String[] authNames, ParameterizedTypeReference<T> returnType) throws RestClientException {
+            return withRetryOnConflict(
+                    super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType)
+            );
+        }
+    }
+
+    private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
+        return publisher.retryWhen(
+                Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
+                        .retryMax(MAX_RETRIES)
+        )
+                .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException()))
+                .doOnError(log::error);
+    }
+
+    private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
+        return publisher.retryWhen(
+                Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
+                        .retryMax(MAX_RETRIES)
+        )
+                .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException()))
+                .doOnError(log::error);
+    }
+
+    private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
+        return publisher
+                .onErrorResume(WebClientResponseException.BadRequest.class, e ->
+                        Mono.error(new ValidationException("Invalid configuration")))
+                .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
+                        Mono.error(new ValidationException("Invalid configuration")));
+    }
+}

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

@@ -24,7 +24,15 @@ public class ClustersProperties {
         String schemaNameTemplate = "%s-value";
         String protobufFile;
         String protobufMessageName;
+        List<ConnectCluster> kafkaConnect;
         int jmxPort;
         Properties properties;
+        boolean readOnly = false;
+    }
+
+    @Data
+    public static class ConnectCluster {
+        String name;
+        String address;
     }
 }

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ReadOnlyException.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class ReadOnlyException extends CustomBaseException {
+
+    public ReadOnlyException() {
+        super("This cluster is in read-only mode.");
+    }
+
+    @Override
+    public HttpStatus getResponseStatusCode() {
+        return HttpStatus.METHOD_NOT_ALLOWED;
+    }
+}

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class RebalanceInProgressException extends CustomBaseException {
+
+    public RebalanceInProgressException() {
+        super("Rebalance is in progress.");
+    }
+
+    @Override
+    public HttpStatus getResponseStatusCode() {
+        return HttpStatus.CONFLICT;
+    }
+}

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.exception;
+
+import org.springframework.http.HttpStatus;
+
+public class ValidationException extends CustomBaseException {
+    public ValidationException(String message) {
+        super(message);
+    }
+
+    @Override
+    public HttpStatus getResponseStatusCode() {
+        return HttpStatus.BAD_REQUEST;
+    }
+}

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -37,6 +37,7 @@ public interface ClusterMapper {
     TopicDetails toTopicDetails(InternalTopic topic);
     TopicConfig toTopicConfig(InternalTopicConfig topic);
     Replica toReplica(InternalReplica replica);
+    Connect toKafkaConnect(KafkaConnectCluster connect);
 
     @Mapping(target = "isCompatible", source = "compatible")
     CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto);

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java

@@ -0,0 +1,24 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
+import com.provectus.kafka.ui.connect.model.ConnectorTask;
+import com.provectus.kafka.ui.connect.model.NewConnector;
+import com.provectus.kafka.ui.model.*;
+import org.mapstruct.Mapper;
+
+@Mapper(componentModel = "spring")
+public interface KafkaConnectMapper {
+    NewConnector toClient(com.provectus.kafka.ui.model.NewConnector newConnector);
+
+    Connector fromClient(com.provectus.kafka.ui.connect.model.Connector connector);
+
+    ConnectorStatus fromClient(ConnectorStatusConnector connectorStatus);
+
+    Task fromClient(ConnectorTask connectorTask);
+
+    TaskStatus fromClient(com.provectus.kafka.ui.connect.model.TaskStatus taskStatus);
+
+    ConnectorPlugin fromClient(com.provectus.kafka.ui.connect.model.ConnectorPlugin connectorPlugin);
+
+    ConnectorPluginConfigValidationResponse fromClient(com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse);
+}

+ 4 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -1,6 +1,8 @@
 package com.provectus.kafka.ui.cluster.model;
 
 import com.provectus.kafka.ui.model.ServerStatus;
+
+import java.util.List;
 import java.util.Properties;
 import lombok.Builder;
 import lombok.Data;
@@ -16,6 +18,7 @@ public class KafkaCluster {
     private final String bootstrapServers;
     private final String zookeeper;
     private final String schemaRegistry;
+    private final List<KafkaConnectCluster> kafkaConnect;
     private final String schemaNameTemplate;
     private final ServerStatus status;
     private final ServerStatus zookeeperStatus;
@@ -26,4 +29,5 @@ public class KafkaCluster {
     private final Path protobufFile;
     private final String protobufMessageName;
     private final Properties properties;
+    private final Boolean readOnly;
 }

+ 11 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java

@@ -0,0 +1,11 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class KafkaConnectCluster {
+    private final String name;
+    private final String address;
+}

+ 183 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java

@@ -0,0 +1,183 @@
+package com.provectus.kafka.ui.cluster.service;
+
+import com.provectus.kafka.ui.cluster.client.KafkaConnectClients;
+import com.provectus.kafka.ui.cluster.exception.NotFoundException;
+import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
+import com.provectus.kafka.ui.cluster.mapper.KafkaConnectMapper;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.model.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Service
+@Log4j2
+@RequiredArgsConstructor
+public class KafkaConnectService {
+    private final ClustersStorage clustersStorage;
+    private final ClusterMapper clusterMapper;
+    private final KafkaConnectMapper kafkaConnectMapper;
+
+    public Mono<Flux<Connect>> getConnects(String clusterName) {
+        return Mono.just(
+                Flux.fromIterable(clustersStorage.getClusterByName(clusterName)
+                        .map(KafkaCluster::getKafkaConnect).stream()
+                        .flatMap(Collection::stream)
+                        .map(clusterMapper::toKafkaConnect)
+                        .collect(Collectors.toList())
+                )
+        );
+    }
+
+
+    public Flux<String> getConnectors(String clusterName, String connectName) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMapMany(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).getConnectors()
+                                .doOnError(log::error)
+                );
+    }
+
+    public Mono<Connector> createConnector(String clusterName, String connectName, Mono<NewConnector> connector) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        connector
+                                .map(kafkaConnectMapper::toClient)
+                                .flatMap(c ->
+                                        KafkaConnectClients.withBaseUrl(connect).createConnector(c)
+                                )
+                                .flatMap(c -> getConnector(clusterName, connectName, c.getName()))
+                );
+    }
+
+    public Mono<Connector> getConnector(String clusterName, String connectName, String connectorName) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName)
+                                .map(kafkaConnectMapper::fromClient)
+                                .flatMap(connector ->
+                                        KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
+                                                .map(connectorStatus -> {
+                                                    var status = connectorStatus.getConnector();
+                                                    connector.status(kafkaConnectMapper.fromClient(status));
+                                                    return (Connector) new Connector()
+                                                            .status(kafkaConnectMapper.fromClient(status))
+                                                            .type(connector.getType())
+                                                            .tasks(connector.getTasks())
+                                                            .name(connector.getName())
+                                                            .config(connector.getConfig());
+                                                })
+                                )
+                );
+    }
+
+    public Mono<Map<String, Object>> getConnectorConfig(String clusterName, String connectName, String connectorName) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).getConnectorConfig(connectorName)
+                );
+    }
+
+    public Mono<Connector> setConnectorConfig(String clusterName, String connectName, String connectorName, Mono<Object> requestBody) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        requestBody.flatMap(body ->
+                                KafkaConnectClients.withBaseUrl(connect).setConnectorConfig(connectorName, (Map<String, Object>) body)
+                        )
+                                .map(kafkaConnectMapper::fromClient)
+                );
+    }
+
+    public Mono<Void> deleteConnector(String clusterName, String connectName, String connectorName) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).deleteConnector(connectorName)
+                );
+    }
+
+    public Mono<Void> updateConnectorState(String clusterName, String connectName, String connectorName, ConnectorAction action) {
+        Function<String, Mono<Void>> kafkaClientCall;
+        switch (action) {
+            case RESTART:
+                kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).restartConnector(connectorName);
+                break;
+            case PAUSE:
+                kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).pauseConnector(connectorName);
+                break;
+            case RESUME:
+                kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).resumeConnector(connectorName);
+                break;
+            default:
+                throw new IllegalStateException("Unexpected value: " + action);
+        }
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(kafkaClientCall);
+    }
+
+    public Flux<Task> getConnectorTasks(String clusterName, String connectName, String connectorName) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMapMany(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).getConnectorTasks(connectorName)
+                                .map(kafkaConnectMapper::fromClient)
+                                .flatMap(task ->
+                                        KafkaConnectClients.withBaseUrl(connect).getConnectorTaskStatus(connectorName, task.getId().getTask())
+                                                .map(kafkaConnectMapper::fromClient)
+                                                .map(task::status)
+                                )
+                );
+    }
+
+    public Mono<Void> restartConnectorTask(String clusterName, String connectName, String connectorName, Integer taskId) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).restartConnectorTask(connectorName, taskId)
+                );
+    }
+
+    public Mono<Flux<ConnectorPlugin>> getConnectorPlugins(String clusterName, String connectName) {
+        return Mono.just(getConnectAddress(clusterName, connectName)
+                .flatMapMany(connect ->
+                        KafkaConnectClients.withBaseUrl(connect).getConnectorPlugins()
+                                .map(kafkaConnectMapper::fromClient)
+                ));
+    }
+
+    public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String clusterName, String connectName, String pluginName, Mono<Object> requestBody) {
+        return getConnectAddress(clusterName, connectName)
+                .flatMap(connect ->
+                        requestBody.flatMap(body ->
+                                KafkaConnectClients.withBaseUrl(connect).validateConnectorPluginConfig(pluginName, (Map<String, Object>) body)
+                        )
+                                .map(kafkaConnectMapper::fromClient)
+                );
+    }
+
+    private Mono<KafkaCluster> getCluster(String clusterName) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(Mono::just)
+                .orElse(Mono.error(new NotFoundException("No such cluster")));
+    }
+
+    private Mono<String> getConnectAddress(String clusterName, String connectName) {
+        return getCluster(clusterName)
+                .map(kafkaCluster ->
+                        kafkaCluster.getKafkaConnect().stream()
+                                .filter(connect -> connect.getName().equals(connectName))
+                                .findFirst()
+                                .map(KafkaConnectCluster::getAddress)
+                )
+                .flatMap(connect -> connect
+                        .map(Mono::just)
+                        .orElse(Mono.error(new NotFoundException("No such connect cluster")))
+                );
+    }
+}

+ 92 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java

@@ -0,0 +1,92 @@
+package com.provectus.kafka.ui.rest;
+
+import com.provectus.kafka.ui.api.ApiClustersConnectsApi;
+import com.provectus.kafka.ui.cluster.service.KafkaConnectService;
+import com.provectus.kafka.ui.model.*;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import javax.validation.Valid;
+import java.util.Map;
+
+/**
+ * REST adapter for Kafka Connect management endpoints
+ * (/api/clusters/{clusterName}/connects/...). Every handler is a thin
+ * delegation to {@link KafkaConnectService}; no business logic lives here.
+ */
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class KafkaConnectRestController implements ApiClustersConnectsApi {
+    private final KafkaConnectService kafkaConnectService;
+
+    @Override
+    public Mono<ResponseEntity<Flux<Connect>>> getConnects(String clusterName, ServerWebExchange exchange) {
+        return kafkaConnectService.getConnects(clusterName).map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName, ServerWebExchange exchange) {
+        Flux<String> connectors = kafkaConnectService.getConnectors(clusterName, connectName);
+        return Mono.just(ResponseEntity.ok(connectors));
+    }
+
+    @Override
+    public Mono<ResponseEntity<Connector>> createConnector(String clusterName, String connectName, @Valid Mono<NewConnector> connector, ServerWebExchange exchange) {
+        return kafkaConnectService.createConnector(clusterName, connectName, connector)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Connector>> getConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) {
+        return kafkaConnectService.getConnector(clusterName, connectName, connectorName)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) {
+        // NOTE(review): on a Mono<Void>, .map never executes (the Mono completes
+        // empty); WebFlux still answers 200 for an empty result. Same pattern in
+        // updateConnectorState and restartConnectorTask below — confirm intended.
+        return kafkaConnectService.deleteConnector(clusterName, connectName, connectorName)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) {
+        return kafkaConnectService.getConnectorConfig(clusterName, connectName, connectorName)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Connector>> setConnectorConfig(String clusterName, String connectName, String connectorName, @Valid Mono<Object> requestBody, ServerWebExchange exchange) {
+        return kafkaConnectService.setConnectorConfig(clusterName, connectName, connectorName, requestBody)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName, String connectorName, ConnectorAction action, ServerWebExchange exchange) {
+        return kafkaConnectService.updateConnectorState(clusterName, connectName, connectorName, action)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<Task>>> getConnectorTasks(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(kafkaConnectService.getConnectorTasks(clusterName, connectName, connectorName)));
+    }
+
+    @Override
+    public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName, String connectorName, Integer taskId, ServerWebExchange exchange) {
+        return kafkaConnectService.restartConnectorTask(clusterName, connectName, connectorName, taskId)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<Flux<ConnectorPlugin>>> getConnectorPlugins(String clusterName, String connectName, ServerWebExchange exchange) {
+        return kafkaConnectService.getConnectorPlugins(clusterName, connectName)
+                .map(ResponseEntity::ok);
+    }
+
+    @Override
+    public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfig(String clusterName, String connectName, String pluginName, @Valid Mono<Object> requestBody, ServerWebExchange exchange) {
+        return kafkaConnectService.validateConnectorPluginConfig(clusterName, connectName, pluginName, requestBody)
+                .map(ResponseEntity::ok);
+    }
+}

+ 49 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/config/ReadOnlyModeFilter.java

@@ -0,0 +1,49 @@
+package com.provectus.kafka.ui.rest.config;
+
+import com.provectus.kafka.ui.cluster.exception.NotFoundException;
+import com.provectus.kafka.ui.cluster.exception.ReadOnlyException;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import lombok.RequiredArgsConstructor;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.core.annotation.Order;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilter;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+
+import java.util.regex.Pattern;
+
+/**
+ * WebFilter that rejects mutating HTTP requests (any method other than GET)
+ * addressed to a cluster configured as read-only, erroring with
+ * {@link ReadOnlyException}. Requests not scoped to a cluster pass through.
+ */
+@Order
+@Component
+@RequiredArgsConstructor
+public class ReadOnlyModeFilter implements WebFilter {
+    // Captures the cluster name from paths of the form /api/clusters/{clusterName}/...
+    private static final Pattern CLUSTER_NAME_REGEX = Pattern.compile("/api/clusters/(?<clusterName>[^/]++)");
+
+    private final ClustersStorage clustersStorage;
+
+    @NotNull
+    @Override
+    public Mono<Void> filter(ServerWebExchange exchange, @NotNull WebFilterChain chain) {
+        // Read requests are always allowed, read-only or not.
+        var isSafeMethod = exchange.getRequest().getMethod() == HttpMethod.GET;
+        if (isSafeMethod) {
+            return chain.filter(exchange);
+        }
+
+        // Paths that do not target a specific cluster are not restricted.
+        var path = exchange.getRequest().getURI().getPath();
+        var matcher = CLUSTER_NAME_REGEX.matcher(path);
+        if (!matcher.find()) {
+            return chain.filter(exchange);
+        }
+        var clusterName = matcher.group("clusterName");
+        var kafkaCluster = clustersStorage.getClusterByName(clusterName)
+                .orElseThrow(() -> new NotFoundException(String.format("No cluster for name '%s'", clusterName)));
+
+        // NOTE(review): assumes getReadOnly() is never null for a configured cluster —
+        // confirm the config model defaults it, otherwise this can NPE.
+        if (!kafkaCluster.getReadOnly()) {
+            return chain.filter(exchange);
+        }
+
+        return Mono.error(ReadOnlyException::new);
+    }
+}

+ 7 - 1
kafka-ui-api/src/main/resources/application-local.yml

@@ -5,14 +5,20 @@ kafka:
       bootstrapServers: localhost:9093
       zookeeper: localhost:2181
       schemaRegistry: http://localhost:8081
-#      schemaNameTemplate: "%s-value"
+      kafkaConnect:
+        - name: first
+          address: http://localhost:8083
       jmxPort: 9997
     -
       name: secondLocal
       bootstrapServers: localhost:9093
       zookeeper: localhost:2182
       schemaRegistry: http://localhost:8081
+      kafkaConnect:
+        - name: first
+          address: http://localhost:8083
       jmxPort: 9998
+      read-only: true
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 49 - 15
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java

@@ -1,35 +1,69 @@
 package com.provectus.kafka.ui;
 
-import org.junit.runner.RunWith;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ApplicationListener;
 import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerImageName;
 
-@RunWith(SpringRunner.class)
+import java.time.Duration;
+
+@ExtendWith(SpringExtension.class)
 @SpringBootTest
 @ActiveProfiles("test")
-@Testcontainers
 public abstract class AbstractBaseTest {
+    // Cluster names wired into the application via system properties below;
+    // SECOND_LOCAL is configured read-only.
+    public static String LOCAL = "local";
+    public static String SECOND_LOCAL = "secondLocal";
 
+    // Starts Kafka, Schema Registry and Kafka Connect containers for the test
+    // context and registers two clusters with the app through system properties.
     public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+        public final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"))
+                .withNetwork(Network.SHARED);
+
+        public final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer("5.2.1")
+                .withKafka(kafka)
+                .dependsOn(kafka);
+
+        // Connect is only considered up once its connectors/tasks have started.
+        public final KafkaConnectContainer kafkaConnect = new KafkaConnectContainer("5.2.1")
+                .withKafka(kafka)
+                .waitingFor(
+                        Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)
+                )
+                .dependsOn(kafka)
+                .dependsOn(schemaRegistry)
+                .withStartupTimeout(Duration.ofMinutes(15));
+
         @Override
-        public void initialize(ConfigurableApplicationContext context) {
-            System.setProperty("kafka.clusters.0.name", "local");
+        public void initialize(@NotNull ConfigurableApplicationContext context) {
+            kafka.start();
+            schemaRegistry.start();
+            kafkaConnect.start();
+
+            System.setProperty("kafka.clusters.0.name", LOCAL);
+            System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
             System.setProperty("kafka.clusters.0.schemaRegistry", schemaRegistry.getTarget());
+            System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
+            System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+
+            System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
+            System.setProperty("kafka.clusters.1.readOnly", "true");
+            System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers());
+            System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getTarget());
+            System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
+            System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
+
+            // Stop containers in reverse start order when the test context closes.
+            context.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> {
+                kafkaConnect.close();
+                schemaRegistry.close();
+                kafka.close();
+            });
         }
     }
 }

+ 42 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java

@@ -0,0 +1,42 @@
+package com.provectus.kafka.ui;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * Testcontainers wrapper for the confluentinc/cp-kafka-connect image, configured
+ * with single-replica internal topics and String key/value converters for tests.
+ */
+public class KafkaConnectContainer extends GenericContainer<KafkaConnectContainer> {
+    // Default Kafka Connect REST port inside the container.
+    private static final int CONNECT_PORT = 8083;
+
+    public KafkaConnectContainer(String version) {
+        super("confluentinc/cp-kafka-connect:" + version);
+    }
+
+
+    // Connects this container to the broker over the Docker network, using the
+    // broker's first network alias and the internal listener port 9092.
+    public KafkaConnectContainer withKafka(KafkaContainer kafka) {
+        String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092";
+        return withKafka(kafka.getNetwork(), bootstrapServers);
+    }
+
+    public KafkaConnectContainer withKafka(Network network, String bootstrapServers) {
+        withNetwork(network);
+        withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
+        withEnv("CONNECT_GROUP_ID", "connect-group");
+        withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect_configs");
+        withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1");
+        withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect_offset");
+        withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1");
+        withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect_status");
+        withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1");
+        withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter");
+        withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter");
+        withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
+        withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
+        withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect");
+        withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/confluent-hub-components");
+        return self();
+    }
+
+    // Base URL of the Connect REST API as reachable from the host.
+    // NOTE(review): getContainerIpAddress() is deprecated in newer Testcontainers
+    // releases in favour of getHost() — confirm the pinned library version.
+    public String getTarget() {
+        return "http://" + getContainerIpAddress() + ":" + getMappedPort(CONNECT_PORT);
+    }
+}

+ 303 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java

@@ -0,0 +1,303 @@
+package com.provectus.kafka.ui;
+
+import com.provectus.kafka.ui.model.*;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.util.function.Predicate.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration tests for the Kafka Connect REST endpoints, backed by real
+ * Kafka / Connect testcontainers (see AbstractBaseTest.Initializer).
+ * Each test works against a FileStreamSink connector created in setUp.
+ */
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "60000")
+public class KafkaConnectServiceTests extends AbstractBaseTest {
+    private final String connectName = "kafka-connect";
+    // Random per-instance connector name keeps tests isolated from each other.
+    private final String connectorName = UUID.randomUUID().toString();
+    // Expected effective config: Connect injects "name" into the stored config.
+    private final Map<String, Object> config = Map.of(
+            "name", connectorName,
+            "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+            "tasks.max", "1",
+            "topics", "output-topic",
+            "file", "/tmp/test"
+    );
+
+    @Autowired
+    private WebTestClient webTestClient;
+
+
+    // Creates the connector under test before each test; tearDown removes it.
+    @BeforeEach
+    public void setUp() {
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .bodyValue(new NewConnector()
+                        .name(connectorName)
+                        .config(Map.of(
+                                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                                "tasks.max", "1",
+                                "topics", "output-topic",
+                                "file", "/tmp/test"
+                        ))
+                )
+                .exchange()
+                .expectStatus().isOk();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        webTestClient.delete()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
+                .exchange()
+                .expectStatus().isOk();
+    }
+
+    @Test
+    public void shouldListConnectors() {
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody()
+                .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
+                .exists();
+    }
+
+    @Test
+    public void shouldReturnNotFoundForNonExistingCluster() {
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName)
+                .exchange()
+                .expectStatus().isNotFound();
+    }
+
+    @Test
+    public void shouldReturnNotFoundForNonExistingConnectName() {
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, "nonExistingConnect")
+                .exchange()
+                .expectStatus().isNotFound();
+    }
+
+    @Test
+    public void shouldRetrieveConnector() {
+        Connector expected = (Connector) new Connector()
+                .status(new ConnectorStatus()
+                        .state(ConnectorStatus.StateEnum.RUNNING)
+                        .workerId("kafka-connect:8083"))
+                .tasks(List.of(new TaskId()
+                        .connector(connectorName)
+                        .task(0)))
+                .type(Connector.TypeEnum.SINK)
+                .name(connectorName)
+                .config(config);
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(Connector.class)
+                .value(connector -> assertEquals(expected, connector));
+    }
+
+    @Test
+    public void shouldUpdateConfig() {
+        webTestClient.put()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .bodyValue(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "1",
+                        "topics", "another-topic",
+                        "file", "/tmp/new"
+                        )
+                )
+                .exchange()
+                .expectStatus().isOk();
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
+                })
+                .isEqualTo(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "1",
+                        "topics", "another-topic",
+                        "file", "/tmp/new",
+                        "name", connectorName
+                ));
+    }
+
+    // Connect rejects non-numeric tasks.max with 400; the API must propagate it.
+    @Test
+    public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
+        var connectorName = UUID.randomUUID().toString();
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .bodyValue(Map.of(
+                        "name", connectorName,
+                        "config", Map.of(
+                                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                                "tasks.max", "invalid number",
+                                "topics", "another-topic",
+                                "file", "/tmp/test"
+                        ))
+                )
+                .exchange()
+                .expectStatus().isBadRequest();
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody()
+                .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
+                .doesNotExist();
+    }
+
+    // A config missing required fields makes Connect fail with 500; the API is
+    // expected to translate that into 400 for the caller.
+    @Test
+    public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
+        var connectorName = UUID.randomUUID().toString();
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .bodyValue(Map.of(
+                        "name", connectorName,
+                        "config", Map.of(
+                                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
+                        ))
+                )
+                .exchange()
+                .expectStatus().isBadRequest();
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody()
+                .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
+                .doesNotExist();
+    }
+
+
+    @Test
+    public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
+        webTestClient.put()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .bodyValue(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "invalid number",
+                        "topics", "another-topic",
+                        "file", "/tmp/test"
+                        )
+                )
+                .exchange()
+                .expectStatus().isBadRequest();
+
+        // Config must be unchanged after the rejected update.
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
+                })
+                .isEqualTo(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "1",
+                        "topics", "output-topic",
+                        "file", "/tmp/test",
+                        "name", connectorName
+                ));
+    }
+
+    @Test
+    public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
+        webTestClient.put()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .bodyValue(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
+                        )
+                )
+                .exchange()
+                .expectStatus().isBadRequest();
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
+                })
+                .isEqualTo(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "1",
+                        "topics", "output-topic",
+                        "file", "/tmp/test",
+                        "name", connectorName
+                ));
+    }
+
+    // NOTE(review): asserts the exact plugin count shipped with the
+    // cp-kafka-connect 5.2.1 image — brittle across image upgrades.
+    @Test
+    public void shouldRetrieveConnectorPlugins() {
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
+                .exchange()
+                .expectStatus().isOk()
+                .expectBodyList(ConnectorPlugin.class)
+                .value(plugins -> assertEquals(13, plugins.size()));
+    }
+
+    @Test
+    public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
+        var pluginName = "FileStreamSinkConnector";
+        webTestClient.put()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
+                .bodyValue(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "1",
+                        "topics", "output-topic",
+                        "file", "/tmp/test",
+                        "name", connectorName
+                        )
+                )
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(ConnectorPluginConfigValidationResponse.class)
+                .value(response -> assertEquals(0, response.getErrorCount()));
+    }
+
+    @Test
+    public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
+        var pluginName = "FileStreamSinkConnector";
+        webTestClient.put()
+                .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
+                .bodyValue(Map.of(
+                        "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
+                        "tasks.max", "0",
+                        "topics", "output-topic",
+                        "file", "/tmp/test",
+                        "name", connectorName
+                        )
+                )
+                .exchange()
+                .expectStatus().isOk()
+                .expectBody(ConnectorPluginConfigValidationResponse.class)
+                .value(response -> {
+                    assertEquals(1, response.getErrorCount());
+                    // bare Optional.get(): errorCount == 1 guarantees a non-empty
+                    // error list exists, so this cannot throw here
+                    var error = response.getConfigs().stream()
+                            .map(ConnectorPluginConfig::getValue)
+                            .map(ConnectorPluginConfigValue::getErrors)
+                            .filter(not(List::isEmpty))
+                            .findFirst().get();
+                    assertEquals(
+                            "Invalid value 0 for configuration tasks.max: Value must be at least 1",
+                            error.get(0)
+                    );
+                });
+    }
+}

+ 97 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java

@@ -0,0 +1,97 @@
+package com.provectus.kafka.ui;
+
+import com.provectus.kafka.ui.model.TopicFormData;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.http.HttpStatus;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Integration tests for ReadOnlyModeFilter: mutating topic operations must
+ * succeed on the writable LOCAL cluster and be rejected with 405 on the
+ * read-only SECOND_LOCAL cluster.
+ */
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "60000")
+public class ReadOnlyModeTests extends AbstractBaseTest {
+
+    @Autowired
+    private WebTestClient webTestClient;
+
+    @Test
+    public void shouldCreateTopicForNonReadonlyCluster() {
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/topics", LOCAL)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(1)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isOk();
+    }
+
+    @Test
+    public void shouldNotCreateTopicForReadonlyCluster() {
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/topics", SECOND_LOCAL)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(1)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isEqualTo(HttpStatus.METHOD_NOT_ALLOWED);
+    }
+
+    @Test
+    public void shouldUpdateTopicForNonReadonlyCluster() {
+        var topicName = UUID.randomUUID().toString();
+        // Create first so the PATCH below targets an existing topic.
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/topics", LOCAL)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(1)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isOk();
+        webTestClient.patch()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}", LOCAL, topicName)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(2)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isOk();
+    }
+
+    @Test
+    public void shouldNotUpdateTopicForReadonlyCluster() {
+        // No topic is created: the filter must reject the request before any
+        // topic lookup happens.
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.patch()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}", SECOND_LOCAL, topicName)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(1)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isEqualTo(HttpStatus.METHOD_NOT_ALLOWED);
+    }
+}

+ 11 - 11
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

@@ -27,7 +27,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     String subject;
 
     @BeforeEach
-    void setUpBefore() {
+    public void setUpBefore() {
         this.subject = UUID.randomUUID().toString();
     }
 
@@ -35,26 +35,26 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     public void should404WhenGetAllSchemasForUnknownCluster() {
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/unknown-cluster/schemas")
+                .uri("/api/clusters/unknown-cluster/schemas")
                 .exchange()
                 .expectStatus().isNotFound();
     }
 
     @Test
-    void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
+    public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
         String unknownSchema = "unknown-schema";
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
+                .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
                 .exchange()
                 .expectStatus().isNotFound();
     }
 
     @Test
-    void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
+    public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/compatibility")
+                .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBody(CompatibilityLevel.class)
@@ -71,7 +71,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
 
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas")
+                .uri("/api/clusters/{clusterName}/schemas", LOCAL)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
@@ -99,7 +99,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
         //Get the created schema and check its items
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
+                .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
@@ -110,7 +110,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
 
         //Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
         webTestClient.put()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
+                .uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
                 .contentType(MediaType.APPLICATION_JSON)
                 .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
                 .exchange()
@@ -119,7 +119,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
         //Get one more time to check the schema compatibility level is changed to FULL
         webTestClient
                 .get()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
+                .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
                 .exchange()
                 .expectStatus().isOk()
                 .expectBodyList(SchemaSubject.class)
@@ -132,7 +132,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
     private void createNewSubjectAndAssert(String subject) {
         webTestClient
                 .post()
-                .uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
+                .uri("/api/clusters/{clusterName}/schemas/{subject}", LOCAL, subject)
                 .contentType(MediaType.APPLICATION_JSON)
                 .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
                 .exchange()

+ 46 - 2
kafka-ui-contract/pom.xml

@@ -33,6 +33,12 @@
                     <artifactId>jackson-databind-nullable</artifactId>
                     <version>${jackson-databind-nullable.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                    <version>3.0.2</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
 
             <build>
@@ -91,12 +97,50 @@
                                     </configOptions>
                                 </configuration>
                             </execution>
+                            <execution>
+                                <id>generate-connect-client</id>
+                                <goals>
+                                    <goal>generate</goal>
+                                </goals>
+                                <configuration>
+                                    <inputSpec>${project.basedir}/src/main/resources/swagger/kafka-connect-api.yaml
+                                    </inputSpec>
+                                    <output>${project.build.directory}/generated-sources/kafka-connect-client</output>
+                                    <generatorName>java</generatorName>
+                                    <generateApiTests>false</generateApiTests>
+                                    <generateModelTests>false</generateModelTests>
+
+                                    <configOptions>
+                                        <modelPackage>com.provectus.kafka.ui.connect.model</modelPackage>
+                                        <apiPackage>com.provectus.kafka.ui.connect.api</apiPackage>
+                                        <sourceFolder>kafka-connect-client</sourceFolder>
+
+                                        <asyncNative>true</asyncNative>
+                                        <library>webclient</library>
+
+                                        <useBeanValidation>true</useBeanValidation>
+                                        <dateLibrary>java8</dateLibrary>
+                                    </configOptions>
+                                </configuration>
+                            </execution>
                         </executions>
                     </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-clean-plugin</artifactId>
+                        <version>${maven-clean-plugin.version}</version>
+                        <configuration>
+                            <filesets>
+                                <fileset>
+                                    <directory>${basedir}/${frontend-generated-sources-directory}</directory>
+                                </fileset>
+                            </filesets>
+                        </configuration>
+                    </plugin>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-resources-plugin</artifactId>
-                        <version>3.1.0</version>
+                        <version>${maven-resources-plugin.version}</version>
                         <executions>
                             <execution>
                                 <id>copy-resource-one</id>
@@ -106,7 +150,7 @@
                                 </goals>
 
                                 <configuration>
-                                    <outputDirectory>${basedir}/..//kafka-ui-react-app/src/generated-sources</outputDirectory>
+                                    <outputDirectory>${basedir}/${frontend-generated-sources-directory}</outputDirectory>
                                     <resources>
                                         <resource>
                                             <directory>${project.build.directory}/generated-sources/frontend/</directory>

+ 501 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml

@@ -0,0 +1,501 @@
+openapi: 3.0.0
+info:
+  description: Api Documentation
+  version: 0.1.0
+  title: Api Documentation
+  termsOfService: urn:tos
+  contact: {}
+  license:
+    name: Apache 2.0
+    url: http://www.apache.org/licenses/LICENSE-2.0
+tags:
+  - name: /connect
+servers:
+  - url: /localhost
+
+paths:
+  /connectors:
+    get:
+      tags:
+        - /connect
+      summary: get all connectors from Kafka Connect service
+      operationId: getConnectors
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  type: string
+    post:
+      tags:
+        - /connect
+      summary: create new connector
+      operationId: createConnector
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/NewConnector'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+        400:
+          description: Bad request
+        409:
+          description: rebalance is in progress
+        500:
+          description: Internal server error
+
+  /connectors/{connectorName}:
+    get:
+      tags:
+        - /connect
+      summary: get information about the connector
+      operationId: getConnector
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+    delete:
+      tags:
+        - /connect
+      summary: delete connector
+      operationId: deleteConnector
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+        409:
+          description: rebalance is in progress
+
+  /connectors/{connectorName}/config:
+    get:
+      tags:
+        - /connect
+      summary: get connector configuration
+      operationId: getConnectorConfig
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConnectorConfig'
+    put:
+      tags:
+        - /connect
+      summary: update or create connector with provided config
+      operationId: setConnectorConfig
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConnectorConfig'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+        400:
+          description: Bad request
+        409:
+          description: rebalance is in progress
+        500:
+          description: Internal server error
+
+
+  /connectors/{connectorName}/status:
+    get:
+      tags:
+        - /connect
+      summary: get connector status
+      operationId: getConnectorStatus
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConnectorStatus'
+
+  /connectors/{connectorName}/restart:
+    post:
+      tags:
+        - /connect
+      summary: restart the connector
+      operationId: restartConnector
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+        409:
+          description: rebalance is in progress
+
+  /connectors/{connectorName}/pause:
+    put:
+      tags:
+        - /connect
+      summary: pause the connector
+      operationId: pauseConnector
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        202:
+          description: Accepted
+
+  /connectors/{connectorName}/resume:
+    put:
+      tags:
+        - /connect
+      summary: resume the connector
+      operationId: resumeConnector
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        202:
+          description: Accepted
+
+  /connectors/{connectorName}/tasks:
+    get:
+      tags:
+        - /connect
+      summary: get connector tasks
+      operationId: getConnectorTasks
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/ConnectorTask'
+
+  /connectors/{connectorName}/tasks/{taskId}/status:
+    get:
+      tags:
+        - /connect
+      summary: get connector task status
+      operationId: getConnectorTaskStatus
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: taskId
+          in: path
+          required: true
+          schema:
+            type: integer
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TaskStatus'
+
+  /connectors/{connectorName}/tasks/{taskId}/restart:
+    post:
+      tags:
+        - /connect
+      summary: restart connector task
+      operationId: restartConnectorTask
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: taskId
+          in: path
+          required: true
+          schema:
+            type: integer
+      responses:
+        200:
+          description: OK
+
+  /connector-plugins:
+    get:
+      tags:
+        - /connect
+      summary: get connector plugins
+      operationId: getConnectorPlugins
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/ConnectorPlugin'
+
+  /connector-plugins/{pluginName}/config/validate:
+    put:
+      tags:
+        - /connect
+      summary: validate connector plugin configuration
+      operationId: validateConnectorPluginConfig
+      parameters:
+        - name: pluginName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConnectorConfig'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
+
+components:
+  schemas:
+    ConnectorConfig:
+      type: object
+      additionalProperties:
+        type: object
+
+    Task:
+      type: object
+      properties:
+        connector:
+          type: string
+        task:
+          type: integer
+
+    ConnectorTask:
+      type: object
+      properties:
+        id:
+          $ref: '#/components/schemas/Task'
+        config:
+          $ref: '#/components/schemas/ConnectorConfig'
+
+    NewConnector:
+      type: object
+      properties:
+        name:
+          type: string
+        config:
+          $ref: '#/components/schemas/ConnectorConfig'
+      required:
+        - name
+        - config
+
+    Connector:
+      allOf:
+        - $ref: '#/components/schemas/NewConnector'
+        - type: object
+          properties:
+            tasks:
+              type: array
+              items:
+                $ref: '#/components/schemas/Task'
+            type:
+              type: string
+              enum:
+                - source
+                - sink
+
+    TaskStatus:
+      type: object
+      properties:
+        id:
+          type: integer
+        state:
+          type: string
+          enum:
+            - RUNNING
+            - FAILED
+            - PAUSED
+            - UNASSIGNED
+        worker_id:
+          type: string
+
+    ConnectorStatus:
+      type: object
+      properties:
+        name:
+          type: string
+        connector:
+          type: object
+          properties:
+            state:
+              type: string
+              enum:
+                - RUNNING
+                - FAILED
+                - PAUSED
+                - UNASSIGNED
+            worker_id:
+              type: string
+        tasks:
+          type: array
+          items:
+            $ref: '#/components/schemas/TaskStatus'
+
+    ConnectorPlugin:
+      type: object
+      properties:
+        class:
+          type: string
+
+    ConnectorPluginConfigDefinition:
+      type: object
+      properties:
+        name:
+          type: string
+        type:
+          type: string
+          enum:
+            - BOOLEAN
+            - CLASS
+            - DOUBLE
+            - INT
+            - LIST
+            - LONG
+            - PASSWORD
+            - SHORT
+            - STRING
+        required:
+          type: boolean
+        default_value:
+          type: string
+        importance:
+          type: string
+          enum:
+            - LOW
+            - MEDIUM
+            - HIGH
+        documentation:
+          type: string
+        group:
+          type: string
+        width:
+          type: string
+          enum:
+            - SHORT
+            - MEDIUM
+            - LONG
+            - NONE
+        display_name:
+          type: string
+        dependents:
+          type: array
+          items:
+            type: string
+        order:
+          type: integer
+
+    ConnectorPluginConfigValue:
+      type: object
+      properties:
+        name:
+          type: string
+        value:
+          type: string
+        recommended_values:
+          type: array
+          items:
+            type: string
+        errors:
+          type: array
+          items:
+            type: string
+        visible:
+          type: boolean
+
+    ConnectorPluginConfig:
+      type: object
+      properties:
+        definition:
+          $ref: '#/components/schemas/ConnectorPluginConfigDefinition'
+        value:
+          $ref: '#/components/schemas/ConnectorPluginConfigValue'
+
+    ConnectorPluginConfigValidationResponse:
+      type: object
+      properties:
+        name:
+          type: string
+        error_count:
+          type: integer
+        groups:
+          type: array
+          items:
+            type: string
+        configs:
+          type: array
+          items:
+            $ref: '#/components/schemas/ConnectorPluginConfig'
+

+ 558 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -10,6 +10,7 @@ info:
     url: http://www.apache.org/licenses/LICENSE-2.0
 tags:
   - name: /api/clusters
+  - name: /api/clusters/connects
 servers:
   - url: /localhost
 
@@ -640,6 +641,364 @@ paths:
         404:
           description: Not Found
 
+  /api/clusters/{clusterName}/connects:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get all kafka connect instances
+      operationId: getConnects
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/Connect'
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get all connectors from Kafka Connect service
+      operationId: getConnectors
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  type: string
+    post:
+      tags:
+        - /api/clusters/connects
+      summary: create new connector
+      operationId: createConnector
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/NewConnector'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+        409:
+          description: rebalance is in progress
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get information about the connector
+      operationId: getConnector
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+    delete:
+      tags:
+        - /api/clusters/connects
+      summary: delete connector
+      operationId: deleteConnector
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+        409:
+          description: rebalance is in progress
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/{action}:
+    post:
+      tags:
+        - /api/clusters/connects
+      summary: update connector state (restart, pause or resume)
+      operationId: updateConnectorState
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: action
+          in: path
+          required: true
+          schema:
+            $ref: '#/components/schemas/ConnectorAction'
+      responses:
+        200:
+          description: OK
+        409:
+          description: rebalance is in progress
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get connector configuration
+      operationId: getConnectorConfig
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConnectorConfig'
+    put:
+      tags:
+        - /api/clusters/connects
+      summary: update or create connector with provided config
+      operationId: setConnectorConfig
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConnectorConfig'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Connector'
+        409:
+          description: rebalance is in progress
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get connector tasks
+      operationId: getConnectorTasks
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/Task'
+
+  /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks/{taskId}/action/restart:
+    post:
+      tags:
+        - /api/clusters/connects
+      summary: restart connector task
+      operationId: restartConnectorTask
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: taskId
+          in: path
+          required: true
+          schema:
+            type: integer
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/connects/{connectName}/plugins:
+    get:
+      tags:
+        - /api/clusters/connects
+      summary: get connector plugins
+      operationId: getConnectorPlugins
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/ConnectorPlugin'
+
+  /api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate:
+    put:
+      tags:
+        - /api/clusters/connects
+      summary: validate connector plugin configuration
+      operationId: validateConnectorPluginConfig
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: connectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: pluginName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConnectorConfig'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
+
 components:
   schemas:
     Cluster:
@@ -661,6 +1020,8 @@ components:
           type: number
         bytesOutPerSec:
           type: number
+        readOnly:
+          type: boolean
       required:
         - id
         - name
@@ -1018,3 +1379,200 @@ components:
           type: boolean
       required:
         - isCompatible
+
+    Connect:
+      type: object
+      properties:
+        name:
+          type: string
+        address:
+          type: string
+
+    ConnectorConfig:
+      type: object
+      additionalProperties:
+        type: object
+
+    TaskId:
+      type: object
+      properties:
+        connector:
+          type: string
+        task:
+          type: integer
+
+    Task:
+      type: object
+      properties:
+        id:
+          $ref: '#/components/schemas/TaskId'
+        status:
+          $ref: '#/components/schemas/TaskStatus'
+        config:
+          $ref: '#/components/schemas/ConnectorConfig'
+
+    NewConnector:
+      type: object
+      properties:
+        name:
+          type: string
+        config:
+          $ref: '#/components/schemas/ConnectorConfig'
+      required:
+        - name
+        - config
+
+    Connector:
+      allOf:
+        - $ref: '#/components/schemas/NewConnector'
+        - type: object
+          properties:
+            tasks:
+              type: array
+              items:
+                $ref: '#/components/schemas/TaskId'
+            type:
+              type: string
+              enum:
+                - source
+                - sink
+            status:
+              $ref: '#/components/schemas/ConnectorStatus'
+
+
+    TaskStatus:
+      type: object
+      properties:
+        id:
+          type: integer
+        state:
+          type: string
+          enum:
+            - RUNNING
+            - FAILED
+            - PAUSED
+            - UNASSIGNED
+        worker_id:
+          type: string
+        trace:
+          type: string
+
+    ConnectorStatus:
+      type: object
+      properties:
+        state:
+          type: string
+          enum:
+            - RUNNING
+            - FAILED
+            - PAUSED
+            - UNASSIGNED
+        worker_id:
+          type: string
+
+    ConnectorAction:
+      type: string
+      enum:
+        - restart
+        - pause
+        - resume
+
+    TaskAction:
+      type: string
+      enum:
+        - restart
+
+    ConnectorPlugin:
+      type: object
+      properties:
+        class:
+          type: string
+
+    ConnectorPluginConfigDefinition:
+      type: object
+      properties:
+        name:
+          type: string
+        type:
+          type: string
+          enum:
+            - BOOLEAN
+            - CLASS
+            - DOUBLE
+            - INT
+            - LIST
+            - LONG
+            - PASSWORD
+            - SHORT
+            - STRING
+        required:
+          type: boolean
+        default_value:
+          type: string
+        importance:
+          type: string
+          enum:
+            - LOW
+            - MEDIUM
+            - HIGH
+        documentation:
+          type: string
+        group:
+          type: string
+        width:
+          type: string
+          enum:
+            - SHORT
+            - MEDIUM
+            - LONG
+            - NONE
+        display_name:
+          type: string
+        dependents:
+          type: array
+          items:
+            type: string
+        order:
+          type: integer
+
+    ConnectorPluginConfigValue:
+      type: object
+      properties:
+        name:
+          type: string
+        value:
+          type: string
+        recommended_values:
+          type: array
+          items:
+            type: string
+        errors:
+          type: array
+          items:
+            type: string
+        visible:
+          type: boolean
+
+    ConnectorPluginConfig:
+      type: object
+      properties:
+        definition:
+          $ref: '#/components/schemas/ConnectorPluginConfigDefinition'
+        value:
+          $ref: '#/components/schemas/ConnectorPluginConfigValue'
+
+    ConnectorPluginConfigValidationResponse:
+      type: object
+      properties:
+        name:
+          type: string
+        error_count:
+          type: integer
+        groups:
+          type: array
+          items:
+            type: string
+        configs:
+          type: array
+          items:
+            $ref: '#/components/schemas/ConnectorPluginConfig'

+ 3 - 3
kafka-ui-react-app/package-lock.json

@@ -15794,9 +15794,9 @@
       "dev": true
     },
     "react-hook-form": {
-      "version": "6.15.1",
-      "resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-6.15.1.tgz",
-      "integrity": "sha512-bL0LQuQ3OlM3JYfbacKtBPLOHhmgYz8Lj6ivMrvu2M6e1wnt4sbGRtPEPYCc/8z3WDbjrMwfAfLX92OsB65pFA=="
+      "version": "6.15.4",
+      "resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-6.15.4.tgz",
+      "integrity": "sha512-K+Sw33DtTMengs8OdqFJI3glzNl1wBzSefD/ksQw/hJf9CnOHQAU6qy82eOrh0IRNt2G53sjr7qnnw1JDjvx1w=="
     },
     "react-is": {
       "version": "17.0.1",

+ 1 - 1
kafka-ui-react-app/package.json

@@ -17,7 +17,7 @@
     "react": "^17.0.1",
     "react-datepicker": "^3.5.0",
     "react-dom": "^17.0.1",
-    "react-hook-form": "^6.15.1",
+    "react-hook-form": "^6.15.4",
     "react-json-tree": "^0.13.0",
     "react-multi-select-component": "^2.0.14",
     "react-redux": "^7.2.2",

+ 0 - 8
kafka-ui-react-app/src/components/Schemas/Details/Details.tsx

@@ -55,14 +55,6 @@ const Details: React.FC<DetailsProps> = ({
             </div>
           </div>
           <div className="level-right">
-            <button
-              className="button is-primary is-small level-item"
-              type="button"
-              title="in development"
-              disabled
-            >
-              Create Schema
-            </button>
             <button
               className="button is-warning is-small level-item"
               type="button"

+ 0 - 32
kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap

@@ -51,14 +51,6 @@ exports[`Details View Initial state matches snapshot 1`] = `
       <div
         className="level-right"
       >
-        <button
-          className="button is-primary is-small level-item"
-          disabled={true}
-          title="in development"
-          type="button"
-        >
-          Create Schema
-        </button>
         <button
           className="button is-warning is-small level-item"
           disabled={true}
@@ -165,14 +157,6 @@ exports[`Details View when page with schema versions is loading matches snapshot
       <div
         className="level-right"
       >
-        <button
-          className="button is-primary is-small level-item"
-          disabled={true}
-          title="in development"
-          type="button"
-        >
-          Create Schema
-        </button>
         <button
           className="button is-warning is-small level-item"
           disabled={true}
@@ -258,14 +242,6 @@ exports[`Details View when page with schema versions loaded when schema has vers
       <div
         className="level-right"
       >
-        <button
-          className="button is-primary is-small level-item"
-          disabled={true}
-          title="in development"
-          type="button"
-        >
-          Create Schema
-        </button>
         <button
           className="button is-warning is-small level-item"
           disabled={true}
@@ -397,14 +373,6 @@ exports[`Details View when page with schema versions loaded when versions are em
       <div
         className="level-right"
       >
-        <button
-          className="button is-primary is-small level-item"
-          disabled={true}
-          title="in development"
-          type="button"
-        >
-          Create Schema
-        </button>
         <button
           className="button is-warning is-small level-item"
           disabled={true}

+ 27 - 4
kafka-ui-react-app/src/components/Schemas/List/List.tsx

@@ -1,6 +1,8 @@
 import React from 'react';
 import { SchemaSubject } from 'generated-sources';
-import Breadcrumb from '../../common/Breadcrumb/Breadcrumb';
+import { NavLink, useParams } from 'react-router-dom';
+import { clusterSchemaNewPath } from 'lib/paths';
+import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
 import ListItem from './ListItem';
 
 export interface ListProps {
@@ -8,9 +10,24 @@ export interface ListProps {
 }
 
 const List: React.FC<ListProps> = ({ schemas }) => {
+  const { clusterName } = useParams<{ clusterName: string }>();
+
   return (
     <div className="section">
       <Breadcrumb>Schema Registry</Breadcrumb>
+      <div className="box">
+        <div className="level">
+          <div className="level-item level-right">
+            <NavLink
+              className="button is-primary"
+              to={clusterSchemaNewPath(clusterName)}
+            >
+              Create Schema
+            </NavLink>
+          </div>
+        </div>
+      </div>
+
       <div className="box">
         <table className="table is-striped is-fullwidth">
           <thead>
@@ -21,9 +38,15 @@ const List: React.FC<ListProps> = ({ schemas }) => {
             </tr>
           </thead>
           <tbody>
-            {schemas.map((subject) => (
-              <ListItem key={subject.id} subject={subject} />
-            ))}
+            {schemas.length > 0 ? (
+              schemas.map((subject) => (
+                <ListItem key={subject.id} subject={subject} />
+              ))
+            ) : (
+              <tr>
+                <td colSpan={10}>No schemas found</td>
+              </tr>
+            )}
           </tbody>
         </table>
       </div>

+ 1 - 2
kafka-ui-react-app/src/components/Schemas/List/ListContainer.tsx

@@ -1,6 +1,5 @@
 import { connect } from 'react-redux';
 import { RootState } from 'redux/interfaces';
-import { withRouter } from 'react-router-dom';
 import { getSchemaList } from 'redux/reducers/schemas/selectors';
 import List from './List';
 
@@ -8,4 +7,4 @@ const mapStateToProps = (state: RootState) => ({
   schemas: getSchemaList(state),
 });
 
-export default withRouter(connect(mapStateToProps)(List));
+export default connect(mapStateToProps)(List);

+ 9 - 12
kafka-ui-react-app/src/components/Schemas/List/__test__/List.spec.tsx

@@ -1,6 +1,7 @@
 import React from 'react';
+import { mount, shallow } from 'enzyme';
 import { Provider } from 'react-redux';
-import { shallow } from 'enzyme';
+import { StaticRouter } from 'react-router';
 import configureStore from 'redux/store/configureStore';
 import ListContainer from '../ListContainer';
 import List, { ListProps } from '../List';
@@ -22,35 +23,31 @@ describe('List', () => {
   });
 
   describe('View', () => {
+    const pathname = `/ui/clusters/clusterName/schemas`;
+
     const setupWrapper = (props: Partial<ListProps> = {}) => (
-      <List schemas={[]} {...props} />
+      <StaticRouter location={{ pathname }} context={{}}>
+        <List schemas={[]} {...props} />
+      </StaticRouter>
     );
 
     describe('without schemas', () => {
       it('renders table heading without ListItem', () => {
-        const wrapper = shallow(setupWrapper());
+        const wrapper = mount(setupWrapper());
         expect(wrapper.exists('Breadcrumb')).toBeTruthy();
         expect(wrapper.exists('thead')).toBeTruthy();
         expect(wrapper.exists('ListItem')).toBeFalsy();
       });
-
-      it('matches snapshot', () => {
-        expect(shallow(setupWrapper())).toMatchSnapshot();
-      });
     });
 
     describe('with schemas', () => {
-      const wrapper = shallow(setupWrapper({ schemas }));
+      const wrapper = mount(setupWrapper({ schemas }));
 
       it('renders table heading with ListItem', () => {
         expect(wrapper.exists('Breadcrumb')).toBeTruthy();
         expect(wrapper.exists('thead')).toBeTruthy();
         expect(wrapper.find('ListItem').length).toEqual(3);
       });
-
-      it('matches snapshot', () => {
-        expect(shallow(setupWrapper({ schemas }))).toMatchSnapshot();
-      });
     });
   });
 });

+ 0 - 102
kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/List.spec.tsx.snap

@@ -1,102 +0,0 @@
-// Jest Snapshot v1, https://goo.gl/fbAQLP
-
-exports[`List View with schemas matches snapshot 1`] = `
-<div
-  className="section"
->
-  <Breadcrumb>
-    Schema Registry
-  </Breadcrumb>
-  <div
-    className="box"
-  >
-    <table
-      className="table is-striped is-fullwidth"
-    >
-      <thead>
-        <tr>
-          <th>
-            Schema Name
-          </th>
-          <th>
-            Version
-          </th>
-          <th>
-            Compatibility
-          </th>
-        </tr>
-      </thead>
-      <tbody>
-        <ListItem
-          key="1"
-          subject={
-            Object {
-              "compatibilityLevel": "BACKWARD",
-              "id": 1,
-              "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}",
-              "subject": "test",
-              "version": "1",
-            }
-          }
-        />
-        <ListItem
-          key="1"
-          subject={
-            Object {
-              "compatibilityLevel": "BACKWARD",
-              "id": 1,
-              "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord2\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}",
-              "subject": "test2",
-              "version": "1",
-            }
-          }
-        />
-        <ListItem
-          key="1"
-          subject={
-            Object {
-              "compatibilityLevel": "BACKWARD",
-              "id": 1,
-              "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord3\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}",
-              "subject": "test3",
-              "version": "1",
-            }
-          }
-        />
-      </tbody>
-    </table>
-  </div>
-</div>
-`;
-
-exports[`List View without schemas matches snapshot 1`] = `
-<div
-  className="section"
->
-  <Breadcrumb>
-    Schema Registry
-  </Breadcrumb>
-  <div
-    className="box"
-  >
-    <table
-      className="table is-striped is-fullwidth"
-    >
-      <thead>
-        <tr>
-          <th>
-            Schema Name
-          </th>
-          <th>
-            Version
-          </th>
-          <th>
-            Compatibility
-          </th>
-        </tr>
-      </thead>
-      <tbody />
-    </table>
-  </div>
-</div>
-`;

+ 2 - 2
kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts

@@ -12,7 +12,7 @@ export const schemas: SchemaSubject[] = [
   {
     subject: 'test2',
     version: '1',
-    id: 1,
+    id: 2,
     schema:
       '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
     compatibilityLevel: 'BACKWARD',
@@ -20,7 +20,7 @@ export const schemas: SchemaSubject[] = [
   {
     subject: 'test3',
     version: '1',
-    id: 1,
+    id: 12,
     schema:
       '{"type":"record","name":"MyRecord3","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
     compatibilityLevel: 'BACKWARD',

+ 115 - 0
kafka-ui-react-app/src/components/Schemas/New/New.tsx

@@ -0,0 +1,115 @@
+import React from 'react';
+import { ClusterName, SchemaName, NewSchemaSubjectRaw } from 'redux/interfaces';
+import { useForm } from 'react-hook-form';
+import { ErrorMessage } from '@hookform/error-message';
+import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
+import { clusterSchemaPath, clusterSchemasPath } from 'lib/paths';
+import { NewSchemaSubject } from 'generated-sources';
+import { SCHEMA_NAME_VALIDATION_PATTERN } from 'lib/constants';
+import { useHistory, useParams } from 'react-router';
+
+export interface NewProps {
+  createSchema: (
+    clusterName: ClusterName,
+    subject: SchemaName,
+    newSchemaSubject: NewSchemaSubject
+  ) => void;
+}
+
+const New: React.FC<NewProps> = ({ createSchema }) => {
+  const { clusterName } = useParams<{ clusterName: string }>();
+  const history = useHistory();
+  const {
+    register,
+    errors,
+    handleSubmit,
+    formState: { isDirty, isSubmitting },
+  } = useForm<NewSchemaSubjectRaw>();
+
+  const onSubmit = React.useCallback(
+    async ({ subject, schema }: NewSchemaSubjectRaw) => {
+      try {
+        await createSchema(clusterName, subject, { schema });
+        history.push(clusterSchemaPath(clusterName, subject));
+      } catch (e) {
+        // Show Error
+      }
+    },
+    [clusterName]
+  );
+
+  return (
+    <div className="section">
+      <div className="level">
+        <div className="level-item level-left">
+          <Breadcrumb
+            links={[
+              {
+                href: clusterSchemasPath(clusterName),
+                label: 'Schema Registry',
+              },
+            ]}
+          >
+            New Schema
+          </Breadcrumb>
+        </div>
+      </div>
+
+      <div className="box">
+        <form onSubmit={handleSubmit(onSubmit)}>
+          <div>
+            <div className="field">
+              <label className="label">Subject *</label>
+              <div className="control">
+                <input
+                  className="input"
+                  placeholder="Schema Name"
+                  ref={register({
+                    required: 'Topic Name is required.',
+                    pattern: {
+                      value: SCHEMA_NAME_VALIDATION_PATTERN,
+                      message: 'Only alphanumeric, _, -, and . allowed',
+                    },
+                  })}
+                  name="subject"
+                  autoComplete="off"
+                  disabled={isSubmitting}
+                />
+              </div>
+              <p className="help is-danger">
+                <ErrorMessage errors={errors} name="subject" />
+              </p>
+            </div>
+
+            <div className="field">
+              <label className="label">Schema *</label>
+              <div className="control">
+                <textarea
+                  className="textarea"
+                  ref={register}
+                  name="schema"
+                  disabled={isSubmitting}
+                />
+              </div>
+              <p className="help is-danger">
+                <ErrorMessage errors={errors} name="schema" />
+              </p>
+            </div>
+          </div>
+          <br />
+          <div className="field">
+            <div className="control">
+              <input
+                type="submit"
+                className="button is-primary"
+                disabled={isSubmitting || !isDirty}
+              />
+            </div>
+          </div>
+        </form>
+      </div>
+    </div>
+  );
+};
+
+export default New;

+ 15 - 0
kafka-ui-react-app/src/components/Schemas/New/NewContainer.ts

@@ -0,0 +1,15 @@
+import { connect } from 'react-redux';
+import { RootState } from 'redux/interfaces';
+import { createSchema } from 'redux/actions';
+import { getSchemaCreated } from 'redux/reducers/schemas/selectors';
+import New from './New';
+
+const mapStateToProps = (state: RootState) => ({
+  isSchemaCreated: getSchemaCreated(state),
+});
+
+const mapDispatchToProps = {
+  createSchema,
+};
+
+export default connect(mapStateToProps, mapDispatchToProps)(New);

+ 37 - 0
kafka-ui-react-app/src/components/Schemas/New/__test__/New.spec.tsx

@@ -0,0 +1,37 @@
+import React from 'react';
+import configureStore from 'redux/store/configureStore';
+import { mount, shallow } from 'enzyme';
+import { Provider } from 'react-redux';
+import { StaticRouter } from 'react-router-dom';
+import NewContainer from '../NewContainer';
+import New, { NewProps } from '../New';
+
+describe('New', () => {
+  describe('Container', () => {
+    const store = configureStore();
+
+    it('renders view', () => {
+      const component = shallow(
+        <Provider store={store}>
+          <NewContainer />
+        </Provider>
+      );
+
+      expect(component.exists()).toBeTruthy();
+    });
+  });
+
+  describe('View', () => {
+    const pathname = '/ui/clusters/clusterName/schemas/new';
+
+    const setupWrapper = (props: Partial<NewProps> = {}) => (
+      <StaticRouter location={{ pathname }} context={{}}>
+        <New createSchema={jest.fn()} {...props} />
+      </StaticRouter>
+    );
+
+    it('matches snapshot', () => {
+      expect(mount(setupWrapper())).toMatchSnapshot();
+    });
+  });
+});

+ 189 - 0
kafka-ui-react-app/src/components/Schemas/New/__test__/__snapshots__/New.spec.tsx.snap

@@ -0,0 +1,189 @@
+// Jest Snapshot v1, https://goo.gl/fbAQLP
+
+exports[`New View matches snapshot 1`] = `
+<StaticRouter
+  context={Object {}}
+  location={
+    Object {
+      "pathname": "/ui/clusters/clusterName/schemas/new",
+    }
+  }
+>
+  <Router
+    history={
+      Object {
+        "action": "POP",
+        "block": [Function],
+        "createHref": [Function],
+        "go": [Function],
+        "goBack": [Function],
+        "goForward": [Function],
+        "listen": [Function],
+        "location": Object {
+          "hash": "",
+          "pathname": "/ui/clusters/clusterName/schemas/new",
+          "search": "",
+        },
+        "push": [Function],
+        "replace": [Function],
+      }
+    }
+    staticContext={Object {}}
+  >
+    <New
+      createSchema={[MockFunction]}
+    >
+      <div
+        className="section"
+      >
+        <div
+          className="level"
+        >
+          <div
+            className="level-item level-left"
+          >
+            <Breadcrumb
+              links={
+                Array [
+                  Object {
+                    "href": "/ui/clusters/undefined/schemas",
+                    "label": "Schema Registry",
+                  },
+                ]
+              }
+            >
+              <nav
+                aria-label="breadcrumbs"
+                className="breadcrumb"
+              >
+                <ul>
+                  <li
+                    key="/ui/clusters/undefined/schemas"
+                  >
+                    <NavLink
+                      to="/ui/clusters/undefined/schemas"
+                    >
+                      <Link
+                        aria-current={null}
+                        to={
+                          Object {
+                            "hash": "",
+                            "pathname": "/ui/clusters/undefined/schemas",
+                            "search": "",
+                            "state": null,
+                          }
+                        }
+                      >
+                        <LinkAnchor
+                          aria-current={null}
+                          href="/ui/clusters/undefined/schemas"
+                          navigate={[Function]}
+                        >
+                          <a
+                            aria-current={null}
+                            href="/ui/clusters/undefined/schemas"
+                            onClick={[Function]}
+                          >
+                            Schema Registry
+                          </a>
+                        </LinkAnchor>
+                      </Link>
+                    </NavLink>
+                  </li>
+                  <li
+                    className="is-active"
+                  >
+                    <span
+                      className=""
+                    >
+                      New Schema
+                    </span>
+                  </li>
+                </ul>
+              </nav>
+            </Breadcrumb>
+          </div>
+        </div>
+        <div
+          className="box"
+        >
+          <form
+            onSubmit={[Function]}
+          >
+            <div>
+              <div
+                className="field"
+              >
+                <label
+                  className="label"
+                >
+                  Subject *
+                </label>
+                <div
+                  className="control"
+                >
+                  <input
+                    autoComplete="off"
+                    className="input"
+                    disabled={false}
+                    name="subject"
+                    placeholder="Schema Name"
+                  />
+                </div>
+                <p
+                  className="help is-danger"
+                >
+                  <Component
+                    errors={Object {}}
+                    name="subject"
+                  />
+                </p>
+              </div>
+              <div
+                className="field"
+              >
+                <label
+                  className="label"
+                >
+                  Schema *
+                </label>
+                <div
+                  className="control"
+                >
+                  <textarea
+                    className="textarea"
+                    disabled={false}
+                    name="schema"
+                  />
+                </div>
+                <p
+                  className="help is-danger"
+                >
+                  <Component
+                    errors={Object {}}
+                    name="schema"
+                  />
+                </p>
+              </div>
+            </div>
+            <br />
+            <div
+              className="field"
+            >
+              <div
+                className="control"
+              >
+                <input
+                  className="button is-primary"
+                  disabled={true}
+                  type="submit"
+                />
+              </div>
+            </div>
+          </form>
+        </div>
+      </div>
+    </New>
+  </Router>
+</StaticRouter>
+`;

+ 27 - 21
kafka-ui-react-app/src/components/Schemas/Schemas.tsx

@@ -1,43 +1,49 @@
 import React from 'react';
 import { ClusterName } from 'redux/interfaces';
-import { Switch, Route } from 'react-router-dom';
+import { Switch, Route, useParams } from 'react-router-dom';
 import PageLoader from 'components/common/PageLoader/PageLoader';
 import ListContainer from './List/ListContainer';
 import DetailsContainer from './Details/DetailsContainer';
+import NewContainer from './New/NewContainer';
 
 export interface SchemasProps {
-  isFetched: boolean;
-  clusterName: ClusterName;
+  isFetching: boolean;
   fetchSchemasByClusterName: (clusterName: ClusterName) => void;
 }
 
 const Schemas: React.FC<SchemasProps> = ({
-  isFetched,
+  isFetching,
   fetchSchemasByClusterName,
-  clusterName,
 }) => {
+  const { clusterName } = useParams<{ clusterName: string }>();
+
   React.useEffect(() => {
     fetchSchemasByClusterName(clusterName);
   }, [fetchSchemasByClusterName, clusterName]);
 
-  if (isFetched) {
-    return (
-      <Switch>
-        <Route
-          exact
-          path="/ui/clusters/:clusterName/schemas"
-          component={ListContainer}
-        />
-        <Route
-          exact
-          path="/ui/clusters/:clusterName/schemas/:subject/latest"
-          component={DetailsContainer}
-        />
-      </Switch>
-    );
+  if (isFetching) {
+    return <PageLoader />;
   }
 
-  return <PageLoader />;
+  return (
+    <Switch>
+      <Route
+        exact
+        path="/ui/clusters/:clusterName/schemas"
+        component={ListContainer}
+      />
+      <Route
+        exact
+        path="/ui/clusters/:clusterName/schemas/new"
+        component={NewContainer}
+      />
+      <Route
+        exact
+        path="/ui/clusters/:clusterName/schemas/:subject/latest"
+        component={DetailsContainer}
+      />
+    </Switch>
+  );
 };
 
 export default Schemas;

+ 5 - 22
kafka-ui-react-app/src/components/Schemas/SchemasContainer.tsx

@@ -1,32 +1,15 @@
 import { connect } from 'react-redux';
-import { ClusterName, RootState } from 'redux/interfaces';
+import { RootState } from 'redux/interfaces';
 import { fetchSchemasByClusterName } from 'redux/actions';
-import { getIsSchemaListFetched } from 'redux/reducers/schemas/selectors';
-import { RouteComponentProps, withRouter } from 'react-router-dom';
+import { getIsSchemaListFetching } from 'redux/reducers/schemas/selectors';
 import Schemas from './Schemas';
 
-interface RouteProps {
-  clusterName: ClusterName;
-}
-
-type OwnProps = RouteComponentProps<RouteProps>;
-
-const mapStateToProps = (
-  state: RootState,
-  {
-    match: {
-      params: { clusterName },
-    },
-  }: OwnProps
-) => ({
-  isFetched: getIsSchemaListFetched(state),
-  clusterName,
+const mapStateToProps = (state: RootState) => ({
+  isFetching: getIsSchemaListFetching(state),
 });
 
 const mapDispatchToProps = {
   fetchSchemasByClusterName,
 };
 
-export default withRouter(
-  connect(mapStateToProps, mapDispatchToProps)(Schemas)
-);
+export default connect(mapStateToProps, mapDispatchToProps)(Schemas);

+ 11 - 26
kafka-ui-react-app/src/components/Schemas/__test__/Schemas.spec.tsx

@@ -1,10 +1,8 @@
 import React from 'react';
 import { Provider } from 'react-redux';
-import { shallow } from 'enzyme';
+import { mount } from 'enzyme';
 import configureStore from 'redux/store/configureStore';
 import { StaticRouter } from 'react-router-dom';
-import { match } from 'react-router';
-import { ClusterName } from 'redux/interfaces';
 import Schemas, { SchemasProps } from '../Schemas';
 import SchemasContainer from '../SchemasContainer';
 
@@ -15,7 +13,7 @@ describe('Schemas', () => {
     const store = configureStore();
 
     it('renders view', () => {
-      const component = shallow(
+      const component = mount(
         <Provider store={store}>
           <StaticRouter location={{ pathname }} context={{}}>
             <SchemasContainer />
@@ -28,12 +26,13 @@ describe('Schemas', () => {
 
     describe('View', () => {
       const setupWrapper = (props: Partial<SchemasProps> = {}) => (
-        <Schemas
-          isFetched
-          clusterName="Test"
-          fetchSchemasByClusterName={jest.fn()}
-          {...props}
-        />
+        <StaticRouter location={{ pathname }} context={{}}>
+          <Schemas
+            isFetching
+            fetchSchemasByClusterName={jest.fn()}
+            {...props}
+          />
+        </StaticRouter>
       );
       describe('Initial state', () => {
         let useEffect: jest.SpyInstance<
@@ -43,7 +42,6 @@ describe('Schemas', () => {
             deps?: React.DependencyList | undefined
           ]
         >;
-        let wrapper;
         const mockedFn = jest.fn();
 
         const mockedUseEffect = () => {
@@ -53,33 +51,20 @@ describe('Schemas', () => {
         beforeEach(() => {
           useEffect = jest.spyOn(React, 'useEffect');
           mockedUseEffect();
-
-          wrapper = shallow(
-            setupWrapper({ fetchSchemasByClusterName: mockedFn })
-          );
         });
 
         it('should call fetchSchemasByClusterName every render', () => {
+          mount(setupWrapper({ fetchSchemasByClusterName: mockedFn }));
           expect(mockedFn).toHaveBeenCalled();
         });
-
-        it('matches snapshot', () => {
-          expect(
-            shallow(setupWrapper({ fetchSchemasByClusterName: mockedFn }))
-          ).toMatchSnapshot();
-        });
       });
 
       describe('when page is loading', () => {
-        const wrapper = shallow(setupWrapper({ isFetched: false }));
+        const wrapper = mount(setupWrapper({ isFetching: true }));
 
         it('renders PageLoader', () => {
           expect(wrapper.exists('PageLoader')).toBeTruthy();
         });
-
-        it('matches snapshot', () => {
-          expect(shallow(setupWrapper({ isFetched: false }))).toMatchSnapshot();
-        });
       });
     });
   });

+ 0 - 18
kafka-ui-react-app/src/components/Schemas/__test__/__snapshots__/Schemas.spec.tsx.snap

@@ -1,18 +0,0 @@
-// Jest Snapshot v1, https://goo.gl/fbAQLP
-
-exports[`Schemas Container View Initial state matches snapshot 1`] = `
-<Switch>
-  <Route
-    component={[Function]}
-    exact={true}
-    path="/ui/clusters/:clusterName/schemas"
-  />
-  <Route
-    component={[Function]}
-    exact={true}
-    path="/ui/clusters/:clusterName/schemas/:subject/latest"
-  />
-</Switch>
-`;
-
-exports[`Schemas Container View when page is loading matches snapshot 1`] = `<PageLoader />`;

+ 1 - 0
kafka-ui-react-app/src/lib/constants.ts

@@ -9,6 +9,7 @@ export const BASE_PARAMS: ConfigurationParameters = {
 };
 
 export const TOPIC_NAME_VALIDATION_PATTERN = RegExp(/^[.,A-Za-z0-9_-]+$/);
+export const SCHEMA_NAME_VALIDATION_PATTERN = RegExp(/^[.,A-Za-z0-9_-]+$/);
 
 export const MILLISECONDS_IN_WEEK = 604_800_000;
 export const MILLISECONDS_IN_DAY = 86_400_000;

+ 8 - 1
kafka-ui-react-app/src/lib/paths.ts

@@ -1,4 +1,4 @@
-import { ClusterName, TopicName } from 'redux/interfaces';
+import { ClusterName, SchemaName, TopicName } from 'redux/interfaces';
 
 const clusterPath = (clusterName: ClusterName) => `/ui/clusters/${clusterName}`;
 
@@ -12,6 +12,8 @@ export const clusterConsumerGroupsPath = (clusterName: ClusterName) =>
   `${clusterPath(clusterName)}/consumer-groups`;
 export const clusterSchemasPath = (clusterName: ClusterName) =>
   `${clusterPath(clusterName)}/schemas`;
+export const clusterSchemaNewPath = (clusterName: ClusterName) =>
+  `${clusterPath(clusterName)}/schemas/new`;
 
 export const clusterTopicPath = (
   clusterName: ClusterName,
@@ -30,3 +32,8 @@ export const clusterTopicsTopicEditPath = (
   clusterName: ClusterName,
   topicName: TopicName
 ) => `${clusterTopicsPath(clusterName)}/${topicName}/edit`;
+
+export const clusterSchemaPath = (
+  clusterName: ClusterName,
+  subject: SchemaName
+) => `${clusterSchemasPath(clusterName)}/${subject}/latest`;

+ 26 - 3
kafka-ui-react-app/src/redux/actions/__test__/actions.spec.ts

@@ -6,13 +6,13 @@ import * as actions from '../actions';
 
 describe('Actions', () => {
   describe('fetchClusterStatsAction', () => {
-    it('creates an REQUEST action', () => {
+    it('creates a REQUEST action', () => {
       expect(actions.fetchClusterStatsAction.request()).toEqual({
         type: 'GET_CLUSTER_STATUS__REQUEST',
       });
     });
 
-    it('creates an SUCCESS action', () => {
+    it('creates a SUCCESS action', () => {
       expect(
         actions.fetchClusterStatsAction.success({ brokerCount: 1 })
       ).toEqual({
@@ -23,7 +23,7 @@ describe('Actions', () => {
       });
     });
 
-    it('creates an FAILURE action', () => {
+    it('creates a FAILURE action', () => {
       expect(actions.fetchClusterStatsAction.failure()).toEqual({
         type: 'GET_CLUSTER_STATUS__FAILURE',
       });
@@ -75,4 +75,27 @@ describe('Actions', () => {
       });
     });
   });
+
+  describe('createSchemaAction', () => {
+    it('creates a REQUEST action', () => {
+      expect(actions.createSchemaAction.request()).toEqual({
+        type: 'POST_SCHEMA__REQUEST',
+      });
+    });
+
+    it('creates a SUCCESS action', () => {
+      expect(
+        actions.createSchemaAction.success(schemaVersionsPayload[0])
+      ).toEqual({
+        type: 'POST_SCHEMA__SUCCESS',
+        payload: schemaVersionsPayload[0],
+      });
+    });
+
+    it('creates a FAILURE action', () => {
+      expect(actions.createSchemaAction.failure()).toEqual({
+        type: 'POST_SCHEMA__FAILURE',
+      });
+    });
+  });
 });

+ 6 - 1
kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts

@@ -1,4 +1,4 @@
-import { ClusterStats } from 'generated-sources';
+import { ClusterStats, NewSchemaSubject } from 'generated-sources';
 
 export const clusterStats: ClusterStats = {
   brokerCount: 1,
@@ -11,3 +11,8 @@ export const clusterStats: ClusterStats = {
   underReplicatedPartitionCount: 0,
   diskUsage: [{ brokerId: 1, segmentSize: 6538, segmentCount: 6 }],
 };
+
+export const schemaPayload: NewSchemaSubject = {
+  schema:
+    '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+};

+ 32 - 0
kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts

@@ -105,4 +105,36 @@ describe('Thunks', () => {
       ]);
     });
   });
+
+  describe('createSchema', () => {
+    it('creates POST_SCHEMA__SUCCESS when posting new schema', async () => {
+      fetchMock.postOnce(`/api/clusters/${clusterName}/schemas/${subject}`, {
+        body: schemaFixtures.schemaVersionsPayload[0],
+      });
+      await store.dispatch(
+        thunks.createSchema(clusterName, subject, fixtures.schemaPayload)
+      );
+      expect(store.getActions()).toEqual([
+        actions.createSchemaAction.request(),
+        actions.createSchemaAction.success(
+          schemaFixtures.schemaVersionsPayload[0]
+        ),
+      ]);
+    });
+
+    it('creates POST_SCHEMA__FAILURE when posting new schema fails', async () => {
+      fetchMock.postOnce(
+        `/api/clusters/${clusterName}/schemas/${subject}`,
+        404
+      );
+      // createSchema rethrows the API error, so the dispatch promise must be
+      // awaited with `rejects` — a bare `await` would fail the test itself.
+      await expect(
+        store.dispatch(
+          thunks.createSchema(clusterName, subject, fixtures.schemaPayload)
+        )
+      ).rejects.toThrow();
+      expect(store.getActions()).toEqual([
+        actions.createSchemaAction.request(),
+        actions.createSchemaAction.failure(),
+      ]);
+    });
+  });
 });

+ 6 - 0
kafka-ui-react-app/src/redux/actions/actions.ts

@@ -109,3 +109,9 @@ export const fetchSchemaVersionsAction = createAsyncAction(
   'GET_SCHEMA_VERSIONS__SUCCESS',
   'GET_SCHEMA_VERSIONS__FAILURE'
 )<undefined, SchemaSubject[], undefined>();
+
+export const createSchemaAction = createAsyncAction(
+  'POST_SCHEMA__REQUEST',
+  'POST_SCHEMA__SUCCESS',
+  'POST_SCHEMA__FAILURE'
+)<undefined, SchemaSubject, undefined>();

+ 21 - 0
kafka-ui-react-app/src/redux/actions/thunks.ts

@@ -5,6 +5,8 @@ import {
   Topic,
   TopicFormData,
   TopicConfig,
+  NewSchemaSubject,
+  SchemaSubject,
 } from 'generated-sources';
 import {
   ConsumerGroupID,
@@ -280,3 +282,22 @@ export const fetchSchemaVersions = (
     dispatch(actions.fetchSchemaVersionsAction.failure());
   }
 };
+
+export const createSchema = (
+  clusterName: ClusterName,
+  subject: SchemaName,
+  newSchemaSubject: NewSchemaSubject
+): PromiseThunkResult => async (dispatch) => {
+  dispatch(actions.createSchemaAction.request());
+  try {
+    const schema: SchemaSubject = await apiClient.createNewSchema({
+      clusterName,
+      subject,
+      newSchemaSubject,
+    });
+    dispatch(actions.createSchemaAction.success(schema));
+  } catch (e) {
+    dispatch(actions.createSchemaAction.failure());
+    throw e;
+  }
+};

+ 5 - 1
kafka-ui-react-app/src/redux/interfaces/schema.ts

@@ -1,4 +1,4 @@
-import { SchemaSubject } from 'generated-sources';
+import { NewSchemaSubject, SchemaSubject } from 'generated-sources';
 
 export type SchemaName = string;
 
@@ -7,3 +7,7 @@ export interface SchemasState {
   allNames: SchemaName[];
   currentSchemaVersions: SchemaSubject[];
 }
+
+export interface NewSchemaSubjectRaw extends NewSchemaSubject {
+  subject: string;
+}

+ 5 - 3
kafka-ui-react-app/src/redux/reducers/brokers/reducer.ts

@@ -1,5 +1,7 @@
 import { Action, BrokersState, ZooKeeperStatus } from 'redux/interfaces';
 import { ClusterStats } from 'generated-sources';
+import { getType } from 'typesafe-actions';
+import * as actions from 'redux/actions';
 
 export const initialState: BrokersState = {
   items: [],
@@ -35,14 +37,14 @@ const updateBrokerSegmentSize = (
 
 const reducer = (state = initialState, action: Action): BrokersState => {
   switch (action.type) {
-    case 'GET_BROKERS__REQUEST':
+    case getType(actions.fetchBrokersAction.request):
       return initialState;
-    case 'GET_BROKERS__SUCCESS':
+    case getType(actions.fetchBrokersAction.success):
       return {
         ...state,
         items: action.payload,
       };
-    case 'GET_CLUSTER_STATUS__SUCCESS':
+    case getType(actions.fetchClusterStatsAction.success):
       return updateBrokerSegmentSize(state, action.payload);
     default:
       return state;

+ 3 - 1
kafka-ui-react-app/src/redux/reducers/clusters/reducer.ts

@@ -1,11 +1,13 @@
 import { Action } from 'redux/interfaces';
 import { Cluster } from 'generated-sources';
+import { getType } from 'typesafe-actions';
+import * as actions from 'redux/actions';
 
 export const initialState: Cluster[] = [];
 
 const reducer = (state = initialState, action: Action): Cluster[] => {
   switch (action.type) {
-    case 'GET_CLUSTERS__SUCCESS':
+    case getType(actions.fetchClusterListAction.success):
       return action.payload;
     default:
       return state;

+ 4 - 2
kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts

@@ -1,5 +1,7 @@
 import { Action, ConsumerGroupsState } from 'redux/interfaces';
 import { ConsumerGroup } from 'generated-sources';
+import { getType } from 'typesafe-actions';
+import * as actions from 'redux/actions';
 
 export const initialState: ConsumerGroupsState = {
   byID: {},
@@ -33,9 +35,9 @@ const updateConsumerGroupsList = (
 
 const reducer = (state = initialState, action: Action): ConsumerGroupsState => {
   switch (action.type) {
-    case 'GET_CONSUMER_GROUPS__SUCCESS':
+    case getType(actions.fetchConsumerGroupsAction.success):
       return updateConsumerGroupsList(state, action.payload);
-    case 'GET_CONSUMER_GROUP_DETAILS__SUCCESS':
+    case getType(actions.fetchConsumerGroupDetailsAction.success):
       return {
         ...state,
         byID: {

+ 18 - 0
kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap

@@ -56,3 +56,21 @@ Object {
   ],
 }
 `;
+
+exports[`Schemas reducer reacts on POST_SCHEMA__SUCCESS and returns payload 1`] = `
+Object {
+  "allNames": Array [
+    "test",
+  ],
+  "byName": Object {
+    "test": Object {
+      "compatibilityLevel": "BACKWARD",
+      "id": 1,
+      "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}",
+      "subject": "test",
+      "version": "1",
+    },
+  },
+  "currentSchemaVersions": Array [],
+}
+`;

+ 44 - 0
kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts

@@ -52,3 +52,47 @@ export const schemaVersionsPayload: SchemaSubject[] = [
     compatibilityLevel: 'BACKWARD',
   },
 ];
+
+export const newSchemaPayload: SchemaSubject = {
+  subject: 'test4',
+  version: '2',
+  id: 2,
+  schema:
+    '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+  compatibilityLevel: 'BACKWARD',
+};
+
+export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [
+  {
+    subject: 'test2',
+    version: '3',
+    id: 4,
+    schema:
+      '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+    compatibilityLevel: 'BACKWARD',
+  },
+  {
+    subject: 'test3',
+    version: '1',
+    id: 5,
+    schema:
+      '{"type":"record","name":"MyRecord","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+    compatibilityLevel: 'BACKWARD',
+  },
+  {
+    subject: 'test',
+    version: '2',
+    id: 2,
+    schema:
+      '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+    compatibilityLevel: 'BACKWARD',
+  },
+  {
+    subject: 'test4',
+    version: '2',
+    id: 2,
+    schema:
+      '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}',
+    compatibilityLevel: 'BACKWARD',
+  },
+];

+ 10 - 0
kafka-ui-react-app/src/redux/reducers/schemas/__test__/reducer.spec.ts

@@ -1,4 +1,5 @@
 import {
+  createSchemaAction,
   fetchSchemasByClusterNameAction,
   fetchSchemaVersionsAction,
 } from 'redux/actions';
@@ -17,6 +18,9 @@ describe('Schemas reducer', () => {
     expect(reducer(undefined, fetchSchemaVersionsAction.request())).toEqual(
       initialState
     );
+    expect(reducer(undefined, createSchemaAction.request())).toEqual(
+      initialState
+    );
   });
 
   it('reacts on GET_CLUSTER_SCHEMAS__SUCCESS and returns payload', () => {
@@ -36,4 +40,10 @@ describe('Schemas reducer', () => {
       )
     ).toMatchSnapshot();
   });
+
+  it('reacts on POST_SCHEMA__SUCCESS and returns payload', () => {
+    expect(
+      reducer(undefined, createSchemaAction.success(schemaVersionsPayload[0]))
+    ).toMatchSnapshot();
+  });
 });

+ 11 - 2
kafka-ui-react-app/src/redux/reducers/schemas/__test__/selectors.spec.ts

@@ -1,10 +1,16 @@
 import {
+  createSchemaAction,
   fetchSchemasByClusterNameAction,
   fetchSchemaVersionsAction,
 } from 'redux/actions';
 import configureStore from 'redux/store/configureStore';
 import * as selectors from '../selectors';
-import { clusterSchemasPayload, schemaVersionsPayload } from './fixtures';
+import {
+  clusterSchemasPayload,
+  clusterSchemasPayloadWithNewSchema,
+  newSchemaPayload,
+  schemaVersionsPayload,
+} from './fixtures';
 
 const store = configureStore();
 
@@ -13,6 +19,7 @@ describe('Schemas selectors', () => {
     it('returns fetch status', () => {
       expect(selectors.getIsSchemaListFetched(store.getState())).toBeFalsy();
       expect(selectors.getIsSchemaVersionFetched(store.getState())).toBeFalsy();
+      expect(selectors.getSchemaCreated(store.getState())).toBeFalsy();
     });
 
     it('returns schema list', () => {
@@ -34,6 +41,7 @@ describe('Schemas selectors', () => {
         fetchSchemasByClusterNameAction.success(clusterSchemasPayload)
       );
       store.dispatch(fetchSchemaVersionsAction.success(schemaVersionsPayload));
+      store.dispatch(createSchemaAction.success(newSchemaPayload));
     });
 
     it('returns fetch status', () => {
@@ -41,11 +49,12 @@ describe('Schemas selectors', () => {
       expect(
         selectors.getIsSchemaVersionFetched(store.getState())
       ).toBeTruthy();
+      expect(selectors.getSchemaCreated(store.getState())).toBeTruthy();
     });
 
     it('returns schema list', () => {
       expect(selectors.getSchemaList(store.getState())).toEqual(
-        clusterSchemasPayload
+        clusterSchemasPayloadWithNewSchema
       );
     });
 

+ 14 - 0
kafka-ui-react-app/src/redux/reducers/schemas/reducer.ts

@@ -32,12 +32,26 @@ const updateSchemaList = (
   }, initialMemo);
 };
 
+// Appends the newly created schema to the list. Builds the updated
+// collections immutably: `{ ...state }` alone is a shallow copy, so
+// pushing into `newState.allNames` / assigning into `newState.byName`
+// would mutate the previous state's array/object — a Redux reducer
+// purity violation. Also avoids duplicating the name when a schema
+// with the same subject already exists.
+const addToSchemaList = (
+  state: SchemasState,
+  payload: SchemaSubject
+): SchemasState => {
+  const subject = payload.subject as string;
+  return {
+    ...state,
+    allNames: state.allNames.includes(subject)
+      ? state.allNames
+      : [...state.allNames, subject],
+    byName: { ...state.byName, [subject]: { ...payload } },
+  };
+};
+
 const reducer = (state = initialState, action: Action): SchemasState => {
   switch (action.type) {
     case 'GET_CLUSTER_SCHEMAS__SUCCESS':
       return updateSchemaList(state, action.payload);
     case 'GET_SCHEMA_VERSIONS__SUCCESS':
       return { ...state, currentSchemaVersions: action.payload };
+    case 'POST_SCHEMA__SUCCESS':
+      return addToSchemaList(state, action.payload);
     default:
       return state;
   }

+ 12 - 0
kafka-ui-react-app/src/redux/reducers/schemas/selectors.ts

@@ -15,16 +15,28 @@ const getSchemaVersionsFetchingStatus = createFetchingSelector(
   'GET_SCHEMA_VERSIONS'
 );
 
+const getSchemaCreationStatus = createFetchingSelector('POST_SCHEMA');
+
 export const getIsSchemaListFetched = createSelector(
   getSchemaListFetchingStatus,
   (status) => status === 'fetched'
 );
 
+export const getIsSchemaListFetching = createSelector(
+  getSchemaListFetchingStatus,
+  (status) => status === 'fetching' || status === 'notFetched'
+);
+
 export const getIsSchemaVersionFetched = createSelector(
   getSchemaVersionsFetchingStatus,
   (status) => status === 'fetched'
 );
 
+export const getSchemaCreated = createSelector(
+  getSchemaCreationStatus,
+  (status) => status === 'fetched'
+);
+
 export const getSchemaList = createSelector(
   getIsSchemaListFetched,
   getAllNames,

+ 7 - 5
kafka-ui-react-app/src/redux/reducers/topics/reducer.ts

@@ -1,6 +1,8 @@
 import { v4 } from 'uuid';
 import { Topic, TopicMessage } from 'generated-sources';
 import { Action, TopicsState } from 'redux/interfaces';
+import { getType } from 'typesafe-actions';
+import * as actions from 'redux/actions';
 
 export const initialState: TopicsState = {
   byName: {},
@@ -67,9 +69,9 @@ const transformTopicMessages = (
 
 const reducer = (state = initialState, action: Action): TopicsState => {
   switch (action.type) {
-    case 'GET_TOPICS__SUCCESS':
+    case getType(actions.fetchTopicsListAction.success):
       return updateTopicList(state, action.payload);
-    case 'GET_TOPIC_DETAILS__SUCCESS':
+    case getType(actions.fetchTopicDetailsAction.success):
       return {
         ...state,
         byName: {
@@ -80,9 +82,9 @@ const reducer = (state = initialState, action: Action): TopicsState => {
           },
         },
       };
-    case 'GET_TOPIC_MESSAGES__SUCCESS':
+    case getType(actions.fetchTopicMessagesAction.success):
       return transformTopicMessages(state, action.payload);
-    case 'GET_TOPIC_CONFIG__SUCCESS':
+    case getType(actions.fetchTopicConfigAction.success):
       return {
         ...state,
         byName: {
@@ -96,7 +98,7 @@ const reducer = (state = initialState, action: Action): TopicsState => {
           },
         },
       };
-    case 'POST_TOPIC__SUCCESS':
+    case getType(actions.createTopicAction.success):
       return addToTopicList(state, action.payload);
     default:
       return state;

+ 5 - 0
pom.xml

@@ -24,6 +24,9 @@
 		<dockerfile-maven-plugin.version>1.4.10</dockerfile-maven-plugin.version>
 		<frontend-maven-plugin.version>1.8.0</frontend-maven-plugin.version>
 		<maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
+		<maven-clean-plugin.version>3.1.0</maven-clean-plugin.version>
+		<maven-resources-plugin.version>3.1.0</maven-resources-plugin.version>
+		<maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version>
 		<openapi-generator-maven-plugin.version>4.3.0</openapi-generator-maven-plugin.version>
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
@@ -33,6 +36,8 @@
 		<apache.commons.version>2.2</apache.commons.version>
 		<test.containers.version>1.15.1</test.containers.version>
 		<junit-jupiter-engine.version>5.4.0</junit-jupiter-engine.version>
+
+		<frontend-generated-sources-directory>../kafka-ui-react-app/src/generated-sources</frontend-generated-sources-directory>
 	</properties>
 
 	<repositories>