ISSUE-841 Fix null valued headers deserialization (#843)

This commit is contained in:
German Osin 2021-08-31 19:17:56 +03:00 committed by GitHub
parent 7643825059
commit 161d887e64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 8 deletions

View file

@ -281,9 +281,15 @@ public class ClusterUtil {
public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord,
RecordSerDe recordDeserializer) {
Map<String, String> headers = new HashMap<>();
consumerRecord.headers().iterator()
.forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
.forEachRemaining(header ->
headers.put(
header.key(),
header.value() != null ? new String(header.value()) : null
)
);
TopicMessage topicMessage = new TopicMessage();

View file

@ -15,6 +15,7 @@ import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -29,6 +30,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.AfterAll;
@ -57,13 +60,21 @@ class RecordEmitterTest extends AbstractBaseTest {
for (int i = 0; i < MSGS_PER_PARTITION; i++) {
long ts = System.currentTimeMillis() + i;
var value = "msg_" + partition + "_" + i;
var metadata =
producer.send(new ProducerRecord<>(TOPIC, partition, ts, null, value)).get();
SENT_RECORDS.add(new Record(
value,
new TopicPartition(metadata.topic(), metadata.partition()),
metadata.offset(),
ts)
var metadata = producer.send(
new ProducerRecord<>(
TOPIC, partition, ts, null, value, List.of(
new RecordHeader("name", null),
new RecordHeader("name2", "value".getBytes())
)
)
).get();
SENT_RECORDS.add(
new Record(
value,
new TopicPartition(metadata.topic(), metadata.partition()),
metadata.offset(),
ts
)
);
}
}