Browse Source

ISSUE-886 Schema registry multiple urls (#888)

German Osin 3 năm trước cách đây
mục cha
commit
5e74bda56e

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -33,6 +33,8 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import java.math.BigDecimal;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -101,7 +103,11 @@ public interface ClusterMapper {
     InternalSchemaRegistry.InternalSchemaRegistryBuilder internalSchemaRegistry =
         InternalSchemaRegistry.builder();
 
-    internalSchemaRegistry.url(clusterProperties.getSchemaRegistry());
+    internalSchemaRegistry.url(
+        clusterProperties.getSchemaRegistry() != null
+            ? Arrays.asList(clusterProperties.getSchemaRegistry().split(","))
+            : Collections.emptyList()
+    );
 
     if (clusterProperties.getSchemaRegistryAuth() != null) {
       internalSchemaRegistry.username(clusterProperties.getSchemaRegistryAuth().getUsername());

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model;
 
+import java.util.List;
 import lombok.Builder;
 import lombok.Data;
 
@@ -8,5 +9,10 @@ import lombok.Data;
 public class InternalSchemaRegistry {
   private final String username;
   private final String password;
-  private final String url;
+  private final List<String> url;
+
+  public String getFirstUrl() {
+    return url != null  && !url.isEmpty() ? url.iterator().next() : null;
+  }
+
 }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java

@@ -90,7 +90,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
           "You specified password but do not specified username");
     }
     return new CachedSchemaRegistryClient(
-        Collections.singletonList(cluster.getSchemaRegistry().getUrl()),
+        cluster.getSchemaRegistry().getUrl(),
         CLIENT_IDENTITY_MAP_CAPACITY,
         schemaProviders,
         configs
@@ -218,7 +218,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private String convertSchema(SchemaMetadata schema) {
 
     String jsonSchema;
-    URI basePath = new URI(cluster.getSchemaRegistry().getUrl())
+    URI basePath = new URI(cluster.getSchemaRegistry().getFirstUrl())
         .resolve(Integer.toString(schema.getId()));
     final ParsedSchema schemaById = Objects.requireNonNull(schemaRegistryClient)
         .getSchemaById(schema.getId());

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -343,8 +343,9 @@ public class SchemaRegistryService {
   private WebClient.RequestBodySpec configuredWebClient(InternalSchemaRegistry schemaRegistry,
                                                         HttpMethod method, String uri,
                                                         Object... params) {
-    return webClient.method(method)
-        .uri(schemaRegistry.getUrl() + uri, params)
+    return webClient
+        .method(method)
+        .uri(schemaRegistry.getFirstUrl() + uri, params)
         .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
   }
 }