Browse Source

minor improvements

iliax 1 year ago
parent
commit
fcdfd701ad

+ 6 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -10,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
 import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
@@ -35,13 +34,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 
 
 
 
 public class SchemaRegistrySerde implements BuiltInSerde {
 public class SchemaRegistrySerde implements BuiltInSerde {
 
 
+  private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
+  private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
+
   public static String name() {
   public static String name() {
     return "SchemaRegistry";
     return "SchemaRegistry";
   }
   }
@@ -275,21 +276,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
 
 
   @Override
   @Override
   public Deserializer deserializer(String topic, Target type) {
   public Deserializer deserializer(String topic, Target type) {
-    return new SrDeserializer(topic);
-  }
-
-  ///--------------------------------------------------------------
-
-  private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
-  private static final int SR_RECORD_PREFIX_LENGTH = 5;
-
-  @RequiredArgsConstructor
-  private class SrDeserializer implements Deserializer {
-
-    private final String topic;
-
-    @Override
-    public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+    return (headers, data) -> {
       var schemaId = extractSchemaIdFromMsg(data);
       var schemaId = extractSchemaIdFromMsg(data);
       SchemaType format = getMessageFormatBySchemaId(schemaId);
       SchemaType format = getMessageFormatBySchemaId(schemaId);
       MessageFormatter formatter = schemaRegistryFormatters.get(format);
       MessageFormatter formatter = schemaRegistryFormatters.get(format);
@@ -301,7 +288,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
               "type", format.name()
               "type", format.name()
           )
           )
       );
       );
-    }
+    };
   }
   }
 
 
   private SchemaType getMessageFormatBySchemaId(int schemaId) {
   private SchemaType getMessageFormatBySchemaId(int schemaId) {
@@ -313,7 +300,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
 
 
   private int extractSchemaIdFromMsg(byte[] data) {
   private int extractSchemaIdFromMsg(byte[] data) {
     ByteBuffer buffer = ByteBuffer.wrap(data);
     ByteBuffer buffer = ByteBuffer.wrap(data);
-    if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
+    if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
       return buffer.getInt();
       return buffer.getInt();
     }
     }
     throw new ValidationException(
     throw new ValidationException(