
BE: Refactor SchemaRegistry serialization logic (#4116)

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 1 year ago
Parent
Commit
b0583a3ca7

+ 0 - 46
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

@@ -1,46 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
-
-  AvroSchemaRegistrySerializer(String topic, boolean isKey,
-                               SchemaRegistryClient client,
-                               SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaAvroSerializer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Object serialize(String value, ParsedSchema schema) {
-    try {
-      return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-
-  }
-}
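
Context for the deleted wrapper above: Confluent serializers insist on the schema.registry.url key even when a pre-built SchemaRegistryClient is injected, which is why all three removed classes passed the "wontbeused" placeholder. A minimal sketch of that pattern (the URL and cache size here are made up for illustration):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

public class PlaceholderUrlSketch {
  public static void main(String[] args) {
    // The injected client is the one actually used for schema lookups...
    SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
    var serializer = new KafkaAvroSerializer(client);
    // ...so the mandatory URL config is satisfied with a dummy value.
    serializer.configure(Map.of("schema.registry.url", "wontbeused"), false);
  }
}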

+ 0 - 79
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java

@@ -1,79 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.json.JsonSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class JsonSchemaSchemaRegistrySerializer extends SchemaRegistrySerializer<JsonNode> {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  JsonSchemaSchemaRegistrySerializer(String topic,
-                                            boolean isKey,
-                                            SchemaRegistryClient client,
-                                            SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected JsonNode serialize(String value, ParsedSchema schema) {
-    try {
-      JsonNode json = MAPPER.readTree(value);
-      ((JsonSchema) schema).validate(json);
-      return json;
-    } catch (JsonProcessingException e) {
-      throw new ValidationException(String.format("'%s' is not valid json", value));
-    } catch (org.everit.json.schema.ValidationException e) {
-      throw new ValidationException(
-          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
-    }
-  }
-
-  @KafkaClientInternalsDependant
-  private class KafkaJsonSchemaSerializerWithoutSchemaInfer
-      extends KafkaJsonSchemaSerializer<JsonNode> {
-
-    KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
-      super(client);
-    }
-
-    /**
-     * Need to override original method because it tries to infer schema from input
-     * by checking 'schema' json field or @Schema annotation on input class, which is not
-     * possible in our case. So, we just skip all infer logic and pass schema directly.
-     */
-    @Override
-    public byte[] serialize(String topic, JsonNode rec) {
-      return super.serializeImpl(
-          super.getSubjectName(topic, isKey, rec, schema),
-          rec,
-          (JsonSchema) schema
-      );
-    }
-  }
-
-}
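
The override above existed because KafkaJsonSchemaSerializer tries to infer a schema from the input, which never applies when the schema comes from the registry. The part the refactor keeps is the plain validate-then-write flow; a minimal sketch of the validation half, with an inline schema made up for illustration:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchema;

public class JsonValidateSketch {
  public static void main(String[] args) throws Exception {
    JsonSchema schema = new JsonSchema(
        "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"}},\"required\":[\"id\"]}");
    JsonNode json = new ObjectMapper().readTree("{\"id\":42}");
    // Throws org.everit.json.schema.ValidationException when the payload does not fit.
    schema.validate(json);
  }
}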

+ 0 - 50
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java

@@ -1,50 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.util.JsonFormat;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-class ProtobufSchemaRegistrySerializer extends SchemaRegistrySerializer<Message> {
-
-  @SneakyThrows
-  public ProtobufSchemaRegistrySerializer(String topic, boolean isKey,
-                                          SchemaRegistryClient client, SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaProtobufSerializer<>(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Message serialize(String value, ParsedSchema schema) {
-    ProtobufSchema protobufSchema = (ProtobufSchema) schema;
-    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
-    try {
-      JsonFormat.parser().merge(value, builder);
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-  }
-
-}
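
The essence of the removed Protobuf path, retained by the refactor, is mapping a JSON string onto the schema's message type via JsonFormat. A minimal sketch with a made-up inline schema:

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class ProtoMergeSketch {
  public static void main(String[] args) throws Exception {
    ProtobufSchema schema = new ProtobufSchema(
        "syntax = \"proto3\"; message Event { string id = 1; }");
    DynamicMessage.Builder builder = schema.newMessageBuilder();
    // JsonFormat throws InvalidProtocolBufferException on malformed or mismatched input.
    JsonFormat.parser().merge("{\"id\": \"evt-1\"}", builder);
    System.out.println(builder.build());
  }
}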

+ 25 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -1,5 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin.sr;
 
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
 
@@ -7,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
@@ -32,13 +34,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.common.config.SslConfigs;
 
 
 public class SchemaRegistrySerde implements BuiltInSerde {
 
+  private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
+  private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
+
   public static String name() {
     return "SchemaRegistry";
   }
@@ -221,8 +225,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
       case JSON ->
-        //need to use confluent JsonSchema since it includes resolved references
-        ((JsonSchema) parsedSchema).rawSchema().toString();
+          //need to use confluent JsonSchema since it includes resolved references
+          ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }
 
@@ -254,35 +258,27 @@ public class SchemaRegistrySerde implements BuiltInSerde {
   @Override
   public Serializer serializer(String topic, Target type) {
     String subject = schemaSubject(topic, type);
-    var schema = getSchemaBySubject(subject)
-        .orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
-    boolean isKey = type == Target.KEY;
-    SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
-        .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
+    SchemaMetadata meta = getSchemaBySubject(subject)
+        .orElseThrow(() -> new ValidationException(
+            String.format("No schema for subject '%s' found", subject)));
+    ParsedSchema schema = getSchemaById(meta.getId())
+        .orElseThrow(() -> new IllegalStateException(
+            String.format("No schema found for id %s, subject '%s'", meta.getId(), subject)));
+    SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
+        .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
     return switch (schemaType) {
-      case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+      case PROTOBUF -> input ->
+          serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
+      case AVRO -> input ->
+          serializeAvro((AvroSchema) schema, meta.getId(), input);
+      case JSON -> input ->
+          serializeJson((JsonSchema) schema, meta.getId(), input);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new SrDeserializer(topic);
-  }
-
-  ///--------------------------------------------------------------
-
-  private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
-  private static final int SR_RECORD_PREFIX_LENGTH = 5;
-
-  @RequiredArgsConstructor
-  private class SrDeserializer implements Deserializer {
-
-    private final String topic;
-
-    @Override
-    public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+    return (headers, data) -> {
       var schemaId = extractSchemaIdFromMsg(data);
       SchemaType format = getMessageFormatBySchemaId(schemaId);
       MessageFormatter formatter = schemaRegistryFormatters.get(format);
@@ -294,7 +290,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
               "type", format.name()
           )
       );
-    }
+    };
   }
 
   private SchemaType getMessageFormatBySchemaId(int schemaId) {
@@ -306,7 +302,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
 
   private int extractSchemaIdFromMsg(byte[] data) {
     ByteBuffer buffer = ByteBuffer.wrap(data);
-    if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
+    if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
       return buffer.getInt();
     }
     throw new ValidationException(
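
The renamed constants spell out the Confluent wire format: one magic byte 0x0, a 4-byte big-endian schema id, then the serialized payload. The comparison also moves from > to >=, so a frame with an empty payload (exactly 5 bytes) now parses. A self-contained sketch of this framing check, with names made up for illustration:

import java.nio.ByteBuffer;

public class WireFormatSketch {
  static int extractSchemaId(byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    if (buffer.remaining() >= 5 && buffer.get() == 0x0) {
      return buffer.getInt(); // bytes 1..4: big-endian schema id
    }
    throw new IllegalArgumentException("not a Schema Registry framed message");
  }

  public static void main(String[] args) {
    byte[] framed = ByteBuffer.allocate(6).put((byte) 0x0).putInt(42).put((byte) 7).array();
    System.out.println(extractSchemaId(framed)); // prints 42
  }
}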

+ 0 - 34
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java

@@ -1,34 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.serde.api.Serde;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-abstract class SchemaRegistrySerializer<T> implements Serde.Serializer {
-  protected final Serializer<T> serializer;
-  protected final String topic;
-  protected final boolean isKey;
-  protected final ParsedSchema schema;
-
-  @SneakyThrows
-  protected SchemaRegistrySerializer(String topic, boolean isKey, SchemaRegistryClient client,
-                                     SchemaMetadata schema) {
-    this.topic = topic;
-    this.isKey = isKey;
-    this.serializer = createSerializer(client);
-    this.schema = client.getSchemaById(schema.getId());
-  }
-
-  protected abstract Serializer<T> createSerializer(SchemaRegistryClient client);
-
-  @Override
-  public byte[] serialize(String input) {
-    final T read = this.serialize(input, schema);
-    return this.serializer.serialize(topic, read);
-  }
-
-  protected abstract T serialize(String value, ParsedSchema schema);
-}

+ 126 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/Serialize.java

@@ -0,0 +1,126 @@
+package com.provectus.kafka.ui.serdes.builtin.sr;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.json.jackson.Jackson;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
+import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import lombok.SneakyThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+final class Serialize {
+
+  private static final byte MAGIC = 0x0;
+  private static final ObjectMapper JSON_SERIALIZE_MAPPER = Jackson.newObjectMapper(); //from confluent package
+
+  private Serialize() {
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaJsonSchemaSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeJson(JsonSchema schema, int schemaId, String value) {
+    JsonNode json;
+    try {
+      json = JSON_SERIALIZE_MAPPER.readTree(value);
+    } catch (JsonProcessingException e) {
+      throw new ValidationException(String.format("'%s' is not valid json", value));
+    }
+    try {
+      schema.validate(json);
+    } catch (org.everit.json.schema.ValidationException e) {
+      throw new ValidationException(
+          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
+    }
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      out.write(JSON_SERIALIZE_MAPPER.writeValueAsBytes(json));
+      return out.toByteArray();
+    }
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaProtobufSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeProto(SchemaRegistryClient srClient,
+                               String topic,
+                               Serde.Target target,
+                               ProtobufSchema schema,
+                               int schemaId,
+                               String input) {
+    // flags are tuned like in ProtobufSerializer by default
+    boolean normalizeSchema = false;
+    boolean autoRegisterSchema = false;
+    boolean useLatestVersion = true;
+    boolean latestCompatStrict = true;
+    boolean skipKnownTypes = true;
+
+    schema = AbstractKafkaProtobufSerializer.resolveDependencies(
+        srClient, normalizeSchema, autoRegisterSchema, useLatestVersion, latestCompatStrict,
+        new HashMap<>(), skipKnownTypes, new DefaultReferenceSubjectNameStrategy(),
+        topic, target == Serde.Target.KEY, schema
+    );
+
+    DynamicMessage.Builder builder = schema.newMessageBuilder();
+    JsonFormat.parser().merge(input, builder);
+    Message message = builder.build();
+    MessageIndexes indexes = schema.toMessageIndexes(message.getDescriptorForType().getFullName(), normalizeSchema);
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      out.write(indexes.toByteArray());
+      message.writeTo(out);
+      return out.toByteArray();
+    }
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) {
+    var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema());
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      Schema rawSchema = schema.rawSchema();
+      if (rawSchema.getType().equals(Schema.Type.BYTES)) {
+        Preconditions.checkState(
+            avroObject instanceof ByteBuffer,
+            "Unrecognized bytes object of type: " + avroObject.getClass().getName()
+        );
+        out.write(((ByteBuffer) avroObject).array());
+      } else {
+        boolean useLogicalTypeConverters = true;
+        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+        DatumWriter<Object> writer =
+            (DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(avroObject, rawSchema, useLogicalTypeConverters);
+        writer.write(avroObject, encoder);
+        encoder.flush();
+      }
+      return out.toByteArray();
+    }
+  }
+
+  private static byte[] schemaId(int id) {
+    return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
+  }
+}
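
All three methods emit the same frame through MAGIC and schemaId(int). Assuming a schema id of 7 and a 3-byte payload, the output is 0x00, then 0x00 0x00 0x00 0x07, then the payload; a minimal sketch of that composition:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class FrameSketch {
  public static void main(String[] args) throws Exception {
    try (var out = new ByteArrayOutputStream()) {
      out.write(0x0);                                                   // MAGIC byte
      out.write(ByteBuffer.allocate(Integer.BYTES).putInt(7).array());  // schemaId(7)
      out.write(new byte[] {1, 2, 3});                                  // serialized payload
      System.out.println(out.toByteArray().length);                     // 8 = 1 + 4 + 3
    }
  }
}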

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java

@@ -5,4 +5,5 @@ package com.provectus.kafka.ui.util.annotation;
  * should be marked with this annotation to make further update process easier.
  */
 public @interface KafkaClientInternalsDependant {
+  String value() default "";
 }
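
With the new value() element, annotation sites can name the exact client internal they mirror, as Serialize.java now does. A hypothetical usage sketch:

import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;

class FramingHelper {
  // The string records which Confluent internal this re-implements, so kafka-client
  // upgrades can be audited by searching for the named method.
  @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
  byte[] frame(byte[] payload) {
    return payload; // illustrative body only
  }
}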