
Implement SchemaRegistry openapi client (#3123)

Failover clients for SchemaRegistry, Kafka Connect, KSQL:
1. SchemaRegistry openapi client generation
2. WebClient configuration logic moved to WebClientConfigurator class
3. ReactiveFailover utility implemented
4. Connect, Ksql moved to ReactiveFailover usage
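
As context for reviewers, the sketch below shows how the new ReactiveFailover utility is wired and called. It is a minimal illustration assembled from the call sites visible in this diff (KafkaClusterFactory and KafkaConnectService); the two connect addresses are assumed example values and are not part of the commit.

// Minimal usage sketch of ReactiveFailover, based on the call sites in this diff.
// The connect addresses below are illustrative; everything else mirrors KafkaClusterFactory
// and KafkaConnectService.
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.util.List;
import org.springframework.util.unit.DataSize;
import reactor.core.publisher.Flux;

class ReactiveFailoverUsageSketch {

  Flux<String> listConnectors(ClustersProperties.ConnectCluster config, DataSize maxBuffSize) {
    // One delegate client is created per address; when a call fails with a connection-level
    // error matched by the filter, the failover rotates to the next address.
    ReactiveFailover<KafkaConnectClientApi> failover = ReactiveFailover.create(
        List.of("http://connect-1:8083", "http://connect-2:8083"),  // assumed addresses
        url -> new RetryingKafkaConnectClient(config.toBuilder().address(url).build(), maxBuffSize),
        ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
        "No alive connect instances available",
        ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
    );
    // Calls are issued through mono(...) / flux(...), which retry against the next live instance.
    return failover.flux(client -> client.getConnectors(null));
  }
}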
Ilya Kuramshin, 2 years ago
Parent
Commit
799c2c455a
32 changed files with 1508 additions and 1119 deletions
  1. + 0 - 22     kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java
  2. + 20 - 71    kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java
  3. + 6 - 0      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  4. + 3 - 3      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
  5. + 64 - 61    kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
  6. + 2 - 2      kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaCompatibilityException.java
  7. + 0 - 12     kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeNotSupportedException.java
  8. + 1 - 99     kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  9. + 37 - 0     kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java
  10. + 0 - 59    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/FailoverUrlList.java
  11. + 0 - 19    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java
  12. + 0 - 33    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java
  13. + 11 - 4    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  14. + 0 - 21    kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java
  15. + 2 - 3     kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java
  16. + 5 - 4     kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
  17. + 133 - 0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java
  18. + 40 - 38   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java
  19. + 105 - 391 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
  20. + 52 - 63   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java
  21. + 6 - 14    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java
  22. + 154 - 0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java
  23. + 0 - 66    kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java
  24. + 136 - 0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java
  25. + 0 - 69    kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/FailoverUrlListTest.java
  26. + 43 - 43   kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
  27. + 0 - 2     kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
  28. + 5 - 7     kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java
  29. + 20 - 13   kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java
  30. + 233 - 0   kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/ReactiveFailoverTest.java
  31. + 26 - 0    kafka-ui-contract/pom.xml
  32. + 404 - 0   kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml

+ 0 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java

@@ -1,22 +0,0 @@
-package com.provectus.kafka.ui.client;
-
-import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
-import com.provectus.kafka.ui.model.KafkaConnectCluster;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.unit.DataSize;
-
-@Service
-public class KafkaConnectClientsFactory {
-
-  @Value("${webclient.max-in-memory-buffer-size:20MB}")
-  private DataSize maxBuffSize;
-
-  private final Map<String, KafkaConnectClientApi> cache = new ConcurrentHashMap<>();
-
-  public KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) {
-    return cache.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config, maxBuffSize));
-  }
-}

+ 20 - 71
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java

@@ -1,49 +1,29 @@
 package com.provectus.kafka.ui.client;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import static com.provectus.kafka.ui.config.ClustersProperties.ConnectCluster;
+
 import com.provectus.kafka.ui.connect.ApiClient;
-import com.provectus.kafka.ui.connect.RFC3339DateFormat;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.model.Connector;
 import com.provectus.kafka.ui.connect.model.NewConnector;
 import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.InternalSchemaRegistry;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.KafkaConnectCluster;
-import com.provectus.kafka.ui.util.SecuredWebClient;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import java.io.FileInputStream;
-import java.security.KeyStore;
-import java.text.DateFormat;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManagerFactory;
 import lombok.extern.slf4j.Slf4j;
-import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.MediaType;
-import org.springframework.http.client.reactive.ReactorClientHttpConnector;
-import org.springframework.http.codec.json.Jackson2JsonDecoder;
-import org.springframework.http.codec.json.Jackson2JsonEncoder;
 import org.springframework.util.MultiValueMap;
-import org.springframework.util.ResourceUtils;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.client.RestClientException;
-import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClient;
 import reactor.util.retry.Retry;
 
 @Slf4j
@@ -51,7 +31,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
   private static final int MAX_RETRIES = 5;
   private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
 
-  public RetryingKafkaConnectClient(KafkaConnectCluster config, DataSize maxBuffSize) {
+  public RetryingKafkaConnectClient(ConnectCluster config, DataSize maxBuffSize) {
     super(new RetryingApiClient(config, maxBuffSize));
   }
 
@@ -97,58 +77,27 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
 
   private static class RetryingApiClient extends ApiClient {
 
-    private static final DateFormat dateFormat = getDefaultDateFormat();
-    private static final ObjectMapper mapper = buildObjectMapper(dateFormat);
-
-    public RetryingApiClient(KafkaConnectCluster config, DataSize maxBuffSize) {
-      super(buildWebClient(mapper, maxBuffSize, config), mapper, dateFormat);
+    public RetryingApiClient(ConnectCluster config, DataSize maxBuffSize) {
+      super(buildWebClient(maxBuffSize, config), null, null);
       setBasePath(config.getAddress());
       setUsername(config.getUserName());
       setPassword(config.getPassword());
     }
 
-    public static DateFormat getDefaultDateFormat() {
-      DateFormat dateFormat = new RFC3339DateFormat();
-      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-      return dateFormat;
-    }
-
-    public static WebClient buildWebClient(ObjectMapper mapper, DataSize maxBuffSize, KafkaConnectCluster config) {
-      ExchangeStrategies strategies = ExchangeStrategies
-              .builder()
-              .codecs(clientDefaultCodecsConfigurer -> {
-                clientDefaultCodecsConfigurer.defaultCodecs()
-                        .jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, MediaType.APPLICATION_JSON));
-                clientDefaultCodecsConfigurer.defaultCodecs()
-                        .jackson2JsonDecoder(new Jackson2JsonDecoder(mapper, MediaType.APPLICATION_JSON));
-                clientDefaultCodecsConfigurer.defaultCodecs()
-                        .maxInMemorySize((int) maxBuffSize.toBytes());
-              })
-              .build();
-
-      try {
-        WebClient.Builder webClient = SecuredWebClient.configure(
-            config.getKeystoreLocation(),
-            config.getKeystorePassword(),
-            config.getTruststoreLocation(),
-            config.getTruststorePassword()
-        );
-
-        return webClient.exchangeStrategies(strategies).build();
-      } catch (Exception e) {
-        throw new IllegalStateException(
-            "cannot create TLS configuration for kafka-connect cluster " + config.getName(), e);
-      }
-    }
-
-    public static ObjectMapper buildObjectMapper(DateFormat dateFormat) {
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.setDateFormat(dateFormat);
-      mapper.registerModule(new JavaTimeModule());
-      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      JsonNullableModule jnm = new JsonNullableModule();
-      mapper.registerModule(jnm);
-      return mapper;
+    public static WebClient buildWebClient(DataSize maxBuffSize, ConnectCluster config) {
+      return new WebClientConfigurator()
+          .configureSsl(
+              config.getKeystoreLocation(),
+              config.getKeystorePassword(),
+              config.getTruststoreLocation(),
+              config.getTruststorePassword()
+          )
+          .configureBasicAuth(
+              config.getUserName(),
+              config.getPassword()
+          )
+          .configureBufferSize(maxBuffSize)
+          .build();
     }
 
     @Override

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -8,7 +8,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import javax.annotation.PostConstruct;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
@@ -53,6 +56,9 @@ public class ClustersProperties {
   }
 
   @Data
+  @NoArgsConstructor
+  @AllArgsConstructor
+  @Builder(toBuilder = true)
   public static class ConnectCluster {
     String name;
     String address;

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -234,9 +234,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .build());
 
     return validateAccess.then(
-        kafkaConnectService
-            .getConnectorPlugins(getCluster(clusterName), connectName)
-            .map(ResponseEntity::ok)
+        Mono.just(
+            ResponseEntity.ok(
+                kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
     );
   }
 

+ 64 - 61
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java

@@ -2,7 +2,8 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.SchemasApi;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.mapper.KafkaSrMapper;
+import com.provectus.kafka.ui.mapper.KafkaSrMapperImpl;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -32,7 +33,7 @@ public class SchemasController extends AbstractController implements SchemasApi
 
   private static final Integer DEFAULT_PAGE_SIZE = 25;
 
-  private final ClusterMapper mapper;
+  private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
 
   private final SchemaRegistryService schemaRegistryService;
   private final AccessControlService accessControlService;
@@ -40,7 +41,7 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   protected KafkaCluster getCluster(String clusterName) {
     var c = super.getCluster(clusterName);
-    if (c.getSchemaRegistry() == null) {
+    if (c.getSchemaRegistryClient() == null) {
       throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
     }
     return c;
@@ -48,9 +49,8 @@ public class SchemasController extends AbstractController implements SchemasApi
 
   @Override
   public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
-      String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
+      String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
       ServerWebExchange exchange) {
-
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
@@ -58,37 +58,41 @@ public class SchemasController extends AbstractController implements SchemasApi
         .build());
 
     return validateAccess.then(
-        schemaRegistryService.checksSchemaCompatibility(
-                getCluster(clusterName), subject, newSchemaSubject)
-            .map(mapper::toCompatibilityCheckResponse)
+        newSchemaSubjectMono.flatMap(subjectDTO ->
+                schemaRegistryService.checksSchemaCompatibility(
+                    getCluster(clusterName),
+                    subject,
+                    kafkaSrMapper.fromDto(subjectDTO)
+                ))
+            .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
     );
   }
 
   @Override
   public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
-      String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
+      String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
       ServerWebExchange exchange) {
+    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .schemaActions(SchemaAction.CREATE)
+        .build());
 
-    return newSchemaSubject.flatMap(dto -> {
-      Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
-          .cluster(clusterName)
-          .schemaActions(SchemaAction.CREATE)
-          .build());
-
-      return validateAccess.then(
-          schemaRegistryService
-              .registerNewSchema(getCluster(clusterName), dto)
-              .map(ResponseEntity::ok)
-      );
-    });
+    return validateAccess.then(
+        newSchemaSubjectMono.flatMap(newSubject ->
+                schemaRegistryService.registerNewSchema(
+                    getCluster(clusterName),
+                    newSubject.getSubject(),
+                    kafkaSrMapper.fromDto(newSubject)
+                )
+            ).map(kafkaSrMapper::toDto)
+            .map(ResponseEntity::ok)
+    );
   }
 
   @Override
-  public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName,
-                                                       String subject,
-                                                       ServerWebExchange exchange) {
-
+  public Mono<ResponseEntity<Void>> deleteLatestSchema(
+      String clusterName, String subject, ServerWebExchange exchange) {
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
@@ -102,9 +106,8 @@ public class SchemasController extends AbstractController implements SchemasApi
   }
 
   @Override
-  public Mono<ResponseEntity<Void>> deleteSchema(String clusterName,
-                                                 String subject,
-                                                 ServerWebExchange exchange) {
+  public Mono<ResponseEntity<Void>> deleteSchema(
+      String clusterName, String subject, ServerWebExchange exchange) {
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
@@ -118,36 +121,32 @@ public class SchemasController extends AbstractController implements SchemasApi
   }
 
   @Override
-  public Mono<ResponseEntity<Void>> deleteSchemaByVersion(String clusterName,
-                                                          String subject,
-                                                          Integer version,
-                                                          ServerWebExchange exchange) {
-
+  public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
+      String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
-        .schema(subject)
+        .schema(subjectName)
         .schemaActions(SchemaAction.DELETE)
         .build());
 
     return validateAccess.then(
-        schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subject, version)
+        schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
             .thenReturn(ResponseEntity.ok().build())
     );
   }
 
   @Override
   public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
-      String clusterName, String subject, ServerWebExchange exchange) {
-
+      String clusterName, String subjectName, ServerWebExchange exchange) {
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
-        .schema(subject)
+        .schema(subjectName)
         .schemaActions(SchemaAction.VIEW)
         .build());
 
     Flux<SchemaSubjectDTO> schemas =
-        schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subject);
-
+        schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
+            .map(kafkaSrMapper::toDto);
     return validateAccess.thenReturn(ResponseEntity.ok(schemas));
   }
 
@@ -155,7 +154,7 @@ public class SchemasController extends AbstractController implements SchemasApi
   public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
       String clusterName, ServerWebExchange exchange) {
     return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
-        .map(mapper::toCompatibilityLevelDto)
+        .map(c -> new CompatibilityLevelDTO().compatibility(kafkaSrMapper.toDto(c)))
         .map(ResponseEntity::ok)
         .defaultIfEmpty(ResponseEntity.notFound().build());
   }
@@ -172,6 +171,7 @@ public class SchemasController extends AbstractController implements SchemasApi
 
     return validateAccess.then(
         schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
+            .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
     );
   }
@@ -179,7 +179,6 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
       String clusterName, String subject, Integer version, ServerWebExchange exchange) {
-
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
@@ -189,6 +188,7 @@ public class SchemasController extends AbstractController implements SchemasApi
     return validateAccess.then(
         schemaRegistryService.getSchemaSubjectByVersion(
                 getCluster(clusterName), subject, version)
+            .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
     );
   }
@@ -198,10 +198,10 @@ public class SchemasController extends AbstractController implements SchemasApi
                                                                     @Valid Integer pageNum,
                                                                     @Valid Integer perPage,
                                                                     @Valid String search,
-                                                                    ServerWebExchange exchange) {
+                                                                    ServerWebExchange serverWebExchange) {
     return schemaRegistryService
         .getAllSubjectNames(getCluster(clusterName))
-        .flatMapMany(Flux::fromArray)
+        .flatMapIterable(l -> l)
         .filterWhen(schema -> accessControlService.isSchemaAccessible(schema, clusterName))
         .collectList()
         .flatMap(subjects -> {
@@ -218,46 +218,49 @@ public class SchemasController extends AbstractController implements SchemasApi
               .limit(pageSize)
               .collect(Collectors.toList());
           return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
-              .map(a -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(a));
-        })
-        .map(ResponseEntity::ok);
+              .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
+              .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
+        }).map(ResponseEntity::ok);
   }
 
   @Override
   public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
-      String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
+      String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
       ServerWebExchange exchange) {
-
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
         .build());
 
-    log.info("Updating schema compatibility globally");
-
     return validateAccess.then(
-        schemaRegistryService.updateSchemaCompatibility(
-                getCluster(clusterName), compatibilityLevel)
-            .map(ResponseEntity::ok)
+        compatibilityLevelMono
+            .flatMap(compatibilityLevelDTO ->
+                schemaRegistryService.updateGlobalSchemaCompatibility(
+                    getCluster(clusterName),
+                    kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
+                ))
+            .thenReturn(ResponseEntity.ok().build())
     );
   }
 
   @Override
   public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
-      String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
+      String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
       ServerWebExchange exchange) {
-
     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .schemaActions(SchemaAction.EDIT)
         .build());
 
-    log.info("Updating schema compatibility for subject: {}", subject);
-
     return validateAccess.then(
-        schemaRegistryService.updateSchemaCompatibility(
-                getCluster(clusterName), subject, compatibilityLevel)
-            .map(ResponseEntity::ok)
+        compatibilityLevelMono
+            .flatMap(compatibilityLevelDTO ->
+                schemaRegistryService.updateSchemaCompatibility(
+                    getCluster(clusterName),
+                    subject,
+                    kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
+                ))
+            .thenReturn(ResponseEntity.ok().build())
     );
   }
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaCompatibilityException.java

@@ -1,8 +1,8 @@
 package com.provectus.kafka.ui.exception;
 
 public class SchemaCompatibilityException extends CustomBaseException {
-  public SchemaCompatibilityException(String message) {
-    super(message);
+  public SchemaCompatibilityException() {
+    super("Schema being registered is incompatible with an earlier schema");
   }
 
   @Override

+ 0 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeNotSupportedException.java

@@ -1,12 +0,0 @@
-package com.provectus.kafka.ui.exception;
-
-public class SchemaTypeNotSupportedException extends UnprocessableEntityException {
-
-  private static final String REQUIRED_SCHEMA_REGISTRY_VERSION = "5.5.0";
-
-  public SchemaTypeNotSupportedException() {
-    super(String.format("Current version of Schema Registry does "
-        + "not support provided schema type,"
-        + " version %s or later is required here.", REQUIRED_SCHEMA_REGISTRY_VERSION));
-  }
-}

+ 1 - 99
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -8,25 +8,18 @@ import com.provectus.kafka.ui.model.BrokerMetricsDTO;
 import com.provectus.kafka.ui.model.ClusterDTO;
 import com.provectus.kafka.ui.model.ClusterMetricsDTO;
 import com.provectus.kafka.ui.model.ClusterStatsDTO;
-import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
-import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.ConfigSourceDTO;
 import com.provectus.kafka.ui.model.ConfigSynonymDTO;
 import com.provectus.kafka.ui.model.ConnectDTO;
-import com.provectus.kafka.ui.model.FailoverUrlList;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalClusterState;
-import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
-import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.MetricDTO;
 import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.PartitionDTO;
@@ -34,35 +27,20 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
-import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
-import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
-import com.provectus.kafka.ui.util.PollingThrottler;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
-import org.mapstruct.Named;
 
 @Mapper(componentModel = "spring")
 public interface ClusterMapper {
 
   ClusterDTO toCluster(InternalClusterState clusterState);
 
-  @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
-  @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
-  @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
-  @Mapping(target = "metricsConfig", source = "metrics")
-  @Mapping(target = "throttler", source = ".", qualifiedByName = "createClusterThrottler")
-  KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
-
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
 
   default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
@@ -107,67 +85,6 @@ public interface ClusterMapper {
 
   BrokerDTO toBrokerDto(InternalBroker broker);
 
-  @Named("setSchemaRegistry")
-  default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) {
-    if (clusterProperties == null
-        || clusterProperties.getSchemaRegistry() == null) {
-      return null;
-    }
-
-    InternalSchemaRegistry.InternalSchemaRegistryBuilder internalSchemaRegistry =
-        InternalSchemaRegistry.builder();
-
-    internalSchemaRegistry.url(
-        clusterProperties.getSchemaRegistry() != null
-            ? new FailoverUrlList(Arrays.asList(clusterProperties.getSchemaRegistry().split(",")))
-            : new FailoverUrlList(Collections.emptyList())
-    );
-
-    if (clusterProperties.getSchemaRegistryAuth() != null) {
-      internalSchemaRegistry.username(clusterProperties.getSchemaRegistryAuth().getUsername());
-      internalSchemaRegistry.password(clusterProperties.getSchemaRegistryAuth().getPassword());
-    }
-
-    if (clusterProperties.getSchemaRegistrySsl() != null) {
-      internalSchemaRegistry.keystoreLocation(clusterProperties.getSchemaRegistrySsl().getKeystoreLocation());
-      internalSchemaRegistry.keystorePassword(clusterProperties.getSchemaRegistrySsl().getKeystorePassword());
-      internalSchemaRegistry.truststoreLocation(clusterProperties.getSchemaRegistrySsl().getTruststoreLocation());
-      internalSchemaRegistry.truststorePassword(clusterProperties.getSchemaRegistrySsl().getTruststorePassword());
-    }
-
-    return internalSchemaRegistry.build();
-  }
-
-  @Named("setKsqldbServer")
-  default InternalKsqlServer setKsqldbServer(ClustersProperties.Cluster clusterProperties) {
-    if (clusterProperties == null
-            || clusterProperties.getKsqldbServer() == null) {
-      return null;
-    }
-
-    InternalKsqlServer.InternalKsqlServerBuilder internalKsqlServerBuilder =
-            InternalKsqlServer.builder().url(clusterProperties.getKsqldbServer());
-
-    if (clusterProperties.getKsqldbServerAuth() != null) {
-      internalKsqlServerBuilder.username(clusterProperties.getKsqldbServerAuth().getUsername());
-      internalKsqlServerBuilder.password(clusterProperties.getKsqldbServerAuth().getPassword());
-    }
-
-    if (clusterProperties.getKsqldbServerSsl() != null) {
-      internalKsqlServerBuilder.keystoreLocation(clusterProperties.getKsqldbServerSsl().getKeystoreLocation());
-      internalKsqlServerBuilder.keystorePassword(clusterProperties.getKsqldbServerSsl().getKeystorePassword());
-      internalKsqlServerBuilder.truststoreLocation(clusterProperties.getKsqldbServerSsl().getTruststoreLocation());
-      internalKsqlServerBuilder.truststorePassword(clusterProperties.getKsqldbServerSsl().getTruststorePassword());
-    }
-
-    return internalKsqlServerBuilder.build();
-  }
-
-  @Named("createClusterThrottler")
-  default Supplier<PollingThrottler> createClusterThrottler(ClustersProperties.Cluster cluster) {
-    return PollingThrottler.throttlerSupplier(cluster);
-  }
-
   TopicDetailsDTO toTopicDetails(InternalTopic topic);
 
   @Mapping(target = "isReadOnly", source = "readOnly")
@@ -176,16 +93,10 @@ public interface ClusterMapper {
 
   ReplicaDTO toReplica(InternalReplica replica);
 
-  ConnectDTO toKafkaConnect(KafkaConnectCluster connect);
+  ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
 
   List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<Feature> features);
 
-  @Mapping(target = "isCompatible", source = "compatible")
-  CompatibilityCheckResponseDTO toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
-
-  @Mapping(target = "compatibility", source = "compatibilityLevel")
-  CompatibilityLevelDTO toCompatibilityLevelDto(InternalCompatibilityLevel dto);
-
   default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
     return map.values().stream().map(this::toPartition).collect(Collectors.toList());
   }
@@ -202,13 +113,4 @@ public interface ClusterMapper {
     return DataMasking.create(maskingProperties);
   }
 
-  @Named("setProperties")
-  default Properties setProperties(Properties properties) {
-    Properties copy = new Properties();
-    if (properties != null) {
-      copy.putAll(properties);
-    }
-    return copy;
-  }
-
 }

+ 37 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java

@@ -0,0 +1,37 @@
+package com.provectus.kafka.ui.mapper;
+
+import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
+import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
+import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaTypeDTO;
+import com.provectus.kafka.ui.service.SchemaRegistryService;
+import com.provectus.kafka.ui.sr.model.Compatibility;
+import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
+import com.provectus.kafka.ui.sr.model.NewSubject;
+import com.provectus.kafka.ui.sr.model.SchemaType;
+import java.util.Optional;
+import org.mapstruct.Mapper;
+
+
+@Mapper(componentModel = "spring")
+public interface KafkaSrMapper {
+
+  default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
+    return new SchemaSubjectDTO()
+        .id(s.getId())
+        .version(s.getVersion())
+        .subject(s.getSubject())
+        .schema(s.getSchema())
+        .schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
+        .compatibilityLevel(s.getCompatibility().toString());
+  }
+
+  CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
+
+  CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
+
+  NewSubject fromDto(NewSchemaSubjectDTO subjectDto);
+
+  Compatibility fromDto(CompatibilityLevelDTO.CompatibilityEnum dtoEnum);
+}

+ 0 - 59
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/FailoverUrlList.java

@@ -1,59 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.experimental.Delegate;
-
-public class FailoverUrlList {
-
-  public static final int DEFAULT_RETRY_GRACE_PERIOD_IN_MS = 5000;
-
-  private final Map<Integer, Instant> failures = new ConcurrentHashMap<>();
-  private final AtomicInteger index = new AtomicInteger(0);
-  @Delegate
-  private final List<String> urls;
-  private final int retryGracePeriodInMs;
-
-  public FailoverUrlList(List<String> urls) {
-    this(urls, DEFAULT_RETRY_GRACE_PERIOD_IN_MS);
-  }
-
-  public FailoverUrlList(List<String> urls, int retryGracePeriodInMs) {
-    if (urls != null && !urls.isEmpty()) {
-      this.urls = new ArrayList<>(urls);
-    } else {
-      throw new IllegalArgumentException("Expected at least one URL to be passed in constructor");
-    }
-    this.retryGracePeriodInMs = retryGracePeriodInMs;
-  }
-
-  public String current() {
-    return this.urls.get(this.index.get());
-  }
-
-  public void fail(String url) {
-    int currentIndex = this.index.get();
-    if ((this.urls.get(currentIndex)).equals(url)) {
-      this.failures.put(currentIndex, Instant.now());
-      this.index.compareAndSet(currentIndex, (currentIndex + 1) % this.urls.size());
-    }
-  }
-
-  public boolean isFailoverAvailable() {
-    var now = Instant.now();
-    return this.urls.size() > this.failures.size()
-            || this.failures
-                    .values()
-                    .stream()
-                    .anyMatch(e -> now.isAfter(e.plusMillis(retryGracePeriodInMs)));
-  }
-
-  @Override
-  public String toString() {
-    return this.urls.toString();
-  }
-}

+ 0 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java

@@ -1,19 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.ToString;
-
-@Data
-@ToString(exclude = {"password", "keystorePassword", "truststorePassword"})
-@Builder(toBuilder = true)
-public class InternalKsqlServer {
-  private final String url;
-  private final String username;
-  private final String password;
-
-  private final String keystoreLocation;
-  private final String truststoreLocation;
-  private final String keystorePassword;
-  private final String truststorePassword;
-}

+ 0 - 33
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java

@@ -1,33 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalSchemaRegistry {
-  private final String username;
-  private final String password;
-  private final FailoverUrlList url;
-
-  private final String keystoreLocation;
-  private final String truststoreLocation;
-  private final String keystorePassword;
-  private final String truststorePassword;
-
-  public String getPrimaryNodeUri() {
-    return url.get(0);
-  }
-
-  public String getUri() {
-    return url.current();
-  }
-
-  public void markAsUnavailable(String url) {
-    this.url.fail(url);
-  }
-
-  public boolean isFailoverAvailable() {
-    return this.url.isFailoverAvailable();
-  }
-}

+ 11 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -1,8 +1,13 @@
 package com.provectus.kafka.ui.model;
 
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.PollingThrottler;
-import java.util.List;
+import com.provectus.kafka.ui.util.ReactiveFailover;
+import java.util.Map;
 import java.util.Properties;
 import java.util.function.Supplier;
 import lombok.AccessLevel;
@@ -14,16 +19,18 @@ import lombok.Data;
 @Builder(toBuilder = true)
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
 public class KafkaCluster {
+  private final ClustersProperties.Cluster originalProperties;
+
   private final String name;
   private final String version;
   private final String bootstrapServers;
-  private final InternalSchemaRegistry schemaRegistry;
-  private final InternalKsqlServer ksqldbServer;
-  private final List<KafkaConnectCluster> kafkaConnect;
   private final Properties properties;
   private final boolean readOnly;
   private final boolean disableLogDirsCollection;
   private final MetricsConfig metricsConfig;
   private final DataMasking masking;
   private final Supplier<PollingThrottler> throttler;
+  private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
+  private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
+  private final ReactiveFailover<KsqlApiClient> ksqlClient;
 }

+ 0 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java

@@ -1,21 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class KafkaConnectCluster {
-  private final String name;
-  private final String address;
-  private final String userName;
-  private final String password;
-
-  private final String keystoreLocation;
-  private final String truststoreLocation;
-  private final String keystorePassword;
-  private final String truststorePassword;
-}

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.collect.ImmutableMap;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.Collection;
 import java.util.Optional;
@@ -13,9 +12,9 @@ public class ClustersStorage {
 
   private final ImmutableMap<String, KafkaCluster> kafkaClusters;
 
-  public ClustersStorage(ClustersProperties properties, ClusterMapper mapper) {
+  public ClustersStorage(ClustersProperties properties, KafkaClusterFactory factory) {
     var builder = ImmutableMap.<String, KafkaCluster>builder();
-    properties.getClusters().forEach(c -> builder.put(c.getName(), mapper.toKafkaCluster(c)));
+    properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(c)));
     this.kafkaClusters = builder.build();
   }
 

+ 5 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
@@ -27,17 +28,17 @@ public class FeatureService {
   public Mono<List<Feature>> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) {
     List<Mono<Feature>> features = new ArrayList<>();
 
-    if (Optional.ofNullable(cluster.getKafkaConnect())
-        .filter(Predicate.not(List::isEmpty))
+    if (Optional.ofNullable(cluster.getConnectsClients())
+        .filter(Predicate.not(Map::isEmpty))
         .isPresent()) {
       features.add(Mono.just(Feature.KAFKA_CONNECT));
     }
 
-    if (cluster.getKsqldbServer() != null) {
+    if (cluster.getKsqlClient() != null) {
       features.add(Mono.just(Feature.KSQL_DB));
     }
 
-    if (cluster.getSchemaRegistry() != null) {
+    if (cluster.getSchemaRegistryClient() != null) {
       features.add(Mono.just(Feature.SCHEMA_REGISTRY));
     }
 

+ 133 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -0,0 +1,133 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MetricsConfig;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
+import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.sr.ApiClient;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.util.PollingThrottler;
+import com.provectus.kafka.ui.util.ReactiveFailover;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.unit.DataSize;
+import org.springframework.web.reactive.function.client.WebClient;
+
+@Service
+@RequiredArgsConstructor
+public class KafkaClusterFactory {
+
+  @Value("${webclient.max-in-memory-buffer-size:20MB}")
+  private DataSize maxBuffSize;
+
+  public KafkaCluster create(ClustersProperties.Cluster clusterProperties) {
+    KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();
+
+    builder.name(clusterProperties.getName());
+    builder.bootstrapServers(clusterProperties.getBootstrapServers());
+    builder.properties(Optional.ofNullable(clusterProperties.getProperties()).orElse(new Properties()));
+    builder.readOnly(clusterProperties.isReadOnly());
+    builder.disableLogDirsCollection(clusterProperties.isDisableLogDirsCollection());
+    builder.masking(DataMasking.create(clusterProperties.getMasking()));
+    builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
+    builder.throttler(PollingThrottler.throttlerSupplier(clusterProperties));
+
+    builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
+    builder.connectsClients(connectClients(clusterProperties));
+    builder.ksqlClient(ksqlClient(clusterProperties));
+
+    builder.originalProperties(clusterProperties);
+
+    return builder.build();
+  }
+
+  @Nullable
+  private Map<String, ReactiveFailover<KafkaConnectClientApi>> connectClients(
+      ClustersProperties.Cluster clusterProperties) {
+    if (clusterProperties.getKafkaConnect() == null) {
+      return null;
+    }
+    Map<String, ReactiveFailover<KafkaConnectClientApi>> connects = new HashMap<>();
+    clusterProperties.getKafkaConnect().forEach(c -> {
+      ReactiveFailover<KafkaConnectClientApi> failover = ReactiveFailover.create(
+          parseUrlList(c.getAddress()),
+          url -> new RetryingKafkaConnectClient(c.toBuilder().address(url).build(), maxBuffSize),
+          ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
+          "No alive connect instances available",
+          ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
+      );
+      connects.put(c.getName(), failover);
+    });
+    return connects;
+  }
+
+  @Nullable
+  private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {
+    if (clusterProperties.getSchemaRegistry() == null) {
+      return null;
+    }
+    var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
+        .orElse(new ClustersProperties.SchemaRegistryAuth());
+    WebClient webClient = new WebClientConfigurator()
+        .configureSsl(clusterProperties.getSchemaRegistrySsl())
+        .configureBasicAuth(auth.getUsername(), auth.getPassword())
+        .build();
+    return ReactiveFailover.create(
+        parseUrlList(clusterProperties.getSchemaRegistry()),
+        url -> new KafkaSrClientApi(new ApiClient(webClient, null, null).setBasePath(url)),
+        ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
+        "No live schemaRegistry instances available",
+        ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
+    );
+  }
+
+  @Nullable
+  private ReactiveFailover<KsqlApiClient> ksqlClient(ClustersProperties.Cluster clusterProperties) {
+    if (clusterProperties.getKsqldbServer() == null) {
+      return null;
+    }
+    return ReactiveFailover.create(
+        parseUrlList(clusterProperties.getKsqldbServer()),
+        url -> new KsqlApiClient(
+            url,
+            clusterProperties.getKsqldbServerAuth(),
+            clusterProperties.getKsqldbServerSsl(),
+            maxBuffSize
+        ),
+        ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
+        "No live ksqldb instances available",
+        ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
+    );
+  }
+
+  private List<String> parseUrlList(String url) {
+    return Stream.of(url.split(",")).map(String::trim).filter(s -> !s.isBlank()).toList();
+  }
+
+  @Nullable
+  private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
+    if (metricsConfigData == null) {
+      return null;
+    }
+    MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
+    builder.type(metricsConfigData.getType());
+    builder.port(metricsConfigData.getPort());
+    builder.ssl(metricsConfigData.isSsl());
+    builder.username(metricsConfigData.getUsername());
+    builder.password(metricsConfigData.getPassword());
+    return builder.build();
+  }
+
+}

+ 40 - 38
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -2,13 +2,12 @@ package com.provectus.kafka.ui.service;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.client.KafkaConnectClientsFactory;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.model.ConnectorStatus;
 import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
 import com.provectus.kafka.ui.connect.model.ConnectorTopics;
 import com.provectus.kafka.ui.connect.model.TaskStatus;
-import com.provectus.kafka.ui.exception.ConnectNotFoundException;
+import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
@@ -24,9 +23,11 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.NewConnectorDTO;
 import com.provectus.kafka.ui.model.TaskDTO;
 import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
+import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -50,12 +51,11 @@ public class KafkaConnectService {
   private final KafkaConnectMapper kafkaConnectMapper;
   private final ObjectMapper objectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
-  private final KafkaConnectClientsFactory kafkaConnectClientsFactory;
 
   public List<ConnectDTO> getConnects(KafkaCluster cluster) {
-    return cluster.getKafkaConnect().stream()
-        .map(clusterMapper::toKafkaConnect)
-        .collect(Collectors.toList());
+    return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
+        .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
+        .orElse(List.of());
   }
 
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@@ -118,8 +118,9 @@ public class KafkaConnectService {
 
   private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
                                                    String connectorName) {
-    return withConnectClient(cluster, connectClusterName)
-        .flatMap(c -> c.getConnectorTopics(connectorName).map(result -> result.get(connectorName)))
+    return api(cluster, connectClusterName)
+        .mono(c -> c.getConnectorTopics(connectorName))
+        .map(result -> result.get(connectorName))
         // old connectors don't have this api, setting empty list for
         // backward-compatibility
         .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
@@ -141,8 +142,8 @@ public class KafkaConnectService {
   }
 
   public Flux<String> getConnectors(KafkaCluster cluster, String connectName) {
-    return withConnectClient(cluster, connectName)
-        .flatMapMany(client ->
+    return api(cluster, connectName)
+        .flux(client ->
             client.getConnectors(null)
                 .doOnError(e -> log.error("Unexpected error upon getting connectors", e))
         );
@@ -150,8 +151,8 @@ public class KafkaConnectService {
 
   public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
                                             Mono<NewConnectorDTO> connector) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(client ->
+    return api(cluster, connectName)
+        .mono(client ->
             connector
                 .flatMap(c -> connectorExists(cluster, connectName, c.getName())
                     .map(exists -> {
@@ -177,8 +178,8 @@ public class KafkaConnectService {
 
   public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
                                          String connectorName) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(client -> client.getConnector(connectorName)
+    return api(cluster, connectName)
+        .mono(client -> client.getConnector(connectorName)
             .map(kafkaConnectMapper::fromClient)
             .flatMap(connector ->
                 client.getConnectorStatus(connector.getName())
@@ -226,8 +227,8 @@ public class KafkaConnectService {
 
   public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster cluster, String connectName,
                                                       String connectorName) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(c -> c.getConnectorConfig(connectorName))
+    return api(cluster, connectName)
+        .mono(c -> c.getConnectorConfig(connectorName))
         .map(connectorConfig -> {
           final Map<String, Object> obfuscatedMap = new HashMap<>();
           connectorConfig.forEach((key, value) ->
@@ -238,8 +239,8 @@ public class KafkaConnectService {
 
   public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
                                                String connectorName, Mono<Object> requestBody) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(c ->
+    return api(cluster, connectName)
+        .mono(c ->
             requestBody
                 .flatMap(body -> c.setConnectorConfig(connectorName, (Map<String, Object>) body))
                 .map(kafkaConnectMapper::fromClient));
@@ -247,14 +248,14 @@ public class KafkaConnectService {
 
   public Mono<Void> deleteConnector(
       KafkaCluster cluster, String connectName, String connectorName) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(c -> c.deleteConnector(connectorName));
+    return api(cluster, connectName)
+        .mono(c -> c.deleteConnector(connectorName));
   }
 
   public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
                                          String connectorName, ConnectorActionDTO action) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(client -> {
+    return api(cluster, connectName)
+        .mono(client -> {
           switch (action) {
             case RESTART:
               return client.restartConnector(connectorName, false, false);
@@ -283,8 +284,8 @@ public class KafkaConnectService {
   }
 
   public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
-    return withConnectClient(cluster, connectName)
-        .flatMapMany(client ->
+    return api(cluster, connectName)
+        .flux(client ->
             client.getConnectorTasks(connectorName)
                 .onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty())
                 .map(kafkaConnectMapper::fromClient)
@@ -299,20 +300,20 @@ public class KafkaConnectService {
 
   public Mono<Void> restartConnectorTask(KafkaCluster cluster, String connectName,
                                          String connectorName, Integer taskId) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(client -> client.restartConnectorTask(connectorName, taskId));
+    return api(cluster, connectName)
+        .mono(client -> client.restartConnectorTask(connectorName, taskId));
   }
 
-  public Mono<Flux<ConnectorPluginDTO>> getConnectorPlugins(KafkaCluster cluster,
-                                                            String connectName) {
-    return withConnectClient(cluster, connectName)
-        .map(client -> client.getConnectorPlugins().map(kafkaConnectMapper::fromClient));
+  public Flux<ConnectorPluginDTO> getConnectorPlugins(KafkaCluster cluster,
+                                                      String connectName) {
+    return api(cluster, connectName)
+        .flux(client -> client.getConnectorPlugins().map(kafkaConnectMapper::fromClient));
   }
 
   public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(
       KafkaCluster cluster, String connectName, String pluginName, Mono<Object> requestBody) {
-    return withConnectClient(cluster, connectName)
-        .flatMap(client ->
+    return api(cluster, connectName)
+        .mono(client ->
             requestBody
                 .flatMap(body ->
                     client.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body))
@@ -320,11 +321,12 @@ public class KafkaConnectService {
         );
   }
 
-  private Mono<KafkaConnectClientApi> withConnectClient(KafkaCluster cluster, String connectName) {
-    return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
-            .filter(connect -> connect.getName().equals(connectName))
-            .findFirst())
-        .switchIfEmpty(Mono.error(ConnectNotFoundException::new))
-        .map(kafkaConnectClientsFactory::withKafkaConnectConfig);
+  private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String connectName) {
+    var client = cluster.getConnectsClients().get(connectName);
+    if (client == null) {
+      throw new NotFoundException(
+          "Connect %s not found for cluster %s".formatted(connectName, cluster.getName()));
+    }
+    return client;
   }
 }
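
The net effect across KafkaConnectService is a single calling convention: look up the per-connect ReactiveFailover wrapper from the cluster model, then run the request through its mono(...) or flux(...) hooks. A minimal sketch of that pattern, using only names visible in the diff above; the exact return type of the generated getConnectorConfig call is assumed here:

// Sketch only, not part of this commit: the common Connect call pattern after the change.
// api(...) returns ReactiveFailover<KafkaConnectClientApi>, or throws NotFoundException
// when the connect name is not configured for the cluster.
private Mono<Map<String, Object>> connectorConfig(KafkaCluster cluster,
                                                  String connectName,
                                                  String connectorName) {
  return api(cluster, connectName)
      // on connection failures the call is retried against the next available Connect instance
      .mono(c -> c.getConnectorConfig(connectorName));
}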

+ 105 - 391
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -1,54 +1,31 @@
 package com.provectus.kafka.ui.service;
 
-import static org.springframework.http.HttpStatus.CONFLICT;
-import static org.springframework.http.HttpStatus.NOT_FOUND;
-import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.exception.SchemaCompatibilityException;
-import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException;
 import com.provectus.kafka.ui.exception.SchemaNotFoundException;
-import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException;
-import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
-import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
-import com.provectus.kafka.ui.model.SchemaSubjectDTO;
-import com.provectus.kafka.ui.model.SchemaTypeDTO;
-import com.provectus.kafka.ui.model.schemaregistry.ErrorResponse;
-import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
-import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
-import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
-import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
-import com.provectus.kafka.ui.util.SecuredWebClient;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.Compatibility;
+import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
+import com.provectus.kafka.ui.sr.model.CompatibilityConfig;
+import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
+import com.provectus.kafka.ui.sr.model.NewSubject;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import com.provectus.kafka.ui.util.ReactiveFailover;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
 import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Formatter;
 import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.experimental.Delegate;
 import lombok.extern.slf4j.Slf4j;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClientRequestException;
-import org.springframework.web.util.UriComponentsBuilder;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -57,404 +34,141 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 public class SchemaRegistryService {
 
-  public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
-  public static final String NO_SUCH_SCHEMA = "No such schema %s";
-
-  private static final String URL_SUBJECTS = "/subjects";
-  private static final String URL_SUBJECT = "/subjects/{schemaName}";
-  private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
-  private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
   private static final String LATEST = "latest";
 
-  private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
-  private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
-  private static final String INVALID_SCHEMA = "Invalid Schema";
+  @AllArgsConstructor
+  public static class SubjectWithCompatibilityLevel {
+    @Delegate
+    SchemaSubject subject;
+    @Getter
+    Compatibility compatibility;
+  }
+
+  private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
+    return cluster.getSchemaRegistryClient();
+  }
 
-  public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
-                                                                 List<String> subjects) {
+  public Mono<List<SubjectWithCompatibilityLevel>> getAllLatestVersionSchemas(KafkaCluster cluster,
+                                                                              List<String> subjects) {
     return Flux.fromIterable(subjects)
         .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
         .collect(Collectors.toList());
   }
 
-  public Mono<String[]> getAllSubjectNames(KafkaCluster cluster) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.GET,
-        URL_SUBJECTS)
-        .retrieve()
-        .bodyToMono(String[].class)
-        .doOnError(e -> log.error("Unexpected error", e))
-        .as(m -> failoverAble(m,
-            new FailoverMono<>(cluster.getSchemaRegistry(), () -> this.getAllSubjectNames(cluster))));
+  public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
+    return api(cluster)
+        .mono(c -> c.getAllSubjectNames(null, false))
+        .flatMapIterable(this::parseSubjectListString)
+        .collectList();
+  }
+
+  @SneakyThrows
+  private List<String> parseSubjectListString(String subjectNamesStr) {
+    //workaround for https://github.com/spring-projects/spring-framework/issues/24734
+    return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
+    });
   }
 
-  public Flux<SchemaSubjectDTO> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
+  public Flux<SubjectWithCompatibilityLevel> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
     Flux<Integer> versions = getSubjectVersions(cluster, subject);
     return versions.flatMap(version -> getSchemaSubjectByVersion(cluster, subject, version));
   }
 
   private Flux<Integer> getSubjectVersions(KafkaCluster cluster, String schemaName) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.GET,
-        URL_SUBJECT_VERSIONS,
-        schemaName)
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToFlux(Integer.class)
-        .as(f -> failoverAble(f, new FailoverFlux<>(cluster.getSchemaRegistry(),
-            () -> this.getSubjectVersions(cluster, schemaName))));
+    return api(cluster).flux(c -> c.getSubjectVersions(schemaName));
   }
 
-  public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(KafkaCluster cluster, String schemaName,
-                                                          Integer version) {
-    return this.getSchemaSubject(cluster, schemaName, String.valueOf(version));
+  public Mono<SubjectWithCompatibilityLevel> getSchemaSubjectByVersion(KafkaCluster cluster,
+                                                                       String schemaName,
+                                                                       Integer version) {
+    return getSchemaSubject(cluster, schemaName, String.valueOf(version));
   }
 
-  public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(KafkaCluster cluster,
-                                                                String schemaName) {
-    return this.getSchemaSubject(cluster, schemaName, LATEST);
+  public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(KafkaCluster cluster,
+                                                                             String schemaName) {
+    return getSchemaSubject(cluster, schemaName, LATEST);
   }
 
-  private Mono<SchemaSubjectDTO> getSchemaSubject(KafkaCluster cluster, String schemaName,
-                                                  String version) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.GET,
-        SchemaRegistryService.URL_SUBJECT_BY_VERSION,
-        List.of(schemaName, version))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
-        )
-        .bodyToMono(SchemaSubjectDTO.class)
-        .map(this::withSchemaType)
+  private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
+                                                               String version) {
+    return api(cluster)
+        .mono(c -> c.getSubjectVersion(schemaName, version))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
-        .map(tuple -> {
-          SchemaSubjectDTO schema = tuple.getT1();
-          String compatibilityLevel = tuple.getT2().getCompatibilityLevel();
-          schema.setCompatibilityLevel(compatibilityLevel);
-          return schema;
-        })
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> this.getSchemaSubject(cluster, schemaName, version))));
-  }
-
-  /**
-   * If {@link SchemaSubjectDTO#getSchemaType()} is null, then AVRO, otherwise,
-   * adds the schema type as is.
-   */
-  @NotNull
-  private SchemaSubjectDTO withSchemaType(SchemaSubjectDTO s) {
-    return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaTypeDTO.AVRO));
+        .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
+        .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
   }
 
-  public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster,
-                                                 String schemaName,
-                                                 Integer version) {
-    return this.deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
+  public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster, String schemaName, Integer version) {
+    return deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
   }
 
-  public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster,
-                                              String schemaName) {
-    return this.deleteSchemaSubject(cluster, schemaName, LATEST);
+  public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster, String schemaName) {
+    return deleteSchemaSubject(cluster, schemaName, LATEST);
   }
 
-  private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
-                                         String version) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.DELETE,
-        SchemaRegistryService.URL_SUBJECT_BY_VERSION,
-        List.of(schemaName, version))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
-        )
-        .toBodilessEntity()
-        .then()
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> this.deleteSchemaSubject(cluster, schemaName, version))));
+  private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName, String version) {
+    return api(cluster).mono(c -> c.deleteSubjectVersion(schemaName, version, false));
   }
 
-  public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
-                                                String schemaName) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.DELETE,
-        URL_SUBJECT,
-        schemaName)
-        .retrieve()
-        .onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
-        .toBodilessEntity()
-        .then()
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> this.deleteSchemaSubjectEntirely(cluster, schemaName))));
+  public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster, String schemaName) {
+    return api(cluster).mono(c -> c.deleteAllSubjectVersions(schemaName, false));
   }
 
   /**
    * Checks whether the provided schema duplicates the previous version, registers a new schema version,
    * and then returns the whole subject by requesting its latest version.
    */
-  public Mono<SchemaSubjectDTO> registerNewSchema(KafkaCluster cluster,
-                                                  NewSchemaSubjectDTO dto) {
-    SchemaTypeDTO schemaType = SchemaTypeDTO.AVRO == dto.getSchemaType() ? null : dto.getSchemaType();
-    Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(dto.getSchema(), schemaType));
-    String subject = dto.getSubject();
-    return submitNewSchema(subject, newSchema, cluster)
-        .flatMap(resp -> getLatestSchemaVersionBySubject(cluster, subject));
-  }
-
-  @NotNull
-  private Mono<SubjectIdResponse> submitNewSchema(String subject,
-                                                  Mono<InternalNewSchema> newSchemaSubject,
-                                                  KafkaCluster cluster) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        URL_SUBJECT_VERSIONS, subject)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
-        .retrieve()
-        .onStatus(status -> UNPROCESSABLE_ENTITY.equals(status) || CONFLICT.equals(status),
-            r -> r.bodyToMono(ErrorResponse.class)
-                .flatMap(this::getMonoError))
-        .bodyToMono(SubjectIdResponse.class)
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> submitNewSchema(subject, newSchemaSubject, cluster))));
-  }
-
-  @NotNull
-  private Mono<Throwable> getMonoError(ErrorResponse x) {
-    if (isUnrecognizedFieldSchemaTypeMessage(x.getMessage())) {
-      return Mono.error(new SchemaTypeNotSupportedException());
-    } else if (isIncompatibleSchemaMessage(x.getMessage())) {
-      return Mono.error(new SchemaCompatibilityException(x.getMessage()));
-    } else {
-      log.error(x.getMessage());
-      return Mono.error(new UnprocessableEntityException(INVALID_SCHEMA));
-    }
-  }
-
-  @NotNull
-  private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(
-      String formatted) {
-    return resp -> Mono.error(new SchemaNotFoundException(formatted));
-  }
-
-  /**
-   * Updates a compatibility level for a <code>schemaName</code>.
-   *
-   * @param schemaName is a schema subject name
-   * @see com.provectus.kafka.ui.model.CompatibilityLevelDTO.CompatibilityEnum
-   */
-  public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster, @Nullable String schemaName,
-                                              Mono<CompatibilityLevelDTO> compatibilityLevel) {
-    String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
-    return configuredWebClient(
-        cluster,
-        HttpMethod.PUT,
-        configEndpoint,
-        schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(Void.class)
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> this.updateSchemaCompatibility(cluster, schemaName, compatibilityLevel))));
+  public Mono<SubjectWithCompatibilityLevel> registerNewSchema(KafkaCluster cluster,
+                                                               String subject,
+                                                               NewSubject newSchemaSubject) {
+    return api(cluster)
+        .mono(c -> c.registerNewSchema(subject, newSchemaSubject))
+        .onErrorMap(WebClientResponseException.Conflict.class,
+            th -> new SchemaCompatibilityException())
+        .onErrorMap(WebClientResponseException.UnprocessableEntity.class,
+            th -> new ValidationException("Invalid schema"))
+        .then(getLatestSchemaVersionBySubject(cluster, subject));
   }
 
   public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
-                                              Mono<CompatibilityLevelDTO> compatibilityLevel) {
-    return updateSchemaCompatibility(cluster, null, compatibilityLevel);
-  }
-
-  public Mono<InternalCompatibilityLevel> getSchemaCompatibilityLevel(KafkaCluster cluster,
-                                                                      String schemaName) {
-    String globalConfig = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
-    final var values = new LinkedMultiValueMap<String, String>();
-    values.add("defaultToGlobal", "true");
-    return configuredWebClient(
-        cluster,
-        HttpMethod.GET,
-        globalConfig,
-        (schemaName == null ? Collections.emptyList() : List.of(schemaName)),
-        values)
-        .retrieve()
-        .bodyToMono(InternalCompatibilityLevel.class)
+                                              String schemaName,
+                                              Compatibility compatibility) {
+    return api(cluster)
+        .mono(c -> c.updateSubjectCompatibilityLevel(
+            schemaName, new CompatibilityLevelChange().compatibility(compatibility)))
+        .then();
+  }
+
+  public Mono<Void> updateGlobalSchemaCompatibility(KafkaCluster cluster,
+                                                    Compatibility compatibility) {
+    return api(cluster)
+        .mono(c -> c.updateGlobalCompatibilityLevel(new CompatibilityLevelChange().compatibility(compatibility)))
+        .then();
+  }
+
+  public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
+                                                         String schemaName) {
+    return api(cluster)
+        .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
+        .map(CompatibilityConfig::getCompatibilityLevel)
         .onErrorResume(error -> Mono.empty());
   }
 
-  public Mono<InternalCompatibilityLevel> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
-    return this.getSchemaCompatibilityLevel(cluster, null);
+  public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
+    return api(cluster)
+        .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
+        .map(CompatibilityConfig::getCompatibilityLevel);
   }
 
-  private Mono<InternalCompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
-                                                                              String schemaName) {
-    return this.getSchemaCompatibilityLevel(cluster, schemaName)
+  private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
+                                                                 String schemaName) {
+    return getSchemaCompatibilityLevel(cluster, schemaName)
         .switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
   }
 
-  public Mono<InternalCompatibilityCheck> checksSchemaCompatibility(
-      KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
-    return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        "/compatibility/subjects/{schemaName}/versions/latest",
-        schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(InternalCompatibilityCheck.class)
-        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
-            () -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
-  }
-
-  public String formatted(String str, Object... args) {
-    try (Formatter formatter = new Formatter()) {
-      return formatter.format(str, args).toString();
-    }
-  }
-
-  private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders headers) {
-    if (schemaRegistry.getUsername() != null && schemaRegistry.getPassword() != null) {
-      headers.setBasicAuth(
-          schemaRegistry.getUsername(),
-          schemaRegistry.getPassword()
-      );
-    } else if (schemaRegistry.getUsername() != null) {
-      throw new ValidationException(
-          "You specified username but did not specify password");
-    } else if (schemaRegistry.getPassword() != null) {
-      throw new ValidationException(
-          "You specified password but did not specify username");
-    }
-  }
-
-  private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
-    return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
-  }
-
-  private boolean isIncompatibleSchemaMessage(String message) {
-    return message.contains(INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA);
-  }
-
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
-                                                        String uri) {
-    return configuredWebClient(cluster, method, uri, Collections.emptyList(),
-        new LinkedMultiValueMap<>());
-  }
-
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
-                                                        String uri, List<String> uriVariables) {
-    return configuredWebClient(cluster, method, uri, uriVariables, new LinkedMultiValueMap<>());
-  }
-
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
-                                                        String uri, @Nullable String uriVariable) {
-    List<String> uriVariables = uriVariable == null ? Collections.emptyList() : List.of(uriVariable);
-    return configuredWebClient(cluster, method, uri, uriVariables, new LinkedMultiValueMap<>());
-  }
-
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster,
-                                                        HttpMethod method, String path,
-                                                        List<String> uriVariables,
-                                                        MultiValueMap<String, String> queryParams) {
-    final var schemaRegistry = cluster.getSchemaRegistry();
-
-    try {
-      WebClient.Builder schemaRegistryWebClient = SecuredWebClient.configure(
-          schemaRegistry.getKeystoreLocation(),
-          schemaRegistry.getKeystorePassword(),
-          schemaRegistry.getTruststoreLocation(),
-          schemaRegistry.getTruststorePassword()
-      );
-
-      return schemaRegistryWebClient.build()
-          .method(method)
-          .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
-          .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          "cannot create TLS configuration for schema-registry in cluster " + cluster.getName(), e);
-    }
-  }
-
-  private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List<String> uriVariables,
-                       MultiValueMap<String, String> queryParams) {
-    final var builder = UriComponentsBuilder
-        .fromHttpUrl(schemaRegistry.getUri() + path);
-    builder.queryParams(queryParams);
-    return builder.build(uriVariables.toArray());
-  }
-
-  private Function<ClientResponse, Mono<? extends Throwable>> errorOnSchemaDeleteFailure(String schemaName) {
-    return resp -> {
-      if (NOT_FOUND.equals(resp.statusCode())) {
-        return Mono.error(new SchemaNotFoundException(schemaName));
-      }
-      return Mono.error(new SchemaFailedToDeleteException(schemaName));
-    };
-  }
-
-  private <T> Mono<T> failoverAble(Mono<T> request, FailoverMono<T> failoverMethod) {
-    return request.onErrorResume(failoverMethod::failover);
-  }
-
-  private <T> Flux<T> failoverAble(Flux<T> request, FailoverFlux<T> failoverMethod) {
-    return request.onErrorResume(failoverMethod::failover);
-  }
-
-  private abstract static class Failover<E> {
-    private final InternalSchemaRegistry schemaRegistry;
-    private final Supplier<E> failover;
-
-    private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failover) {
-      this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
-      this.failover = Objects.requireNonNull(failover);
-    }
-
-    abstract E error(Throwable error);
-
-    public E failover(Throwable error) {
-      if (error instanceof WebClientRequestException
-          && error.getCause() instanceof IOException
-          && schemaRegistry.isFailoverAvailable()) {
-        var uri = ((WebClientRequestException) error).getUri();
-        schemaRegistry.markAsUnavailable(String.format("%s://%s", uri.getScheme(), uri.getAuthority()));
-        return failover.get();
-      }
-      return error(error);
-    }
-  }
-
-  private static class FailoverMono<T> extends Failover<Mono<T>> {
-
-    private FailoverMono(InternalSchemaRegistry schemaRegistry, Supplier<Mono<T>> failover) {
-      super(schemaRegistry, failover);
-    }
-
-    @Override
-    Mono<T> error(Throwable error) {
-      return Mono.error(error);
-    }
-  }
-
-  private static class FailoverFlux<T> extends Failover<Flux<T>> {
-
-    private FailoverFlux(InternalSchemaRegistry schemaRegistry, Supplier<Flux<T>> failover) {
-      super(schemaRegistry, failover);
-    }
-
-    @Override
-    Flux<T> error(Throwable error) {
-      return Flux.error(error);
-    }
+  public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster cluster,
+                                                                    String schemaName,
+                                                                    NewSubject newSchemaSubject) {
+    return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
   }
 }
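
One detail worth noting in the class above: SubjectWithCompatibilityLevel uses Lombok's @Delegate to expose every accessor of the generated SchemaSubject on the wrapper, while @Getter adds getCompatibility(). A minimal sketch of how a caller reads both; the getSubject() accessor on the generated model is an assumption inferred from its builder-style subject(...) setter:

// Sketch only, not part of this commit: consuming the wrapper returned by SchemaRegistryService.
Mono<String> describeLatest(SchemaRegistryService srService, KafkaCluster cluster, String subjectName) {
  return srService.getLatestSchemaVersionBySubject(cluster, subjectName)
      .map(s -> s.getSubject()                          // forwarded to the wrapped SchemaSubject via @Delegate
          + " compatibility=" + s.getCompatibility());  // provided by @Getter on the wrapper field
}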

+ 52 - 63
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -4,28 +4,29 @@ import static ksql.KsqlGrammarParser.DefineVariableContext;
 import static ksql.KsqlGrammarParser.PrintTopicContext;
 import static ksql.KsqlGrammarParser.SingleStatementContext;
 import static ksql.KsqlGrammarParser.UndefineVariableContext;
+import static org.springframework.http.MediaType.APPLICATION_JSON;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.databind.node.TextNode;
-import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
-import com.provectus.kafka.ui.util.SecuredWebClient;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.core.codec.DecodingException;
-import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.http.codec.json.Jackson2JsonDecoder;
+import org.springframework.http.codec.json.Jackson2JsonEncoder;
+import org.springframework.util.MimeType;
 import org.springframework.util.MimeTypeUtils;
 import org.springframework.util.unit.DataSize;
-import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
@@ -34,6 +35,8 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class KsqlApiClient {
 
+  private static final MimeType KQL_API_MIME_TYPE = MimeTypeUtils.parseMimeType("application/vnd.ksql.v1+json");
+
   private static final Set<Class<?>> UNSUPPORTED_STMT_TYPES = Set.of(
       PrintTopicContext.class,
       DefineVariableContext.class,
@@ -60,60 +63,46 @@ public class KsqlApiClient {
 
   //--------------------------------------------------------------------------------------------
 
-  private final KafkaCluster cluster;
-  private final DataSize maxBuffSize;
+  private final String baseUrl;
+  private final WebClient webClient;
 
-  public KsqlApiClient(KafkaCluster cluster, DataSize maxBuffSize) {
-    this.cluster = cluster;
-    this.maxBuffSize = maxBuffSize;
+  public KsqlApiClient(String baseUrl,
+                       @Nullable ClustersProperties.KsqldbServerAuth ksqldbServerAuth,
+                       @Nullable ClustersProperties.WebClientSsl ksqldbServerSsl,
+                       @Nullable DataSize maxBuffSize) {
+    this.baseUrl = baseUrl;
+    this.webClient = webClient(ksqldbServerAuth, ksqldbServerSsl, maxBuffSize);
   }
 
-  private WebClient webClient() {
-    var exchangeStrategies = ExchangeStrategies.builder()
-        .codecs(configurer -> {
-          configurer.customCodecs()
-              .register(
-                  new Jackson2JsonDecoder(
-                      new ObjectMapper(),
-                      // some ksqldb versions do not set content-type header in response,
-                      // but we still need to use JsonDecoder for it
-                      MimeTypeUtils.APPLICATION_OCTET_STREAM));
+  private static WebClient webClient(@Nullable ClustersProperties.KsqldbServerAuth ksqldbServerAuth,
+                                     @Nullable ClustersProperties.WebClientSsl ksqldbServerSsl,
+                                     @Nullable DataSize maxBuffSize) {
+    ksqldbServerAuth = Optional.ofNullable(ksqldbServerAuth).orElse(new ClustersProperties.KsqldbServerAuth());
+    ksqldbServerSsl = Optional.ofNullable(ksqldbServerSsl).orElse(new ClustersProperties.WebClientSsl());
+    maxBuffSize = Optional.ofNullable(maxBuffSize).orElse(DataSize.ofMegabytes(20));
+
+    return new WebClientConfigurator()
+        .configureSsl(
+            ksqldbServerSsl.getKeystoreLocation(),
+            ksqldbServerSsl.getKeystorePassword(),
+            ksqldbServerSsl.getTruststoreLocation(),
+            ksqldbServerSsl.getTruststorePassword()
+        )
+        .configureBasicAuth(
+            ksqldbServerAuth.getUsername(),
+            ksqldbServerAuth.getPassword()
+        )
+        .configureBufferSize(maxBuffSize)
+        .configureCodecs(codecs -> {
+          var mapper = new JsonMapper();
+          codecs.defaultCodecs()
+              .jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, KQL_API_MIME_TYPE, APPLICATION_JSON));
+          // some ksqldb versions do not set content-type header in response,
+          // but we still need to use JsonDecoder for it
+          codecs.defaultCodecs()
+              .jackson2JsonDecoder(new Jackson2JsonDecoder(mapper, MimeTypeUtils.ALL));
         })
         .build();
-
-    try {
-      WebClient.Builder securedWebClient = SecuredWebClient.configure(
-          cluster.getKsqldbServer().getKeystoreLocation(),
-          cluster.getKsqldbServer().getKeystorePassword(),
-          cluster.getKsqldbServer().getTruststoreLocation(),
-          cluster.getKsqldbServer().getTruststorePassword()
-      );
-
-      return securedWebClient
-          .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
-          .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
-          .exchangeStrategies(exchangeStrategies)
-          .build();
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          "cannot create TLS configuration for ksqlDB in cluster " + cluster.getName(), e);
-    }
-  }
-
-  public static void setBasicAuthIfEnabled(HttpHeaders headers, KafkaCluster cluster) {
-    String username = cluster.getKsqldbServer().getUsername();
-    String password = cluster.getKsqldbServer().getPassword();
-    if (username != null && password != null) {
-      headers.setBasicAuth(username, password);
-    } else if (username != null) {
-      throw new ValidationException("You specified username but did not specify password");
-    } else if (password != null) {
-      throw new ValidationException("You specified password but did not specify username");
-    }
-  }
-
-  private String baseKsqlDbUri() {
-    return cluster.getKsqldbServer().getUrl();
   }
 
   private KsqlRequest ksqlRequest(String ksql, Map<String, String> streamProperties) {
@@ -121,11 +110,11 @@ public class KsqlApiClient {
   }
 
   private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> streamProperties) {
-    return webClient()
+    return webClient
         .post()
-        .uri(baseKsqlDbUri() + "/query")
-        .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
-        .contentType(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
+        .uri(baseUrl + "/query")
+        .accept(new MediaType(KQL_API_MIME_TYPE))
+        .contentType(new MediaType(KQL_API_MIME_TYPE))
         .bodyValue(ksqlRequest(ksql, streamProperties))
         .retrieve()
         .bodyToFlux(JsonNode.class)
@@ -151,11 +140,11 @@ public class KsqlApiClient {
 
   private Flux<KsqlResponseTable> executeStatement(String ksql,
                                                    Map<String, String> streamProperties) {
-    return webClient()
+    return webClient
         .post()
-        .uri(baseKsqlDbUri() + "/ksql")
-        .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
-        .contentType(MediaType.parseMediaType("application/json"))
+        .uri(baseUrl + "/ksql")
+        .accept(new MediaType(KQL_API_MIME_TYPE))
+        .contentType(APPLICATION_JSON)
         .bodyValue(ksqlRequest(ksql, streamProperties))
         .exchangeToFlux(
             resp -> {

+ 6 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java

@@ -14,21 +14,13 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
-import org.springframework.util.unit.DataSize;
 import reactor.core.publisher.Flux;
 
 @Slf4j
 @Service
 public class KsqlServiceV2 {
 
-  private final DataSize maxBuffSize;
-
-  public KsqlServiceV2(@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
-    this.maxBuffSize = maxBuffSize;
-  }
-
   @lombok.Value
   private static class KsqlExecuteCommand {
     KafkaCluster cluster;
@@ -55,13 +47,13 @@ public class KsqlServiceV2 {
       throw new ValidationException("No command registered with id " + commandId);
     }
     registeredCommands.invalidate(commandId);
-    return new KsqlApiClient(cmd.cluster, maxBuffSize)
-        .execute(cmd.ksql, cmd.streamProperties);
+    return cmd.cluster.getKsqlClient()
+        .flux(client -> client.execute(cmd.ksql, cmd.streamProperties));
   }
 
   public Flux<KsqlTableDescriptionDTO> listTables(KafkaCluster cluster) {
-    return new KsqlApiClient(cluster, maxBuffSize)
-        .execute("LIST TABLES;", Map.of())
+    return cluster.getKsqlClient()
+        .flux(client -> client.execute("LIST TABLES;", Map.of()))
         .flatMap(resp -> {
           if (!resp.getHeader().equals("Tables")) {
             log.error("Unexpected result header: {}", resp.getHeader());
@@ -82,8 +74,8 @@ public class KsqlServiceV2 {
   }
 
   public Flux<KsqlStreamDescriptionDTO> listStreams(KafkaCluster cluster) {
-    return new KsqlApiClient(cluster, maxBuffSize)
-        .execute("LIST STREAMS;", Map.of())
+    return cluster.getKsqlClient()
+        .flux(client -> client.execute("LIST STREAMS;", Map.of()))
         .flatMap(resp -> {
           if (!resp.getHeader().equals("Streams")) {
             log.error("Unexpected result header: {}", resp.getHeader());

+ 154 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java

@@ -0,0 +1,154 @@
+package com.provectus.kafka.ui.util;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ReactiveFailover<T> {
+
+  public static final Duration DEFAULT_RETRY_GRACE_PERIOD_MS = Duration.ofSeconds(5);
+  public static final Predicate<Throwable> CONNECTION_REFUSED_EXCEPTION_FILTER =
+      error -> error.getCause() instanceof IOException && error.getCause().getMessage().contains("Connection refused");
+
+  private final List<PublisherHolder<T>> publishers;
+  private int currentIndex = 0;
+
+  private final Predicate<Throwable> failoverExceptionsPredicate;
+  private final String noAvailablePublishersMsg;
+
+  public static <T> ReactiveFailover<T> create(List<T> publishers,
+                                               Predicate<Throwable> failoverExceptionsPredicate,
+                                               String noAvailablePublishersMsg,
+                                               Duration retryGracePeriodMs) {
+    return new ReactiveFailover<>(
+        publishers.stream().map(p -> new PublisherHolder<>(() -> p, retryGracePeriodMs.toMillis())).toList(),
+        failoverExceptionsPredicate,
+        noAvailablePublishersMsg
+    );
+  }
+
+  public static <T, A> ReactiveFailover<T> create(List<A> args,
+                                                  Function<A, T> factory,
+                                                  Predicate<Throwable> failoverExceptionsPredicate,
+                                                  String noAvailablePublishersMsg,
+                                                  Duration retryGracePeriodMs) {
+    return new ReactiveFailover<>(
+        args.stream().map(arg ->
+            new PublisherHolder<>(() -> factory.apply(arg), retryGracePeriodMs.toMillis())).toList(),
+        failoverExceptionsPredicate,
+        noAvailablePublishersMsg
+    );
+  }
+
+  private ReactiveFailover(List<PublisherHolder<T>> publishers,
+                           Predicate<Throwable> failoverExceptionsPredicate,
+                           String noAvailablePublishersMsg) {
+    Preconditions.checkArgument(!publishers.isEmpty());
+    this.publishers = publishers;
+    this.failoverExceptionsPredicate = failoverExceptionsPredicate;
+    this.noAvailablePublishersMsg = noAvailablePublishersMsg;
+  }
+
+  public <V> Mono<V> mono(Function<T, Mono<V>> f) {
+    List<PublisherHolder<T>> candidates = getActivePublishers();
+    if (candidates.isEmpty()) {
+      return Mono.error(() -> new IllegalStateException(noAvailablePublishersMsg));
+    }
+    return mono(f, candidates);
+  }
+
+  private <V> Mono<V> mono(Function<T, Mono<V>> f, List<PublisherHolder<T>> candidates) {
+    var publisher = candidates.get(0);
+    return f.apply(publisher.get())
+        .onErrorResume(failoverExceptionsPredicate, th -> {
+          publisher.markFailed();
+          if (candidates.size() == 1) {
+            return Mono.error(th);
+          }
+          var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
+          if (newCandidates.isEmpty()) {
+            return Mono.error(th);
+          }
+          return mono(f, newCandidates);
+        });
+  }
+
+  public <V> Flux<V> flux(Function<T, Flux<V>> f) {
+    List<PublisherHolder<T>> candidates = getActivePublishers();
+    if (candidates.isEmpty()) {
+      return Flux.error(() -> new IllegalStateException(noAvailablePublishersMsg));
+    }
+    return flux(f, candidates);
+  }
+
+  private <V> Flux<V> flux(Function<T, Flux<V>> f, List<PublisherHolder<T>> candidates) {
+    var publisher = candidates.get(0);
+    return f.apply(publisher.get())
+        .onErrorResume(failoverExceptionsPredicate, th -> {
+          publisher.markFailed();
+          if (candidates.size() == 1) {
+            return Flux.error(th);
+          }
+          var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
+          if (newCandidates.isEmpty()) {
+            return Flux.error(th);
+          }
+          return flux(f, newCandidates);
+        });
+  }
+
+  /**
+   * Returns the list of active publishers, starting from the current index,
+   * so the most recently used active publisher is tried first.
+   */
+  private synchronized List<PublisherHolder<T>> getActivePublishers() {
+    var result = new ArrayList<PublisherHolder<T>>();
+    for (int i = 0, j = currentIndex; i < publishers.size(); i++) {
+      var publisher = publishers.get(j);
+      if (publisher.isActive()) {
+        result.add(publisher);
+      } else if (currentIndex == j) {
+        currentIndex = ++currentIndex == publishers.size() ? 0 : currentIndex;
+      }
+      j = ++j == publishers.size() ? 0 : j;
+    }
+    return result;
+  }
+
+  static class PublisherHolder<T> {
+
+    private final long retryGracePeriodMs;
+    private final Supplier<T> supplier;
+    private final AtomicLong lastErrorTs = new AtomicLong();
+    private T publisherInstance;
+
+    PublisherHolder(Supplier<T> supplier, long retryGracePeriodMs) {
+      this.supplier = supplier;
+      this.retryGracePeriodMs = retryGracePeriodMs;
+    }
+
+    synchronized T get() {
+      if (publisherInstance == null) {
+        publisherInstance = supplier.get();
+      }
+      return publisherInstance;
+    }
+
+    void markFailed() {
+      lastErrorTs.set(System.currentTimeMillis());
+    }
+
+    boolean isActive() {
+      return System.currentTimeMillis() - lastErrorTs.get() > retryGracePeriodMs;
+    }
+  }
+
+}
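
Taken together, the two create(...) factories above are all that is needed to turn a list of endpoints into one failover facade per cluster. A minimal usage sketch under stated assumptions: buildSrClient(url) stands in for whatever actually constructs the generated KafkaSrClientApi for a single URL, which is not part of this file:

// Sketch only, not part of this commit: assembling and using a failover wrapper.
ReactiveFailover<KafkaSrClientApi> schemaRegistryFailover(List<String> urls) {
  return ReactiveFailover.create(
      urls,
      url -> buildSrClient(url),                             // assumed per-URL client factory
      ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,  // fail over only on connection-refused errors
      "No live schemaRegistry instances available",
      ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS);
}

// Each call goes to the first active instance; a failed instance is skipped for the
// grace period (5 seconds by default) before it becomes a candidate again.
Mono<String> subjects(ReactiveFailover<KafkaSrClientApi> failover) {
  return failover.mono(c -> c.getAllSubjectNames(null, false));
}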

+ 0 - 66
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java

@@ -1,66 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManagerFactory;
-import org.springframework.http.client.reactive.ReactorClientHttpConnector;
-import org.springframework.util.ResourceUtils;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.netty.http.client.HttpClient;
-
-public class SecuredWebClient {
-  public static WebClient.Builder configure(
-      String keystoreLocation,
-      String keystorePassword,
-      String truststoreLocation,
-      String truststorePassword)
-      throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException {
-    // If we want to customize our TLS configuration, we need at least a truststore
-    if (truststoreLocation == null || truststorePassword == null) {
-      return WebClient.builder();
-    }
-
-    SslContextBuilder contextBuilder = SslContextBuilder.forClient();
-
-    // Prepare truststore
-    KeyStore trustStore = KeyStore.getInstance("JKS");
-    trustStore.load(
-        new FileInputStream((ResourceUtils.getFile(truststoreLocation))),
-        truststorePassword.toCharArray()
-    );
-
-    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
-        TrustManagerFactory.getDefaultAlgorithm()
-    );
-    trustManagerFactory.init(trustStore);
-    contextBuilder.trustManager(trustManagerFactory);
-
-    // Prepare keystore only if we got a keystore
-    if (keystoreLocation != null && keystorePassword != null) {
-      KeyStore keyStore = KeyStore.getInstance("JKS");
-      keyStore.load(
-          new FileInputStream(ResourceUtils.getFile(keystoreLocation)),
-          keystorePassword.toCharArray()
-      );
-
-      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-      keyManagerFactory.init(keyStore, keystorePassword.toCharArray());
-      contextBuilder.keyManager(keyManagerFactory);
-    }
-
-    // Create webclient
-    SslContext context = contextBuilder.build();
-
-    return WebClient.builder()
-        .clientConnector(new ReactorClientHttpConnector(HttpClient.create().secure(t -> t.sslContext(context))));
-  }
-}

+ 136 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java

@@ -0,0 +1,136 @@
+package com.provectus.kafka.ui.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.exception.ValidationException;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import lombok.SneakyThrows;
+import org.openapitools.jackson.nullable.JsonNullableModule;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.http.codec.ClientCodecConfigurer;
+import org.springframework.http.codec.json.Jackson2JsonDecoder;
+import org.springframework.http.codec.json.Jackson2JsonEncoder;
+import org.springframework.util.ResourceUtils;
+import org.springframework.util.unit.DataSize;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+
+public class WebClientConfigurator {
+
+  private final WebClient.Builder builder = WebClient.builder();
+
+  public WebClientConfigurator() {
+    configureObjectMapper(defaultOM());
+  }
+
+  private static ObjectMapper defaultOM() {
+    return new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .registerModule(new JsonNullableModule())
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+
+  public WebClientConfigurator configureSsl(@Nullable ClustersProperties.WebClientSsl ssl) {
+    if (ssl != null) {
+      return configureSsl(
+          ssl.getKeystoreLocation(),
+          ssl.getKeystorePassword(),
+          ssl.getTruststoreLocation(),
+          ssl.getTruststorePassword()
+      );
+    }
+    return this;
+  }
+
+  @SneakyThrows
+  public WebClientConfigurator configureSsl(
+      @Nullable String keystoreLocation,
+      @Nullable String keystorePassword,
+      @Nullable String truststoreLocation,
+      @Nullable String truststorePassword) {
+    // If we want to customize our TLS configuration, we need at least a truststore
+    if (truststoreLocation == null || truststorePassword == null) {
+      return this;
+    }
+
+    SslContextBuilder contextBuilder = SslContextBuilder.forClient();
+
+    // Prepare truststore
+    KeyStore trustStore = KeyStore.getInstance("JKS");
+    trustStore.load(
+        new FileInputStream((ResourceUtils.getFile(truststoreLocation))),
+        truststorePassword.toCharArray()
+    );
+
+    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+        TrustManagerFactory.getDefaultAlgorithm()
+    );
+    trustManagerFactory.init(trustStore);
+    contextBuilder.trustManager(trustManagerFactory);
+
+    // Prepare keystore only if we got a keystore
+    if (keystoreLocation != null && keystorePassword != null) {
+      KeyStore keyStore = KeyStore.getInstance("JKS");
+      keyStore.load(
+          new FileInputStream(ResourceUtils.getFile(keystoreLocation)),
+          keystorePassword.toCharArray()
+      );
+
+      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+      keyManagerFactory.init(keyStore, keystorePassword.toCharArray());
+      contextBuilder.keyManager(keyManagerFactory);
+    }
+
+    // Create webclient
+    SslContext context = contextBuilder.build();
+
+    builder.clientConnector(new ReactorClientHttpConnector(HttpClient.create().secure(t -> t.sslContext(context))));
+    return this;
+  }
+
+  public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
+    if (username != null && password != null) {
+      builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
+    } else if (username != null) {
+      throw new ValidationException("You specified username but did not specify password");
+    } else if (password != null) {
+      throw new ValidationException("You specified password but did not specify username");
+    }
+    return this;
+  }
+
+  public WebClientConfigurator configureBufferSize(DataSize maxBuffSize) {
+    builder.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()));
+    return this;
+  }
+
+  public WebClientConfigurator configureObjectMapper(ObjectMapper mapper) {
+    builder.codecs(codecs -> {
+      codecs.defaultCodecs()
+          .jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, MediaType.APPLICATION_JSON));
+      codecs.defaultCodecs()
+          .jackson2JsonDecoder(new Jackson2JsonDecoder(mapper, MediaType.APPLICATION_JSON));
+    });
+    return this;
+  }
+
+  public WebClientConfigurator configureCodecs(Consumer<ClientCodecConfigurer> configurer) {
+    builder.codecs(configurer);
+    return this;
+  }
+
+  public WebClient build() {
+    return builder.build();
+  }
+}
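
For reference, a rough usage sketch of the builder above; file locations and credentials are placeholders, and the chain mirrors how KsqlApiClient configures its WebClient earlier in this commit:

// Sketch only, not part of this commit: building a TLS + basic-auth WebClient.
WebClient buildSecuredClient() {
  return new WebClientConfigurator()
      .configureSsl(
          "/etc/kafkaui/keystore.jks", "keystore-pass",      // client keystore is optional
          "/etc/kafkaui/truststore.jks", "truststore-pass")  // a truststore is required for TLS to be applied
      .configureBasicAuth("admin", "admin-secret")
      .configureBufferSize(DataSize.ofMegabytes(20))
      .build();
}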

+ 0 - 69
kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/FailoverUrlListTest.java

@@ -1,69 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.List;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-
-
-class FailoverUrlListTest {
-
-  public static final int RETRY_GRACE_PERIOD_IN_MS = 10;
-
-  @Nested
-  @SuppressWarnings("all")
-  class ShouldHaveFailoverAvailableWhen {
-
-    private FailoverUrlList failoverUrlList;
-
-    @BeforeEach
-    void before() {
-      failoverUrlList = new FailoverUrlList(List.of("localhost:123", "farawayhost:5678"), RETRY_GRACE_PERIOD_IN_MS);
-    }
-
-    @Test
-    void thereAreNoFailures() {
-      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
-    }
-
-    @Test
-    void withLessFailuresThenAvailableUrls() {
-      failoverUrlList.fail(failoverUrlList.current());
-
-      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
-    }
-
-    @Test
-    void withAllFailuresAndAtLeastOneAfterTheGraceTimeoutPeriod() throws InterruptedException {
-      failoverUrlList.fail(failoverUrlList.current());
-      failoverUrlList.fail(failoverUrlList.current());
-
-      Thread.sleep(RETRY_GRACE_PERIOD_IN_MS + 1);
-
-      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
-    }
-
-    @Nested
-    @SuppressWarnings("all")
-    class ShouldNotHaveFailoverAvailableWhen {
-
-      private FailoverUrlList failoverUrlList;
-
-      @BeforeEach
-      void before() {
-        failoverUrlList = new FailoverUrlList(List.of("localhost:123", "farawayhost:5678"), 1000);
-      }
-
-      @Test
-      void allFailuresWithinGracePeriod() {
-        failoverUrlList.fail(failoverUrlList.current());
-        failoverUrlList.fail(failoverUrlList.current());
-
-        assertThat(failoverUrlList.isFailoverAvailable()).isFalse();
-      }
-    }
-  }
-}
-

+ 43 - 43
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java

@@ -7,78 +7,78 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.provectus.kafka.ui.controller.SchemasController;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
-import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.sr.model.Compatibility;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.AccessControlServiceMock;
+import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Optional;
 import java.util.stream.IntStream;
 import org.junit.jupiter.api.Test;
-import org.springframework.test.util.ReflectionTestUtils;
 import reactor.core.publisher.Mono;
 
 public class SchemaRegistryPaginationTest {
 
   private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
 
-  private final SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
-  private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
-  private final ClusterMapper clusterMapper = mock(ClusterMapper.class);
-  private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
+  private SchemasController controller;
 
-  private final SchemasController controller
-      = new SchemasController(clusterMapper, schemaRegistryService, accessControlService);
+  private void init(List<String> subjects) {
+    ClustersStorage clustersStorage = mock(ClustersStorage.class);
+    when(clustersStorage.getClusterByName(isA(String.class)))
+        .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
 
-  private void init(String[] subjects) {
+    SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
     when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
-        .thenReturn(Mono.just(subjects));
+                .thenReturn(Mono.just(subjects));
     when(schemaRegistryService
-        .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
-    when(clustersStorage.getClusterByName(isA(String.class)))
-        .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
+            .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
     when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
-        .thenAnswer(a -> Mono.just(new SchemaSubjectDTO().subject(a.getArgument(1))));
+            .thenAnswer(a -> Mono.just(
+                new SchemaRegistryService.SubjectWithCompatibilityLevel(
+                    new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
 
-    ReflectionTestUtils.setField(controller, "clustersStorage", clustersStorage);
+    this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock());
+    this.controller.setClustersStorage(clustersStorage);
   }
 
   @Test
   void shouldListFirst25andThen10Schemas() {
     init(
-        IntStream.rangeClosed(1, 100)
-            .boxed()
-            .map(num -> "subject" + num)
-            .toArray(String[]::new)
+            IntStream.rangeClosed(1, 100)
+                    .boxed()
+                    .map(num -> "subject" + num)
+                    .toList()
     );
     var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, null, null, null).block();
+            null, null, null, null).block();
     assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
     assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25);
     assertThat(schemasFirst25.getBody().getSchemas())
-        .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
+            .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
 
     var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, 10, null, null).block();
+            null, 10, null, null).block();
 
     assertThat(schemasFirst10.getBody().getPageCount()).isEqualTo(10);
     assertThat(schemasFirst10.getBody().getSchemas()).hasSize(10);
     assertThat(schemasFirst10.getBody().getSchemas())
-        .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
+            .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
   }
 
   @Test
   void shouldListSchemasContaining_1() {
     init(
-        IntStream.rangeClosed(1, 100)
-            .boxed()
-            .map(num -> "subject" + num)
-            .toArray(String[]::new)
+              IntStream.rangeClosed(1, 100)
+                      .boxed()
+                      .map(num -> "subject" + num)
+                      .toList()
     );
     var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        null, null, "1", null).block();
+            null, null, "1", null).block();
     assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);
     assertThat(schemasSearch7.getBody().getSchemas()).hasSize(20);
   }
@@ -86,13 +86,13 @@ public class SchemaRegistryPaginationTest {
   @Test
   void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
     init(
-        IntStream.rangeClosed(1, 100)
-            .boxed()
-            .map(num -> "subject" + num)
-            .toArray(String[]::new)
+                IntStream.rangeClosed(1, 100)
+                        .boxed()
+                        .map(num -> "subject" + num)
+                        .toList()
     );
     var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        0, -1, null, null).block();
+            0, -1, null, null).block();
 
     assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
     assertThat(schemas.getBody().getSchemas()).hasSize(25);
@@ -102,14 +102,14 @@ public class SchemaRegistryPaginationTest {
   @Test
   void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
     init(
-        IntStream.rangeClosed(1, 100)
-            .boxed()
-            .map(num -> "subject" + num)
-            .toArray(String[]::new)
+                IntStream.rangeClosed(1, 100)
+                        .boxed()
+                        .map(num -> "subject" + num)
+                        .toList()
     );
 
     var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
-        4, 33, null, null).block();
+            4, 33, null, null).block();
 
     assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
     assertThat(schemas.getBody().getSchemas()).hasSize(1);
@@ -118,8 +118,8 @@ public class SchemaRegistryPaginationTest {
 
   private KafkaCluster buildKafkaCluster(String clusterName) {
     return KafkaCluster.builder()
-        .name(clusterName)
-        .schemaRegistry(InternalSchemaRegistry.builder().build())
-        .build();
+            .name(clusterName)
+            .schemaRegistryClient(mock(ReactiveFailover.class))
+            .build();
   }
 }

+ 0 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -11,7 +11,6 @@ import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
-import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metrics;
@@ -87,7 +86,6 @@ class TopicsServicePaginationTest {
   private KafkaCluster buildKafkaCluster(String clusterName) {
     return KafkaCluster.builder()
         .name(clusterName)
-        .schemaRegistry(InternalSchemaRegistry.builder().build())
         .build();
   }
 

+ 5 - 7
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java

@@ -9,15 +9,12 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.container.KsqlDbContainer;
-import com.provectus.kafka.ui.model.InternalKsqlServer;
-import com.provectus.kafka.ui.model.KafkaCluster;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.springframework.util.unit.DataSize;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
 import reactor.test.StepVerifier;
@@ -28,8 +25,6 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
       DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
       .withKafka(kafka);
 
-  private static final DataSize maxBuffSize = DataSize.ofMegabytes(20);
-
   @BeforeAll
   static void startContainer() {
     KSQL_DB.start();
@@ -43,8 +38,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
   // Tutorial is here: https://ksqldb.io/quickstart.html
   @Test
   void ksqTutorialQueriesWork() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(
-            InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize);
+    var client = ksqlClient();
     execCommandSync(client,
         "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) "
             + "WITH (kafka_topic='locations', value_format='json', partitions=1);",
@@ -130,5 +124,9 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
     }
   }
 
+  private KsqlApiClient ksqlClient() {
+    return new KsqlApiClient(KSQL_DB.url(), null, null, null);
+  }
+
 
 }

+ 20 - 13
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java

@@ -4,10 +4,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.container.KsqlDbContainer;
-import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
+import com.provectus.kafka.ui.util.ReactiveFailover;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -35,29 +36,25 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
 
   @AfterAll
   static void cleanup() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(
-        InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize);
-
     TABLES_TO_DELETE.forEach(t ->
-        client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of())
+        ksqlClient().execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of())
             .blockLast());
 
     STREAMS_TO_DELETE.forEach(s ->
-        client.execute(String.format("DROP STREAM IF EXISTS %s DELETE TOPIC;", s), Map.of())
+        ksqlClient().execute(String.format("DROP STREAM IF EXISTS %s DELETE TOPIC;", s), Map.of())
             .blockLast());
 
     KSQL_DB.stop();
   }
 
-  private final KsqlServiceV2 ksqlService = new KsqlServiceV2(maxBuffSize);
+  private final KsqlServiceV2 ksqlService = new KsqlServiceV2();
 
   @Test
   void listStreamsReturnsAllKsqlStreams() {
-    var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build();
     var streamName = "stream_" + System.currentTimeMillis();
     STREAMS_TO_DELETE.add(streamName);
 
-    new KsqlApiClient(cluster, maxBuffSize)
+    ksqlClient()
         .execute(
             String.format("CREATE STREAM %s ( "
                 + "  c1 BIGINT KEY, "
@@ -70,7 +67,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
             Map.of())
         .blockLast();
 
-    var streams = ksqlService.listStreams(cluster).collectList().block();
+    var streams = ksqlService.listStreams(cluster()).collectList().block();
     assertThat(streams).contains(
         new KsqlStreamDescriptionDTO()
             .name(streamName.toUpperCase())
@@ -82,11 +79,10 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
 
   @Test
   void listTablesReturnsAllKsqlTables() {
-    var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build();
     var tableName = "table_" + System.currentTimeMillis();
     TABLES_TO_DELETE.add(tableName);
 
-    new KsqlApiClient(cluster, maxBuffSize)
+    ksqlClient()
         .execute(
             String.format("CREATE TABLE %s ( "
                 + "   c1 BIGINT PRIMARY KEY, "
@@ -99,7 +95,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
             Map.of())
         .blockLast();
 
-    var tables = ksqlService.listTables(cluster).collectList().block();
+    var tables = ksqlService.listTables(cluster()).collectList().block();
     assertThat(tables).contains(
         new KsqlTableDescriptionDTO()
             .name(tableName.toUpperCase())
@@ -110,4 +106,15 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
     );
   }
 
+  private static KafkaCluster cluster() {
+    return KafkaCluster.builder()
+        .ksqlClient(ReactiveFailover.create(
+            List.of(ksqlClient()), th -> true, "", ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS))
+        .build();
+  }
+
+  private static KsqlApiClient ksqlClient() {
+    return new KsqlApiClient(KSQL_DB.url(), null, null, null);
+  }
+
 }

+ 233 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/ReactiveFailoverTest.java

@@ -0,0 +1,233 @@
+package com.provectus.kafka.ui.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class ReactiveFailoverTest {
+
+  private static final String NO_AVAILABLE_PUBLISHERS_MSG = "no active publishers!";
+  private static final Predicate<Throwable> FAILING_EXCEPTION_FILTER = th -> th.getMessage().contains("fail!");
+  private static final Supplier<Throwable> FAILING_EXCEPTION_SUPPLIER = () -> new IllegalStateException("fail!");
+  private static final Duration RETRY_PERIOD = Duration.ofMillis(300);
+
+  private final List<Publisher> publishers = Stream.generate(Publisher::new).limit(3).toList();
+
+  private final ReactiveFailover<Publisher> failover = ReactiveFailover.create(
+      publishers,
+      FAILING_EXCEPTION_FILTER,
+      NO_AVAILABLE_PUBLISHERS_MSG,
+      RETRY_PERIOD
+  );
+
+  @Test
+  void testMonoFailoverCycle() throws InterruptedException {
+    // starting with first publisher:
+    // 0 -> ok : ok
+    monoCheck(
+        Map.of(
+            0, okMono()
+        ),
+        List.of(0),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0 -> fail, 1 -> ok : ok
+    monoCheck(
+        Map.of(
+            0, failingMono(),
+            1, okMono()
+        ),
+        List.of(0, 1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0.failed (skipped), 1 -> fail, 2 -> ok : ok
+    monoCheck(
+        Map.of(
+            1, failingMono(),
+            2, okMono()
+        ),
+        List.of(1, 2),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0.failed, 1.failed, 2 -> fail : failing exception
+    monoCheck(
+        Map.of(
+            2, failingMono()
+        ),
+        List.of(2),
+        step -> step.verifyErrorMessage(FAILING_EXCEPTION_SUPPLIER.get().getMessage())
+    );
+
+    // 0.failed, 1.failed, 2.failed : No alive publisher exception
+    monoCheck(
+        Map.of(),
+        List.of(),
+        step -> step.verifyErrorMessage(NO_AVAILABLE_PUBLISHERS_MSG)
+    );
+
+    // resetting retry: all publishers became alive: 0.ok, 1.ok, 2.ok
+    Thread.sleep(RETRY_PERIOD.toMillis() + 1);
+
+    // starting with last errored publisher:
+    // 2 -> fail, 0 -> fail, 1 -> ok : ok
+    monoCheck(
+        Map.of(
+            2, failingMono(),
+            0, failingMono(),
+            1, okMono()
+        ),
+        List.of(2, 0, 1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 1 -> ok : ok
+    monoCheck(
+        Map.of(
+            1, okMono()
+        ),
+        List.of(1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+  }
+
+  @Test
+  void testFluxFailoverCycle() throws InterruptedException {
+    // starting with first publisher:
+    // 0 -> ok : ok
+    fluxCheck(
+        Map.of(
+            0, okFlux()
+        ),
+        List.of(0),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0 -> fail, 1 -> ok : ok
+    fluxCheck(
+        Map.of(
+            0, failingFlux(),
+            1, okFlux()
+        ),
+        List.of(0, 1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0.failed (skipped), 1 -> fail, 2 -> ok : ok
+    fluxCheck(
+        Map.of(
+            1, failingFlux(),
+            2, okFlux()
+        ),
+        List.of(1, 2),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 0.failed, 1.failed, 2 -> fail : failing exception
+    fluxCheck(
+        Map.of(
+            2, failingFlux()
+        ),
+        List.of(2),
+        step -> step.verifyErrorMessage(FAILING_EXCEPTION_SUPPLIER.get().getMessage())
+    );
+
+    // 0.failed, 1.failed, 2.failed : No alive publisher exception
+    fluxCheck(
+        Map.of(),
+        List.of(),
+        step -> step.verifyErrorMessage(NO_AVAILABLE_PUBLISHERS_MSG)
+    );
+
+    // resetting retry: all publishers became alive: 0.ok, 1.ok, 2.ok
+    Thread.sleep(RETRY_PERIOD.toMillis() + 1);
+
+    // starting with last errored publisher:
+    // 2 -> fail, 0 -> fail, 1 -> ok : ok
+    fluxCheck(
+        Map.of(
+            2, failingFlux(),
+            0, failingFlux(),
+            1, okFlux()
+        ),
+        List.of(2, 0, 1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+
+    // 1 -> ok : ok
+    fluxCheck(
+        Map.of(
+            1, okFlux()
+        ),
+        List.of(1),
+        step -> step.expectNextCount(1).verifyComplete()
+    );
+  }
+
+  private void monoCheck(Map<Integer, Mono<String>> mock,
+                         List<Integer> publishersToBeCalled, // for checking calls order
+                         Consumer<StepVerifier.Step<?>> stepVerifier) {
+    AtomicInteger calledCount = new AtomicInteger();
+    var mono = failover.mono(publisher -> {
+      int calledPublisherIdx = publishers.indexOf(publisher);
+      assertThat(calledPublisherIdx).isEqualTo(publishersToBeCalled.get(calledCount.getAndIncrement()));
+      return Preconditions.checkNotNull(
+          mock.get(calledPublisherIdx),
+          "Mono result not set for publisher %d", calledPublisherIdx
+      );
+    });
+    stepVerifier.accept(StepVerifier.create(mono));
+    assertThat(calledCount.get()).isEqualTo(publishersToBeCalled.size());
+  }
+
+
+  private void fluxCheck(Map<Integer, Flux<String>> mock,
+                         List<Integer> publishersToBeCalled, // for checking calls order
+                         Consumer<StepVerifier.Step<?>> stepVerifier) {
+    AtomicInteger calledCount = new AtomicInteger();
+    var flux = failover.flux(publisher -> {
+      int calledPublisherIdx = publishers.indexOf(publisher);
+      assertThat(calledPublisherIdx).isEqualTo(publishersToBeCalled.get(calledCount.getAndIncrement()));
+      return Preconditions.checkNotNull(
+          mock.get(calledPublisherIdx),
+          "Flux result not set for publisher %d", calledPublisherIdx
+      );
+    });
+    stepVerifier.accept(StepVerifier.create(flux));
+    assertThat(calledCount.get()).isEqualTo(publishersToBeCalled.size());
+  }
+
+  private Flux<String> okFlux() {
+    return Flux.just("ok");
+  }
+
+  private Flux<String> failingFlux() {
+    return Flux.error(FAILING_EXCEPTION_SUPPLIER);
+  }
+
+  private Mono<String> okMono() {
+    return Mono.just("ok");
+  }
+
+  private Mono<String> failingMono() {
+    return Mono.error(FAILING_EXCEPTION_SUPPLIER);
+  }
+
+  public static class Publisher {
+  }
+
+}
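
A minimal usage sketch of the new ReactiveFailover utility, using only the factory and routing methods exercised by the tests above (ReactiveFailover.create, mono/flux and the DEFAULT_RETRY_GRACE_PERIOD_MS constant); SomeClient is a stand-in type for illustration, not a class from this change:

    import com.provectus.kafka.ui.util.ReactiveFailover;
    import java.util.List;
    import reactor.core.publisher.Mono;

    class FailoverUsageSketch {

      // Stand-in for a per-instance client (e.g. a generated openapi client).
      record SomeClient(String baseUrl) {
        Mono<String> ping() {
          return Mono.just("pong from " + baseUrl);
        }
      }

      Mono<String> example() {
        // One underlying client per configured URL; when the predicate matches a thrown
        // exception, the failing instance is marked failed and the next one is tried.
        ReactiveFailover<SomeClient> failover = ReactiveFailover.create(
            List.of(new SomeClient("http://sr1:8085"), new SomeClient("http://sr2:8085")),
            th -> true,                                    // treat any error as a reason to fail over
            "No live instances available",                 // error raised when all instances are marked failed
            ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS // grace period before a failed instance is retried
        );

        // every reactive call is routed through the currently active instance
        return failover.mono(SomeClient::ping);
      }
    }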

+ 26 - 0
kafka-ui-contract/pom.xml

@@ -122,6 +122,32 @@
                                         <asyncNative>true</asyncNative>
                                         <library>webclient</library>
 
+                                        <useBeanValidation>true</useBeanValidation>
+                                        <dateLibrary>java8</dateLibrary>
+                                    </configOptions>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>generate-sr-client</id>
+                                <goals>
+                                    <goal>generate</goal>
+                                </goals>
+                                <configuration>
+                                    <inputSpec>${project.basedir}/src/main/resources/swagger/kafka-sr-api.yaml
+                                    </inputSpec>
+                                    <output>${project.build.directory}/generated-sources/kafka-sr-client</output>
+                                    <generatorName>java</generatorName>
+                                    <generateApiTests>false</generateApiTests>
+                                    <generateModelTests>false</generateModelTests>
+
+                                    <configOptions>
+                                        <modelPackage>com.provectus.kafka.ui.sr.model</modelPackage>
+                                        <apiPackage>com.provectus.kafka.ui.sr.api</apiPackage>
+                                        <sourceFolder>kafka-sr-client</sourceFolder>
+
+                                        <asyncNative>true</asyncNative>
+                                        <library>webclient</library>
+
                                         <useBeanValidation>true</useBeanValidation>
                                         <dateLibrary>java8</dateLibrary>
                                     </configOptions>

+ 404 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml

@@ -0,0 +1,404 @@
+openapi: 3.0.0
+info:
+    description: Api Documentation
+    version: 0.1.0
+    title: Api Documentation
+    termsOfService: urn:tos
+    contact: {}
+    license:
+        name: Apache 2.0
+        url: http://www.apache.org/licenses/LICENSE-2.0
+tags:
+    - name: /schemaregistry
+servers:
+    - url: /localhost
+
+paths:
+    /subjects:
+        get:
+            tags:
+              - KafkaSrClient
+            summary: get all subject names from schema registry service
+            operationId: getAllSubjectNames
+            parameters:
+              - name: subjectPrefix
+                in: query
+                required: false
+                schema:
+                  type: string
+              - name: deleted
+                in: query
+                schema:
+                  type: boolean
+            responses:
+                200:
+                  description: OK
+                  content:
+                      application/json:
+                          schema:
+                            #workaround for https://github.com/spring-projects/spring-framework/issues/24734
+                            type: string
+
+    /subjects/{subject}:
+        delete:
+            tags:
+                - KafkaSrClient
+            operationId: deleteAllSubjectVersions
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                    type: string
+                - name: permanent
+                  in: query
+                  schema:
+                    type: boolean
+                  required: false
+            responses:
+                200:
+                    description: OK
+                404:
+                    description: Not found
+
+    /subjects/{subject}/versions/{version}:
+        get:
+            tags:
+              - KafkaSrClient
+            operationId: getSubjectVersion
+            parameters:
+              - name: subject
+                in: path
+                required: true
+                schema:
+                  type: string
+              - name: version
+                in: path
+                required: true
+                schema:
+                  type: string
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/SchemaSubject'
+                404:
+                    description: Not found
+                422:
+                    description: Invalid version
+        delete:
+            tags:
+                - KafkaSrClient
+            operationId: deleteSubjectVersion
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                    type: string
+                - name: permanent
+                  in: query
+                  required: false
+                  schema:
+                    type: boolean
+                    default: false
+                - name: version
+                  in: path
+                  required: true
+                  schema:
+                    type: string
+            responses:
+                200:
+                    description: OK
+                404:
+                    description: Not found
+
+    /subjects/{subject}/versions:
+        get:
+            tags:
+                - KafkaSrClient
+            operationId: getSubjectVersions
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                type: array
+                                items:
+                                    type: integer
+                                    format: int32
+                404:
+                    description: Not found
+        post:
+            tags:
+                - KafkaSrClient
+            operationId: registerNewSchema
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+            requestBody:
+                content:
+                    application/json:
+                        schema:
+                            $ref: '#/components/schemas/NewSubject'
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/SubjectId'
+
+    /config/:
+        get:
+            tags:
+                - KafkaSrClient
+            operationId: getGlobalCompatibilityLevel
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/CompatibilityConfig'
+                404:
+                    description: Not found
+        put:
+            tags:
+                - KafkaSrClient
+            operationId: updateGlobalCompatibilityLevel
+            requestBody:
+                content:
+                    application/json:
+                        schema:
+                            $ref: '#/components/schemas/CompatibilityLevelChange'
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/CompatibilityLevelChange'
+                404:
+                    description: Not found
+
+    /config/{subject}:
+        get:
+            tags:
+                - KafkaSrClient
+            operationId: getSubjectCompatibilityLevel
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+                - name: defaultToGlobal
+                  in: query
+                  required: true
+                  schema:
+                      type: boolean
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/CompatibilityConfig'
+                404:
+                    description: Not found
+        put:
+            tags:
+                - KafkaSrClient
+            operationId: updateSubjectCompatibilityLevel
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+            requestBody:
+                content:
+                    application/json:
+                        schema:
+                            $ref: '#/components/schemas/CompatibilityLevelChange'
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/CompatibilityLevelChange'
+                404:
+                    description: Not found
+        delete:
+            tags:
+                - KafkaSrClient
+            operationId: deleteSubjectCompatibilityLevel
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+            responses:
+                200:
+                    description: OK
+                404:
+                    description: Not found
+
+    /compatibility/subjects/{subject}/versions/{version}:
+        post:
+            tags:
+                - KafkaSrClient
+            operationId: checkSchemaCompatibility
+            parameters:
+                - name: subject
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+                - name: version
+                  in: path
+                  required: true
+                  schema:
+                      type: string
+                - name: verbose
+                  in: query
+                  description: Show reason a schema fails the compatibility test
+                  schema:
+                      type: boolean
+            requestBody:
+                content:
+                    application/json:
+                        schema:
+                            $ref: '#/components/schemas/NewSubject'
+            responses:
+                200:
+                    description: OK
+                    content:
+                        application/json:
+                            schema:
+                                $ref: '#/components/schemas/CompatibilityCheckResponse'
+                404:
+                    description: Not found
+
+security:
+    - basicAuth: []
+
+components:
+    securitySchemes:
+        basicAuth:
+            type: http
+            scheme: basic
+    schemas:
+        SchemaSubject:
+            type: object
+            properties:
+              subject:
+                type: string
+              version:
+                type: string
+              id:
+                type: integer
+              schema:
+                type: string
+              schemaType:
+                  $ref: '#/components/schemas/SchemaType'
+            required:
+              - id
+              - subject
+              - version
+              - schema
+              - schemaType
+
+        SchemaType:
+          type: string
+          description: upon updating a schema, the type of an existing schema can't be changed
+          enum:
+            - AVRO
+            - JSON
+            - PROTOBUF
+
+        SchemaReference:
+            type: object
+            properties:
+                name:
+                    type: string
+                subject:
+                    type: string
+                version:
+                    type: integer
+            required:
+                - name
+                - subject
+                - version
+
+        SubjectId:
+            type: object
+            properties:
+                id:
+                    type: integer
+
+        NewSubject:
+            type: object
+            description: should be set for creating/updating schema subject
+            properties:
+                schema:
+                    type: string
+                schemaType:
+                    $ref: '#/components/schemas/SchemaType'
+                references:
+                    type: array
+                    items:
+                        $ref: '#/components/schemas/SchemaReference'
+            required:
+                - schema
+                - schemaType
+
+        CompatibilityConfig:
+            type: object
+            properties:
+                compatibilityLevel:
+                    $ref: '#/components/schemas/Compatibility'
+            required:
+                - compatibilityLevel
+
+        CompatibilityLevelChange:
+            type: object
+            properties:
+                compatibility:
+                    $ref: '#/components/schemas/Compatibility'
+            required:
+                - compatibility
+
+
+        Compatibility:
+            type: string
+            enum:
+                - BACKWARD
+                - BACKWARD_TRANSITIVE
+                - FORWARD
+                - FORWARD_TRANSITIVE
+                - FULL
+                - FULL_TRANSITIVE
+                - NONE
+
+
+        CompatibilityCheckResponse:
+            type: object
+            properties:
+                is_compatible:
+                    type: boolean
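
As a quick sanity check of the generated model layer, a short illustrative snippet assuming openapi-generator's default fluent setters (the same style already visible in the new SchemaSubject()/Compatibility usage in SchemaRegistryPaginationTest above); class and property names follow the modelPackage and schemas declared in this spec, but the snippet is a sketch, not copied from generated sources:

    import com.provectus.kafka.ui.sr.model.Compatibility;
    import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
    import com.provectus.kafka.ui.sr.model.NewSubject;
    import com.provectus.kafka.ui.sr.model.SchemaType;

    class SrModelSketch {

      // Request body for POST /subjects/{subject}/versions (NewSubject schema above).
      NewSubject newAvroSubject() {
        return new NewSubject()
            .schema("{\"type\":\"record\",\"name\":\"r\",\"fields\":[]}")
            .schemaType(SchemaType.AVRO);
      }

      // Request body for PUT /config/{subject} (CompatibilityLevelChange schema above).
      CompatibilityLevelChange fullCompatibility() {
        return new CompatibilityLevelChange().compatibility(Compatibility.FULL);
      }
    }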