From 20cbcd33e25c4187d6c35b636b453090a24aa8d5 Mon Sep 17 00:00:00 2001 From: Marsel <49659004+MarselAhmetov@users.noreply.github.com> Date: Fri, 23 Jul 2021 18:52:15 +0300 Subject: [PATCH] Issue#493 add authentication to schema registry (#679) * adding basic authentication for SchemaRegistry * checkstyle fix * tests fix * pull request fix * pull request fixes * replace string with constant * adding documentation Co-authored-by: marselakhmetov --- README.md | 7 ++ .../kafka/ui/config/ClustersProperties.java | 7 ++ .../kafka/ui/mapper/ClusterMapper.java | 20 ++++ .../ui/model/InternalSchemaRegistry.java | 12 +++ .../kafka/ui/model/KafkaCluster.java | 2 +- .../SchemaRegistryAwareRecordSerDe.java | 30 +++++- .../ui/service/SchemaRegistryService.java | 101 +++++++++++++----- 7 files changed, 149 insertions(+), 30 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java diff --git a/README.md b/README.md index 6feb09572a..927b67b280 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,9 @@ kafka: bootstrapServers: localhost:29091 zookeeper: localhost:2183 schemaRegistry: http://localhost:8085 + schemaRegistryAuth: + username: username + password: password # schemaNameTemplate: "%s-value" jmxPort: 9997 - @@ -141,6 +144,8 @@ kafka: * `bootstrapServers`: where to connect * `zookeeper`: zookeeper service address * `schemaRegistry`: schemaRegistry's address +* `schemaRegistryAuth.username`: schemaRegistry's basic authentication username +* `schemaRegistryAuth.password`: schemaRegistry's basic authentication password * `schemaNameTemplate`: how keys are saved to schemaRegistry * `jmxPort`: open jmxPosrts of a broker * `readOnly`: enable read only mode @@ -160,6 +165,8 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeper service address |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD` |SchemaRegistry's basic authentication password |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry |`KAFKA_CLUSTERS_0_JMXPORT` |Open jmxPosrts of a broker |`KAFKA_CLUSTERS_0_READONLY` |Enable read only mode. Default: false diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 82954dff36..f847ca2a80 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -20,6 +20,7 @@ public class ClustersProperties { String bootstrapServers; String zookeeper; String schemaRegistry; + SchemaRegistryAuth schemaRegistryAuth; String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; @@ -35,4 +36,10 @@ public class ClustersProperties { String name; String address; } + + @Data + public static class SchemaRegistryAuth { + String username; + String password; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 766f3f24b3..321a6c0871 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -15,6 +15,7 @@ import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; +import com.provectus.kafka.ui.model.InternalSchemaRegistry; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.KafkaCluster; @@ -49,6 +50,7 @@ public interface ClusterMapper { @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath") @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties") + @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry") KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", @@ -64,6 +66,24 @@ public interface ClusterMapper { Partition toPartition(InternalPartition topic); + default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) { + if (clusterProperties == null) { + return null; + } + + InternalSchemaRegistry.InternalSchemaRegistryBuilder internalSchemaRegistry = + InternalSchemaRegistry.builder(); + + internalSchemaRegistry.url(clusterProperties.getSchemaRegistry()); + + if (clusterProperties.getSchemaRegistryAuth() != null) { + internalSchemaRegistry.username(clusterProperties.getSchemaRegistryAuth().getUsername()); + internalSchemaRegistry.password(clusterProperties.getSchemaRegistryAuth().getPassword()); + } + + return internalSchemaRegistry.build(); + } + TopicDetails toTopicDetails(InternalTopic topic); default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java new file mode 100644 index 0000000000..378f2706f5 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java @@ -0,0 +1,12 @@ +package com.provectus.kafka.ui.model; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder(toBuilder = true) +public class InternalSchemaRegistry { + private final String username; + private final String password; + private final String url; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 805f0937e9..68b1607376 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -15,7 +15,7 @@ public class KafkaCluster { private final Integer jmxPort; private final String bootstrapServers; private final String zookeeper; - private final String schemaRegistry; + private final InternalSchemaRegistry schemaRegistry; private final List kafkaConnect; private final String schemaNameTemplate; private final String keySchemaNameTemplate; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java index 08aef7455f..03f1431b82 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java @@ -1,6 +1,11 @@ package com.provectus.kafka.ui.serde.schemaregistry; + +import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; +import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; + import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.MessageSchema; import com.provectus.kafka.ui.model.TopicMessageSchema; @@ -22,6 +27,7 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import java.net.URI; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,14 +69,29 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) { Objects.requireNonNull(cluster.getSchemaRegistry()); + Objects.requireNonNull(cluster.getSchemaRegistry().getUrl()); List schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider()); - //TODO add auth + + Map configs = new HashMap<>(); + String username = cluster.getSchemaRegistry().getUsername(); + String password = cluster.getSchemaRegistry().getPassword(); + + if (username != null && password != null) { + configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + configs.put(USER_INFO_CONFIG, username + ":" + password); + } else if (username != null) { + throw new ValidationException( + "You specified username but do not specified password"); + } else if (password != null) { + throw new ValidationException( + "You specified password but do not specified username"); + } return new CachedSchemaRegistryClient( - Collections.singletonList(cluster.getSchemaRegistry()), + Collections.singletonList(cluster.getSchemaRegistry().getUrl()), CLIENT_IDENTITY_MAP_CAPACITY, schemaProviders, - Collections.emptyMap() + configs ); } @@ -181,7 +202,8 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { private String convertSchema(SchemaMetadata schema) { String jsonSchema; - URI basePath = new URI(cluster.getSchemaRegistry()).resolve(Integer.toString(schema.getId())); + URI basePath = new URI(cluster.getSchemaRegistry().getUrl()) + .resolve(Integer.toString(schema.getId())); final ParsedSchema schemaById = Objects.requireNonNull(schemaRegistryClient) .getSchemaById(schema.getId()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index 41374120b8..611d6eedf4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -7,9 +7,11 @@ import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.DuplicateEntityException; import com.provectus.kafka.ui.exception.SchemaNotFoundException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.CompatibilityCheckResponse; import com.provectus.kafka.ui.model.CompatibilityLevel; +import com.provectus.kafka.ui.model.InternalSchemaRegistry; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.NewSchemaSubject; import com.provectus.kafka.ui.model.SchemaSubject; @@ -26,6 +28,8 @@ import java.util.function.Function; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; @@ -61,8 +65,10 @@ public class SchemaRegistryService { public Mono getAllSubjectNames(String clusterName) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.get() - .uri(cluster.getSchemaRegistry() + URL_SUBJECTS) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.GET, + URL_SUBJECTS) .retrieve() .bodyToMono(String[].class) .doOnError(log::error) @@ -77,8 +83,10 @@ public class SchemaRegistryService { private Flux getSubjectVersions(String clusterName, String schemaName) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.get() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.GET, + URL_SUBJECT_VERSIONS, schemaName) .retrieve() .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) @@ -99,8 +107,10 @@ public class SchemaRegistryService { private Mono getSchemaSubject(String clusterName, String schemaName, String version) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.get() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.GET, + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) @@ -140,8 +150,10 @@ public class SchemaRegistryService { private Mono> deleteSchemaSubject(String clusterName, String schemaName, String version) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.delete() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.DELETE, + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) @@ -152,8 +164,10 @@ public class SchemaRegistryService { public Mono> deleteSchemaSubjectEntirely(String clusterName, String schemaName) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.delete() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.DELETE, + URL_SUBJECT, schemaName) .retrieve() .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) @@ -178,8 +192,8 @@ public class SchemaRegistryService { return clustersStorage.getClusterByName(clusterName) .map(KafkaCluster::getSchemaRegistry) .map( - schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl) - .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) + schemaRegistry -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistry) + .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistry)) .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) ) .orElse(Mono.error(ClusterNotFoundException::new)); @@ -189,9 +203,11 @@ public class SchemaRegistryService { @NotNull private Mono submitNewSchema(String subject, Mono newSchemaSubject, - String schemaRegistryUrl) { - return webClient.post() - .uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject) + InternalSchemaRegistry schemaRegistry) { + return configuredWebClient( + schemaRegistry, + HttpMethod.POST, + URL_SUBJECT_VERSIONS, subject) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) .retrieve() @@ -204,9 +220,11 @@ public class SchemaRegistryService { @NotNull private Mono checkSchemaOnDuplicate(String subject, Mono newSchemaSubject, - String schemaRegistryUrl) { - return webClient.post() - .uri(schemaRegistryUrl + URL_SUBJECT, subject) + InternalSchemaRegistry schemaRegistry) { + return configuredWebClient( + schemaRegistry, + HttpMethod.POST, + URL_SUBJECT, subject) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) .retrieve() @@ -236,8 +254,10 @@ public class SchemaRegistryService { return clustersStorage.getClusterByName(clusterName) .map(cluster -> { String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}"; - return webClient.put() - .uri(cluster.getSchemaRegistry() + configEndpoint, schemaName) + return configuredWebClient( + cluster, + HttpMethod.PUT, + configEndpoint, schemaName) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class)) .retrieve() @@ -257,8 +277,10 @@ public class SchemaRegistryService { return clustersStorage.getClusterByName(clusterName) .map(cluster -> { String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}"; - return webClient.get() - .uri(cluster.getSchemaRegistry() + configEndpoint, schemaName) + return configuredWebClient( + cluster, + HttpMethod.GET, + configEndpoint, schemaName) .retrieve() .bodyToMono(InternalCompatibilityLevel.class) .map(mapper::toCompatibilityLevel) @@ -279,9 +301,10 @@ public class SchemaRegistryService { public Mono checksSchemaCompatibility( String clusterName, String schemaName, Mono newSchemaSubject) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.post() - .uri(cluster.getSchemaRegistry() - + "/compatibility/subjects/{schemaName}/versions/latest", schemaName) + .map(cluster -> configuredWebClient( + cluster, + HttpMethod.POST, + "/compatibility/subjects/{schemaName}/versions/latest", schemaName) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) .retrieve() @@ -296,4 +319,32 @@ public class SchemaRegistryService { public String formatted(String str, Object... args) { return new Formatter().format(str, args).toString(); } + + private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders headers) { + if (schemaRegistry.getUsername() != null && schemaRegistry.getPassword() != null) { + headers.setBasicAuth( + schemaRegistry.getUsername(), + schemaRegistry.getPassword() + ); + } else if (schemaRegistry.getUsername() != null) { + throw new ValidationException( + "You specified username but do not specified password"); + } else if (schemaRegistry.getPassword() != null) { + throw new ValidationException( + "You specified password but do not specified username"); + } + } + + private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method, + String uri, Object... params) { + return configuredWebClient(cluster.getSchemaRegistry(), method, uri, params); + } + + private WebClient.RequestBodySpec configuredWebClient(InternalSchemaRegistry schemaRegistry, + HttpMethod method, String uri, + Object... params) { + return webClient.method(method) + .uri(schemaRegistry.getUrl() + uri, params) + .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers)); + } }