Browse Source

Issue#493 add authentication to schema registry (#679)

* adding basic authentication for SchemaRegistry

* checkstyle fix

* tests fix

* pull request fix

* pull request fixes

* replace string with constant

* adding documentation

Co-authored-by: marselakhmetov <makhmetov@provectus.com>
Marsel 3 năm trước cách đây
mục cha
commit
20cbcd33e2

+ 7 - 0
README.md

@@ -132,6 +132,9 @@ kafka:
       bootstrapServers: localhost:29091
       zookeeper: localhost:2183
       schemaRegistry: http://localhost:8085
+      schemaRegistryAuth:
+        username: username
+        password: password
 #     schemaNameTemplate: "%s-value"
       jmxPort: 9997
     -
@@ -141,6 +144,8 @@ kafka:
 * `bootstrapServers`: where to connect
 * `zookeeper`: zookeeper service address
 * `schemaRegistry`: schemaRegistry's address
+* `schemaRegistryAuth.username`: schemaRegistry's basic authentication username
+* `schemaRegistryAuth.password`: schemaRegistry's basic authentication password
 * `schemaNameTemplate`: how keys are saved to schemaRegistry
 * `jmxPort`: open jmxPosrts of a broker
 * `readOnly`: enable read only mode
@@ -160,6 +165,8 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_ZOOKEEPER` 	| Zookeper service address 
 |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` 	|Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD`   	|SchemaRegistry's basic authentication password
 |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE`  |How keys are saved to schemaRegistry
 |`KAFKA_CLUSTERS_0_JMXPORT`        	|Open jmxPosrts of a broker
 |`KAFKA_CLUSTERS_0_READONLY`        	|Enable read only mode. Default: false

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -20,6 +20,7 @@ public class ClustersProperties {
     String bootstrapServers;
     String zookeeper;
     String schemaRegistry;
+    SchemaRegistryAuth schemaRegistryAuth;
     String schemaNameTemplate = "%s-value";
     String keySchemaNameTemplate = "%s-key";
     String protobufFile;
@@ -35,4 +36,10 @@ public class ClustersProperties {
     String name;
     String address;
   }
+
+  @Data
+  public static class SchemaRegistryAuth {
+    String username;
+    String password;
+  }
 }

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -15,6 +15,7 @@ import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
+import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -49,6 +50,7 @@ public interface ClusterMapper {
 
   @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath")
   @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
+  @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
   @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage",
@@ -64,6 +66,24 @@ public interface ClusterMapper {
 
   Partition toPartition(InternalPartition topic);
 
+  default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) {
+    if (clusterProperties == null) {
+      return null;
+    }
+
+    InternalSchemaRegistry.InternalSchemaRegistryBuilder internalSchemaRegistry =
+        InternalSchemaRegistry.builder();
+
+    internalSchemaRegistry.url(clusterProperties.getSchemaRegistry());
+
+    if (clusterProperties.getSchemaRegistryAuth() != null) {
+      internalSchemaRegistry.username(clusterProperties.getSchemaRegistryAuth().getUsername());
+      internalSchemaRegistry.password(clusterProperties.getSchemaRegistryAuth().getPassword());
+    }
+
+    return internalSchemaRegistry.build();
+  }
+
   TopicDetails toTopicDetails(InternalTopic topic);
 
   default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java

@@ -0,0 +1,12 @@
+package com.provectus.kafka.ui.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalSchemaRegistry {
+  private final String username;
+  private final String password;
+  private final String url;
+}

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -15,7 +15,7 @@ public class KafkaCluster {
   private final Integer jmxPort;
   private final String bootstrapServers;
   private final String zookeeper;
-  private final String schemaRegistry;
+  private final InternalSchemaRegistry schemaRegistry;
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;

+ 26 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java

@@ -1,6 +1,11 @@
 package com.provectus.kafka.ui.serde.schemaregistry;
 
+
+import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
+import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageSchema;
 import com.provectus.kafka.ui.model.TopicMessageSchema;
@@ -22,6 +27,7 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -63,14 +69,29 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
 
   private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
     Objects.requireNonNull(cluster.getSchemaRegistry());
+    Objects.requireNonNull(cluster.getSchemaRegistry().getUrl());
     List<SchemaProvider> schemaProviders =
         List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
-    //TODO add auth
+
+    Map<String, String> configs = new HashMap<>();
+    String username = cluster.getSchemaRegistry().getUsername();
+    String password = cluster.getSchemaRegistry().getPassword();
+
+    if (username != null && password != null) {
+      configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+      configs.put(USER_INFO_CONFIG, username + ":" + password);
+    } else if (username != null) {
+      throw new ValidationException(
+          "You specified username but do not specified password");
+    } else if (password != null) {
+      throw new ValidationException(
+          "You specified password but do not specified username");
+    }
     return new CachedSchemaRegistryClient(
-        Collections.singletonList(cluster.getSchemaRegistry()),
+        Collections.singletonList(cluster.getSchemaRegistry().getUrl()),
         CLIENT_IDENTITY_MAP_CAPACITY,
         schemaProviders,
-        Collections.emptyMap()
+        configs
     );
   }
 
@@ -181,7 +202,8 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private String convertSchema(SchemaMetadata schema) {
 
     String jsonSchema;
-    URI basePath = new URI(cluster.getSchemaRegistry()).resolve(Integer.toString(schema.getId()));
+    URI basePath = new URI(cluster.getSchemaRegistry().getUrl())
+        .resolve(Integer.toString(schema.getId()));
     final ParsedSchema schemaById = Objects.requireNonNull(schemaRegistryClient)
         .getSchemaById(schema.getId());
 

+ 76 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -7,9 +7,11 @@ import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.exception.DuplicateEntityException;
 import com.provectus.kafka.ui.exception.SchemaNotFoundException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.model.CompatibilityLevel;
+import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SchemaSubject;
@@ -26,6 +28,8 @@ import java.util.function.Function;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.jetbrains.annotations.NotNull;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
@@ -61,8 +65,10 @@ public class SchemaRegistryService {
 
   public Mono<String[]> getAllSubjectNames(String clusterName) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.get()
-            .uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.GET,
+            URL_SUBJECTS)
             .retrieve()
             .bodyToMono(String[].class)
             .doOnError(log::error)
@@ -77,8 +83,10 @@ public class SchemaRegistryService {
 
   private Flux<Integer> getSubjectVersions(String clusterName, String schemaName) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.get()
-            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.GET,
+            URL_SUBJECT_VERSIONS, schemaName)
             .retrieve()
             .onStatus(NOT_FOUND::equals,
                 throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
@@ -99,8 +107,10 @@ public class SchemaRegistryService {
   private Mono<SchemaSubject> getSchemaSubject(String clusterName, String schemaName,
                                                String version) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.get()
-            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.GET,
+            URL_SUBJECT_BY_VERSION, schemaName, version)
             .retrieve()
             .onStatus(NOT_FOUND::equals,
                 throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
@@ -140,8 +150,10 @@ public class SchemaRegistryService {
   private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName,
                                                          String version) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.delete()
-            .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.DELETE,
+            URL_SUBJECT_BY_VERSION, schemaName, version)
             .retrieve()
             .onStatus(NOT_FOUND::equals,
                 throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
@@ -152,8 +164,10 @@ public class SchemaRegistryService {
   public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName,
                                                                 String schemaName) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.delete()
-            .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.DELETE,
+            URL_SUBJECT, schemaName)
             .retrieve()
             .onStatus(NOT_FOUND::equals,
                 throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
@@ -178,8 +192,8 @@ public class SchemaRegistryService {
           return clustersStorage.getClusterByName(clusterName)
               .map(KafkaCluster::getSchemaRegistry)
               .map(
-                  schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl)
-                      .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl))
+                  schemaRegistry -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistry)
+                      .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistry))
                       .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
               )
               .orElse(Mono.error(ClusterNotFoundException::new));
@@ -189,9 +203,11 @@ public class SchemaRegistryService {
   @NotNull
   private Mono<SubjectIdResponse> submitNewSchema(String subject,
                                                   Mono<InternalNewSchema> newSchemaSubject,
-                                                  String schemaRegistryUrl) {
-    return webClient.post()
-        .uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject)
+                                                  InternalSchemaRegistry schemaRegistry) {
+    return configuredWebClient(
+        schemaRegistry,
+        HttpMethod.POST,
+        URL_SUBJECT_VERSIONS, subject)
         .contentType(MediaType.APPLICATION_JSON)
         .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
         .retrieve()
@@ -204,9 +220,11 @@ public class SchemaRegistryService {
   @NotNull
   private Mono<SchemaSubject> checkSchemaOnDuplicate(String subject,
                                                      Mono<InternalNewSchema> newSchemaSubject,
-                                                     String schemaRegistryUrl) {
-    return webClient.post()
-        .uri(schemaRegistryUrl + URL_SUBJECT, subject)
+                                                     InternalSchemaRegistry schemaRegistry) {
+    return configuredWebClient(
+        schemaRegistry,
+        HttpMethod.POST,
+        URL_SUBJECT, subject)
         .contentType(MediaType.APPLICATION_JSON)
         .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
         .retrieve()
@@ -236,8 +254,10 @@ public class SchemaRegistryService {
     return clustersStorage.getClusterByName(clusterName)
         .map(cluster -> {
           String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
-          return webClient.put()
-              .uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
+          return configuredWebClient(
+              cluster,
+              HttpMethod.PUT,
+              configEndpoint, schemaName)
               .contentType(MediaType.APPLICATION_JSON)
               .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
               .retrieve()
@@ -257,8 +277,10 @@ public class SchemaRegistryService {
     return clustersStorage.getClusterByName(clusterName)
         .map(cluster -> {
           String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
-          return webClient.get()
-              .uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
+          return configuredWebClient(
+              cluster,
+              HttpMethod.GET,
+              configEndpoint, schemaName)
               .retrieve()
               .bodyToMono(InternalCompatibilityLevel.class)
               .map(mapper::toCompatibilityLevel)
@@ -279,9 +301,10 @@ public class SchemaRegistryService {
   public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(
       String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
     return clustersStorage.getClusterByName(clusterName)
-        .map(cluster -> webClient.post()
-            .uri(cluster.getSchemaRegistry()
-                + "/compatibility/subjects/{schemaName}/versions/latest", schemaName)
+        .map(cluster -> configuredWebClient(
+            cluster,
+            HttpMethod.POST,
+            "/compatibility/subjects/{schemaName}/versions/latest", schemaName)
             .contentType(MediaType.APPLICATION_JSON)
             .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
             .retrieve()
@@ -296,4 +319,32 @@ public class SchemaRegistryService {
   public String formatted(String str, Object... args) {
     return new Formatter().format(str, args).toString();
   }
+
+  private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders headers) {
+    if (schemaRegistry.getUsername() != null && schemaRegistry.getPassword() != null) {
+      headers.setBasicAuth(
+          schemaRegistry.getUsername(),
+          schemaRegistry.getPassword()
+      );
+    } else if (schemaRegistry.getUsername() != null) {
+      throw new ValidationException(
+          "You specified username but do not specified password");
+    } else if (schemaRegistry.getPassword() != null) {
+      throw new ValidationException(
+          "You specified password but do not specified username");
+    }
+  }
+
+  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
+                                                        String uri, Object... params) {
+    return configuredWebClient(cluster.getSchemaRegistry(), method, uri, params);
+  }
+
+  private WebClient.RequestBodySpec configuredWebClient(InternalSchemaRegistry schemaRegistry,
+                                                        HttpMethod method, String uri,
+                                                        Object... params) {
+    return webClient.method(method)
+        .uri(schemaRegistry.getUrl() + uri, params)
+        .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
+  }
 }