|
@@ -1,6 +1,7 @@
|
|
package com.provectus.kafka.ui.util.jsonschema;
|
|
package com.provectus.kafka.ui.util.jsonschema;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonParser;
|
|
import com.fasterxml.jackson.core.JsonParser;
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.json.JsonMapper;
|
|
import com.fasterxml.jackson.databind.json.JsonMapper;
|
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
@@ -15,7 +16,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
|
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
import com.fasterxml.jackson.databind.node.TextNode;
|
|
import com.fasterxml.jackson.databind.node.TextNode;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
-import com.provectus.kafka.ui.exception.JsonToAvroConversionException;
|
|
|
|
|
|
+import com.provectus.kafka.ui.exception.JsonAvroConversionException;
|
|
import io.confluent.kafka.serializers.AvroData;
|
|
import io.confluent.kafka.serializers.AvroData;
|
|
import java.math.BigDecimal;
|
|
import java.math.BigDecimal;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
@@ -34,7 +35,6 @@ import java.util.Optional;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.function.BiFunction;
|
|
import java.util.function.BiFunction;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
-import lombok.SneakyThrows;
|
|
|
|
import org.apache.avro.Schema;
|
|
import org.apache.avro.Schema;
|
|
import org.apache.avro.generic.GenericData;
|
|
import org.apache.avro.generic.GenericData;
|
|
|
|
|
|
@@ -42,12 +42,17 @@ import org.apache.avro.generic.GenericData;
|
|
public class JsonAvroConversion {
|
|
public class JsonAvroConversion {
|
|
|
|
|
|
private static final JsonMapper MAPPER = new JsonMapper();
|
|
private static final JsonMapper MAPPER = new JsonMapper();
|
|
|
|
+ private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
|
|
|
|
|
|
// converts json into Object that is expected input for KafkaAvroSerializer
|
|
// converts json into Object that is expected input for KafkaAvroSerializer
|
|
// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
|
|
// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
|
|
- @SneakyThrows
|
|
|
|
public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
|
|
public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
|
|
- JsonNode rootNode = MAPPER.readTree(jsonString);
|
|
|
|
|
|
+ JsonNode rootNode = null;
|
|
|
|
+ try {
|
|
|
|
+ rootNode = MAPPER.readTree(jsonString);
|
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
|
+ throw new JsonAvroConversionException("String is not a valid json");
|
|
|
|
+ }
|
|
return convert(rootNode, avroSchema);
|
|
return convert(rootNode, avroSchema);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -80,7 +85,7 @@ public class JsonAvroConversion {
|
|
assertJsonType(node, JsonNodeType.STRING);
|
|
assertJsonType(node, JsonNodeType.STRING);
|
|
String symbol = node.textValue();
|
|
String symbol = node.textValue();
|
|
if (!avroSchema.getEnumSymbols().contains(symbol)) {
|
|
if (!avroSchema.getEnumSymbols().contains(symbol)) {
|
|
- throw new JsonToAvroConversionException("%s is not a part of enum symbols [%s]"
|
|
|
|
|
|
+ throw new JsonAvroConversionException("%s is not a part of enum symbols [%s]"
|
|
.formatted(symbol, avroSchema.getEnumSymbols()));
|
|
.formatted(symbol, avroSchema.getEnumSymbols()));
|
|
}
|
|
}
|
|
yield new GenericData.EnumSymbol(avroSchema, symbol);
|
|
yield new GenericData.EnumSymbol(avroSchema, symbol);
|
|
@@ -88,23 +93,35 @@ public class JsonAvroConversion {
|
|
case UNION -> {
|
|
case UNION -> {
|
|
// for types from enum (other than null) payload should be an object with single key == name of type
|
|
// for types from enum (other than null) payload should be an object with single key == name of type
|
|
// ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" }, { "int": 123 }
|
|
// ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" }, { "int": 123 }
|
|
- if (node.isNull() && avroSchema.getTypes().contains(Schema.create(Schema.Type.NULL))) {
|
|
|
|
|
|
+ if (node.isNull() && avroSchema.getTypes().contains(NULL_SCHEMA)) {
|
|
yield null;
|
|
yield null;
|
|
}
|
|
}
|
|
|
|
|
|
assertJsonType(node, JsonNodeType.OBJECT);
|
|
assertJsonType(node, JsonNodeType.OBJECT);
|
|
var elements = Lists.newArrayList(node.fields());
|
|
var elements = Lists.newArrayList(node.fields());
|
|
if (elements.size() != 1) {
|
|
if (elements.size() != 1) {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"UNION field value should be an object with single field == type name");
|
|
"UNION field value should be an object with single field == type name");
|
|
}
|
|
}
|
|
- var typeNameToValue = elements.get(0);
|
|
|
|
|
|
+ Map.Entry<String, JsonNode> typeNameToValue = elements.get(0);
|
|
|
|
+ List<Schema> candidates = new ArrayList<>();
|
|
for (Schema unionType : avroSchema.getTypes()) {
|
|
for (Schema unionType : avroSchema.getTypes()) {
|
|
if (typeNameToValue.getKey().equals(unionType.getFullName())) {
|
|
if (typeNameToValue.getKey().equals(unionType.getFullName())) {
|
|
yield convert(typeNameToValue.getValue(), unionType);
|
|
yield convert(typeNameToValue.getValue(), unionType);
|
|
}
|
|
}
|
|
|
|
+ if (typeNameToValue.getKey().equals(unionType.getName())) {
|
|
|
|
+ candidates.add(unionType);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (candidates.size() == 1) {
|
|
|
|
+ yield convert(typeNameToValue.getValue(), candidates.get(0));
|
|
}
|
|
}
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ if (candidates.size() > 1) {
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
|
|
+ "Can't select type within union for value '%s'. Provide full type name.".formatted(node)
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"json value '%s' is cannot be converted to any of union types [%s]"
|
|
"json value '%s' is cannot be converted to any of union types [%s]"
|
|
.formatted(node, avroSchema.getTypes()));
|
|
.formatted(node, avroSchema.getTypes()));
|
|
}
|
|
}
|
|
@@ -164,7 +181,7 @@ public class JsonAvroConversion {
|
|
assertJsonType(node, JsonNodeType.STRING);
|
|
assertJsonType(node, JsonNodeType.STRING);
|
|
byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
|
|
byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
|
|
if (bytes.length != avroSchema.getFixedSize()) {
|
|
if (bytes.length != avroSchema.getFixedSize()) {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"Fixed field has unexpected size %d (should be %d)"
|
|
"Fixed field has unexpected size %d (should be %d)"
|
|
.formatted(bytes.length, avroSchema.getFixedSize()));
|
|
.formatted(bytes.length, avroSchema.getFixedSize()));
|
|
}
|
|
}
|
|
@@ -208,8 +225,11 @@ public class JsonAvroConversion {
|
|
case UNION -> {
|
|
case UNION -> {
|
|
ObjectNode node = MAPPER.createObjectNode();
|
|
ObjectNode node = MAPPER.createObjectNode();
|
|
int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
|
|
int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
|
|
- Schema unionType = avroSchema.getTypes().get(unionIdx);
|
|
|
|
- node.set(unionType.getFullName(), convertAvroToJson(obj, unionType));
|
|
|
|
|
|
+ Schema selectedType = avroSchema.getTypes().get(unionIdx);
|
|
|
|
+ node.set(
|
|
|
|
+ selectUnionTypeFieldName(avroSchema, selectedType, unionIdx),
|
|
|
|
+ convertAvroToJson(obj, selectedType)
|
|
|
|
+ );
|
|
yield node;
|
|
yield node;
|
|
}
|
|
}
|
|
case STRING -> {
|
|
case STRING -> {
|
|
@@ -252,11 +272,30 @@ public class JsonAvroConversion {
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // select name for a key field that represents type name of union.
|
|
|
|
+ // For records selects short name, if it is possible.
|
|
|
|
+ private static String selectUnionTypeFieldName(Schema unionSchema,
|
|
|
|
+ Schema chosenType,
|
|
|
|
+ int chosenTypeIdx) {
|
|
|
|
+ var types = unionSchema.getTypes();
|
|
|
|
+ if (types.size() == 2 && types.contains(NULL_SCHEMA)) {
|
|
|
|
+ return chosenType.getName();
|
|
|
|
+ }
|
|
|
|
+ for (int i = 0; i < types.size(); i++) {
|
|
|
|
+ if (i != chosenTypeIdx && chosenType.getName().equals(types.get(i).getName())) {
|
|
|
|
+ // there is another type inside union with the same name
|
|
|
|
+ // so, we have to use fullname
|
|
|
|
+ return chosenType.getFullName();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return chosenType.getName();
|
|
|
|
+ }
|
|
|
|
+
|
|
private static Object processLogicalType(JsonNode node, Schema schema) {
|
|
private static Object processLogicalType(JsonNode node, Schema schema) {
|
|
return findConversion(schema)
|
|
return findConversion(schema)
|
|
.map(c -> c.jsonToAvroConversion.apply(node, schema))
|
|
.map(c -> c.jsonToAvroConversion.apply(node, schema))
|
|
.orElseThrow(() ->
|
|
.orElseThrow(() ->
|
|
- new JsonToAvroConversionException("'%s' logical type is not supported"
|
|
|
|
|
|
+ new JsonAvroConversionException("'%s' logical type is not supported"
|
|
.formatted(schema.getLogicalType().getName())));
|
|
.formatted(schema.getLogicalType().getName())));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -264,7 +303,7 @@ public class JsonAvroConversion {
|
|
return findConversion(schema)
|
|
return findConversion(schema)
|
|
.map(c -> c.avroToJsonConversion.apply(obj, schema))
|
|
.map(c -> c.avroToJsonConversion.apply(obj, schema))
|
|
.orElseThrow(() ->
|
|
.orElseThrow(() ->
|
|
- new JsonToAvroConversionException("'%s' logical type is not supported"
|
|
|
|
|
|
+ new JsonAvroConversionException("'%s' logical type is not supported"
|
|
.formatted(schema.getLogicalType().getName())));
|
|
.formatted(schema.getLogicalType().getName())));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -281,7 +320,7 @@ public class JsonAvroConversion {
|
|
|
|
|
|
private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
|
|
private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
|
|
if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
|
|
if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"%s node has unexpected type, allowed types %s, actual type %s"
|
|
"%s node has unexpected type, allowed types %s, actual type %s"
|
|
.formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
|
|
.formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
|
|
}
|
|
}
|
|
@@ -289,7 +328,7 @@ public class JsonAvroConversion {
|
|
|
|
|
|
private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
|
|
private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
|
|
if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
|
|
if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"%s node has unexpected numeric type, allowed types %s, actual type %s"
|
|
"%s node has unexpected numeric type, allowed types %s, actual type %s"
|
|
.formatted(node, Arrays.toString(allowedTypes), node.numberType()));
|
|
.formatted(node, Arrays.toString(allowedTypes), node.numberType()));
|
|
}
|
|
}
|
|
@@ -318,7 +357,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isNumber()) {
|
|
} else if (node.isNumber()) {
|
|
return new BigDecimal(node.numberValue().toString());
|
|
return new BigDecimal(node.numberValue().toString());
|
|
}
|
|
}
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to decimal logical type"
|
|
"node '%s' can't be converted to decimal logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
},
|
|
},
|
|
@@ -335,7 +374,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isTextual()) {
|
|
} else if (node.isTextual()) {
|
|
return LocalDate.parse(node.asText());
|
|
return LocalDate.parse(node.asText());
|
|
} else {
|
|
} else {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to date logical type"
|
|
"node '%s' can't be converted to date logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
}
|
|
}
|
|
@@ -356,7 +395,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isTextual()) {
|
|
} else if (node.isTextual()) {
|
|
return LocalTime.parse(node.asText());
|
|
return LocalTime.parse(node.asText());
|
|
} else {
|
|
} else {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to time-millis logical type"
|
|
"node '%s' can't be converted to time-millis logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
}
|
|
}
|
|
@@ -377,7 +416,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isTextual()) {
|
|
} else if (node.isTextual()) {
|
|
return LocalTime.parse(node.asText());
|
|
return LocalTime.parse(node.asText());
|
|
} else {
|
|
} else {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to time-micros logical type"
|
|
"node '%s' can't be converted to time-micros logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
}
|
|
}
|
|
@@ -398,7 +437,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isTextual()) {
|
|
} else if (node.isTextual()) {
|
|
return Instant.parse(node.asText());
|
|
return Instant.parse(node.asText());
|
|
} else {
|
|
} else {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to timestamp-millis logical type"
|
|
"node '%s' can't be converted to timestamp-millis logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
}
|
|
}
|
|
@@ -423,7 +462,7 @@ public class JsonAvroConversion {
|
|
} else if (node.isTextual()) {
|
|
} else if (node.isTextual()) {
|
|
return Instant.parse(node.asText());
|
|
return Instant.parse(node.asText());
|
|
} else {
|
|
} else {
|
|
- throw new JsonToAvroConversionException(
|
|
|
|
|
|
+ throw new JsonAvroConversionException(
|
|
"node '%s' can't be converted to timestamp-millis logical type"
|
|
"node '%s' can't be converted to timestamp-millis logical type"
|
|
.formatted(node));
|
|
.formatted(node));
|
|
}
|
|
}
|