fix schemaRegistry property by adding checks (#752)

* fix schemaRegistry property by adding checks

* style fix

* pull request fix

Co-authored-by: marselakhmetov <makhmetov@provectus.com>
This commit is contained in:
Marsel 2021-07-30 17:51:56 +05:00 committed by GitHub
parent a9d659642e
commit f7ce347149
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 3 deletions

View file

@ -35,6 +35,7 @@ import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
@ -91,7 +92,8 @@ public interface ClusterMapper {
Partition toPartition(InternalPartition topic);
default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) {
if (clusterProperties == null) {
if (clusterProperties == null
|| clusterProperties.getSchemaRegistry() == null) {
return null;
}

View file

@ -68,8 +68,9 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
private final ObjectMapper objectMapper = new ObjectMapper();
private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
Objects.requireNonNull(cluster.getSchemaRegistry());
Objects.requireNonNull(cluster.getSchemaRegistry().getUrl());
if (cluster.getSchemaRegistry() == null) {
throw new ValidationException("schemaRegistry is not specified");
}
List<SchemaProvider> schemaProviders =
List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());