|
@@ -4,7 +4,6 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
|
|
|
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
|
|
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
|
|
|
import lombok.SneakyThrows;
|
|
|
-import org.apache.avro.generic.GenericRecord;
|
|
|
|
|
|
public class AvroMessageFormatter implements MessageFormatter {
|
|
|
private final KafkaAvroDeserializer avroDeserializer;
|
|
@@ -16,8 +15,10 @@ public class AvroMessageFormatter implements MessageFormatter {
|
|
|
@Override
|
|
|
@SneakyThrows
|
|
|
public String format(String topic, byte[] value) {
|
|
|
- GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, value);
|
|
|
- byte[] jsonBytes = AvroSchemaUtils.toJson(avroRecord);
|
|
|
+ // deserialized will have type, that depends on schema type (record or primitive),
|
|
|
+ // AvroSchemaUtils.toJson(...) method will take it into account
|
|
|
+ Object deserialized = avroDeserializer.deserialize(topic, value);
|
|
|
+ byte[] jsonBytes = AvroSchemaUtils.toJson(deserialized);
|
|
|
return new String(jsonBytes);
|
|
|
}
|
|
|
|