Bläddra i källkod

#222 Fix npe in consumer (#223)

* add null handling
* replaced hashmaps with immutable map in deserializer returns
* added simple test for previously failing case
Ramazan Yapparov 4 år sedan
förälder
incheckning
58df6c1a7e

+ 10 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

@@ -150,7 +150,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 			byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
 			byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
 			return parseJson(bytes);
 			return parseJson(bytes);
 		} else {
 		} else {
-			return new HashMap<String,Object>();
+			return Map.of();
 		}
 		}
 	}
 	}
 
 
@@ -162,12 +162,16 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 			byte[] bytes = ProtobufSchemaUtils.toJson(message);
 			byte[] bytes = ProtobufSchemaUtils.toJson(message);
 			return parseJson(bytes);
 			return parseJson(bytes);
 		} else {
 		} else {
-			return new HashMap<String,Object>();
+			return Map.of();
 		}
 		}
 	}
 	}
 
 
 	private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
 	private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
-		byte[] valueBytes = record.value().get();
+		var value = record.value();
+		if (value == null) {
+			return Map.of();
+		}
+		byte[] valueBytes = value.get();
 		return parseJson(valueBytes);
 		return parseJson(valueBytes);
 	}
 	}
 
 
@@ -178,6 +182,9 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
 
 	private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
 	private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
 		String topic = record.topic();
 		String topic = record.topic();
+		if (record.value() == null) {
+			return Map.of();
+		}
 		byte[] valueBytes = record.value().get();
 		byte[] valueBytes = record.value().get();
 		return stringDeserializer.deserialize(topic, valueBytes);
 		return stringDeserializer.deserialize(topic, valueBytes);
 	}
 	}

+ 34 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializerTest.java

@@ -0,0 +1,34 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SchemaRegistryRecordDeserializerTest {
+
+    private final SchemaRegistryRecordDeserializer deserializer = new SchemaRegistryRecordDeserializer(
+            KafkaCluster.builder()
+                    .schemaNameTemplate("%s-value")
+                    .build(),
+            new ObjectMapper()
+    );
+
+    @Test
+    public void shouldDeserializeStringValue() {
+        var value = "test";
+        var deserializedRecord = deserializer.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(value.getBytes())));
+        assertEquals(value, deserializedRecord);
+    }
+
+    @Test
+    public void shouldDeserializeNullValueRecordToEmptyMap() {
+        var deserializedRecord = deserializer.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), null));
+        assertEquals(Map.of(), deserializedRecord);
+    }
+}