Merge branch 'master' into audit-be-minor-refactoring

This commit is contained in:
Ilya Kuramshin 2023-08-01 15:54:00 +04:00 committed by GitHub
commit 04087fb9e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 36 additions and 72 deletions

View file

@@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
return "Avro (Embedded)";
}
@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
@Override
public Optional<String> getDescription() {
return Optional.empty();

View file

@@ -1,8 +1,6 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Base64;
@@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
return "Base64";
}
@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
@Override
public Optional<String> getDescription() {
return Optional.empty();
@@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
@Override
public Serializer serializer(String topic, Target type) {
return new Serializer() {
@Override
public byte[] serialize(String input) {
input = input.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (input.length() == 0) {
return new byte[]{};
}
return Base64.getDecoder().decode(input);
var decoder = Base64.getDecoder();
return inputString -> {
inputString = inputString.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (inputString.length() == 0) {
return new byte[] {};
}
return decoder.decode(inputString);
};
}
@Override
public Deserializer deserializer(String topic, Target type) {
var encoder = Base64.getEncoder();
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
encoder.encodeToString(data),
DeserializeResult.Type.STRING,
Map.of()
);
}
};
}
}

View file

@@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
String.valueOf(Longs.fromByteArray(data)),
DeserializeResult.Type.JSON,
Map.of()
);
}
};
}
}

View file

@@ -1,10 +1,8 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedInteger;
import com.google.common.primitives.UnsignedLong;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
@@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
+ " \"minimum\" : 0, "
+ " \"maximum\" : %s "
+ "}",
UnsignedInteger.MAX_VALUE
UnsignedLong.MAX_VALUE
),
Map.of()
)
@@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
DeserializeResult.Type.JSON,
Map.of()
);
}
};
}
}

View file

@@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
@Override
public Serializer serializer(String topic, Target type) {
return new Serializer() {
@Override
public byte[] serialize(String input) {
UUID uuid = UUID.fromString(input);
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
if (mostSignificantBitsFirst) {
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
} else {
bb.putLong(uuid.getLeastSignificantBits());
bb.putLong(uuid.getMostSignificantBits());
}
return bb.array();
return input -> {
UUID uuid = UUID.fromString(input);
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
if (mostSignificantBitsFirst) {
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
} else {
bb.putLong(uuid.getLeastSignificantBits());
bb.putLong(uuid.getMostSignificantBits());
}
return bb.array();
};
}
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
if (data.length != 16) {
throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
}
ByteBuffer bb = ByteBuffer.wrap(data);
long msb = bb.getLong();
long lsb = bb.getLong();
UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
return new DeserializeResult(
uuid.toString(),
DeserializeResult.Type.STRING,
Map.of()
);
return (headers, data) -> {
if (data.length != 16) {
throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
}
ByteBuffer bb = ByteBuffer.wrap(data);
long msb = bb.getLong();
long lsb = bb.getLong();
UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
return new DeserializeResult(
uuid.toString(),
DeserializeResult.Type.STRING,
Map.of()
);
};
}
}