|
@@ -5,6 +5,7 @@ import lombok.extern.log4j.Log4j2;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -101,10 +102,14 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
|
|
|
private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
|
|
|
String topic = record.topic();
|
|
|
- byte[] valueBytes = record.value().get();
|
|
|
- GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
|
|
|
- byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
|
|
|
- return parseJson(bytes);
|
|
|
+ if (record.value()!=null) {
|
|
|
+ byte[] valueBytes = record.value().get();
|
|
|
+ GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
|
|
|
+ byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
|
|
|
+ return parseJson(bytes);
|
|
|
+ } else {
|
|
|
+ return new HashMap<String,Object>();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
|