|
@@ -1,18 +1,18 @@
|
|
package com.provectus.kafka.ui.service.integration.odd.schema;
|
|
package com.provectus.kafka.ui.service.integration.odd.schema;
|
|
|
|
|
|
import com.google.common.collect.ImmutableSet;
|
|
import com.google.common.collect.ImmutableSet;
|
|
-import com.provectus.kafka.ui.service.integration.odd.Oddrn;
|
|
|
|
import com.provectus.kafka.ui.sr.model.SchemaSubject;
|
|
import com.provectus.kafka.ui.sr.model.SchemaSubject;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import lombok.experimental.UtilityClass;
|
|
|
|
import org.apache.avro.Schema;
|
|
import org.apache.avro.Schema;
|
|
import org.opendatadiscovery.client.model.DataSetField;
|
|
import org.opendatadiscovery.client.model.DataSetField;
|
|
import org.opendatadiscovery.client.model.DataSetFieldType;
|
|
import org.opendatadiscovery.client.model.DataSetFieldType;
|
|
import org.opendatadiscovery.oddrn.model.KafkaPath;
|
|
import org.opendatadiscovery.oddrn.model.KafkaPath;
|
|
|
|
|
|
-@UtilityClass
|
|
+final class AvroExtractor {
|
|
-class AvroExtractor {
|
|
+
|
|
|
|
+ private AvroExtractor() {
|
|
|
|
+ }
|
|
|
|
|
|
static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
|
|
static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
|
|
var schema = new Schema.Parser().parse(subject.getSchema());
|
|
var schema = new Schema.Parser().parse(subject.getSchema());
|
|
@@ -31,14 +31,14 @@ class AvroExtractor {
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
- private void extract(Schema schema,
|
|
+ private static void extract(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- Boolean nullable,
|
|
+ Boolean nullable,
|
|
- ImmutableSet<String> registeredRecords,
|
|
+ ImmutableSet<String> registeredRecords,
|
|
- List<DataSetField> sink
|
|
+ List<DataSetField> sink
|
|
) {
|
|
) {
|
|
switch (schema.getType()) {
|
|
switch (schema.getType()) {
|
|
case RECORD -> extractRecord(schema, parentOddr, oddrn, name, doc, nullable, registeredRecords, sink);
|
|
case RECORD -> extractRecord(schema, parentOddr, oddrn, name, doc, nullable, registeredRecords, sink);
|
|
@@ -49,12 +49,12 @@ class AvroExtractor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private DataSetField createDataSetField(String name,
|
|
+ private static DataSetField createDataSetField(String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- String parentOddrn,
|
|
+ String parentOddrn,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- Schema schema,
|
|
+ Schema schema,
|
|
- Boolean nullable) {
|
|
+ Boolean nullable) {
|
|
return new DataSetField()
|
|
return new DataSetField()
|
|
.name(name)
|
|
.name(name)
|
|
.description(doc)
|
|
.description(doc)
|
|
@@ -63,14 +63,14 @@ class AvroExtractor {
|
|
.type(mapSchema(schema, nullable));
|
|
.type(mapSchema(schema, nullable));
|
|
}
|
|
}
|
|
|
|
|
|
- private void extractRecord(Schema schema,
|
|
+ private static void extractRecord(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- Boolean nullable,
|
|
+ Boolean nullable,
|
|
- ImmutableSet<String> registeredRecords,
|
|
+ ImmutableSet<String> registeredRecords,
|
|
- List<DataSetField> sink) {
|
|
+ List<DataSetField> sink) {
|
|
boolean isRoot = oddrn == null;
|
|
boolean isRoot = oddrn == null;
|
|
if (!isRoot) {
|
|
if (!isRoot) {
|
|
sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, nullable));
|
|
sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, nullable));
|
|
@@ -99,13 +99,13 @@ class AvroExtractor {
|
|
));
|
|
));
|
|
}
|
|
}
|
|
|
|
|
|
- private void extractUnion(Schema schema,
|
|
+ private static void extractUnion(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- ImmutableSet<String> registeredRecords,
|
|
+ ImmutableSet<String> registeredRecords,
|
|
- List<DataSetField> sink) {
|
|
+ List<DataSetField> sink) {
|
|
boolean isRoot = oddrn == null;
|
|
boolean isRoot = oddrn == null;
|
|
boolean containsNull = schema.getTypes().stream().map(Schema::getType).anyMatch(t -> t == Schema.Type.NULL);
|
|
boolean containsNull = schema.getTypes().stream().map(Schema::getType).anyMatch(t -> t == Schema.Type.NULL);
|
|
|
|
|
|
@@ -149,14 +149,14 @@ class AvroExtractor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void extractArray(Schema schema,
|
|
+ private static void extractArray(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- Boolean nullable,
|
|
+ Boolean nullable,
|
|
- ImmutableSet<String> registeredRecords,
|
|
+ ImmutableSet<String> registeredRecords,
|
|
- List<DataSetField> sink) {
|
|
+ List<DataSetField> sink) {
|
|
boolean isRoot = oddrn == null;
|
|
boolean isRoot = oddrn == null;
|
|
oddrn = isRoot ? parentOddr + "/array" : oddrn;
|
|
oddrn = isRoot ? parentOddr + "/array" : oddrn;
|
|
if (isRoot) {
|
|
if (isRoot) {
|
|
@@ -176,14 +176,14 @@ class AvroExtractor {
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
- private void extractMap(Schema schema,
|
|
+ private static void extractMap(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- Boolean nullable,
|
|
+ Boolean nullable,
|
|
- ImmutableSet<String> registeredRecords,
|
|
+ ImmutableSet<String> registeredRecords,
|
|
- List<DataSetField> sink) {
|
|
+ List<DataSetField> sink) {
|
|
boolean isRoot = oddrn == null;
|
|
boolean isRoot = oddrn == null;
|
|
oddrn = isRoot ? parentOddr + "/map" : oddrn;
|
|
oddrn = isRoot ? parentOddr + "/map" : oddrn;
|
|
if (isRoot) {
|
|
if (isRoot) {
|
|
@@ -214,13 +214,13 @@ class AvroExtractor {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- private void extractPrimitive(Schema schema,
|
|
+ private static void extractPrimitive(Schema schema,
|
|
- String parentOddr,
|
|
+ String parentOddr,
|
|
- String oddrn,
|
|
+ String oddrn,
|
|
- String name,
|
|
+ String name,
|
|
- String doc,
|
|
+ String doc,
|
|
- Boolean nullable,
|
|
+ Boolean nullable,
|
|
- List<DataSetField> sink) {
|
|
+ List<DataSetField> sink) {
|
|
boolean isRoot = oddrn == null;
|
|
boolean isRoot = oddrn == null;
|
|
String primOddrn = isRoot ? (parentOddr + "/" + schema.getType()) : oddrn;
|
|
String primOddrn = isRoot ? (parentOddr + "/" + schema.getType()) : oddrn;
|
|
if (isRoot) {
|
|
if (isRoot) {
|
|
@@ -231,7 +231,7 @@ class AvroExtractor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private DataSetFieldType.TypeEnum mapType(Schema.Type type) {
|
|
+ private static DataSetFieldType.TypeEnum mapType(Schema.Type type) {
|
|
return switch (type) {
|
|
return switch (type) {
|
|
case INT, LONG -> DataSetFieldType.TypeEnum.INTEGER;
|
|
case INT, LONG -> DataSetFieldType.TypeEnum.INTEGER;
|
|
case FLOAT, DOUBLE, FIXED -> DataSetFieldType.TypeEnum.NUMBER;
|
|
case FLOAT, DOUBLE, FIXED -> DataSetFieldType.TypeEnum.NUMBER;
|
|
@@ -246,14 +246,14 @@ class AvroExtractor {
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
- private DataSetFieldType mapSchema(Schema schema, Boolean nullable) {
|
|
+ private static DataSetFieldType mapSchema(Schema schema, Boolean nullable) {
|
|
return new DataSetFieldType()
|
|
return new DataSetFieldType()
|
|
.logicalType(logicalType(schema))
|
|
.logicalType(logicalType(schema))
|
|
.isNullable(nullable)
|
|
.isNullable(nullable)
|
|
.type(mapType(schema.getType()));
|
|
.type(mapType(schema.getType()));
|
|
}
|
|
}
|
|
|
|
|
|
- private String logicalType(Schema schema) {
|
|
+ private static String logicalType(Schema schema) {
|
|
return schema.getType() == Schema.Type.RECORD
|
|
return schema.getType() == Schema.Type.RECORD
|
|
? schema.getFullName()
|
|
? schema.getFullName()
|
|
: schema.getType().toString().toLowerCase();
|
|
: schema.getType().toString().toLowerCase();
|