|
@@ -85,9 +85,15 @@ class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
|
|
|
.atMost(Duration.ofMinutes(1))
|
|
|
.untilAsserted(() -> {
|
|
|
for (var rec : consumer.poll(Duration.ofMillis(200))) {
|
|
|
- DeserializeResult key = keyDeserializer.deserialize(null, rec.key().get());
|
|
|
- DeserializeResult val = valueDeserializer.deserialize(null, rec.value().get());
|
|
|
- polled.add(Tuples.of(key, val));
|
|
|
+ DeserializeResult key = rec.key() != null
|
|
|
+ ? keyDeserializer.deserialize(null, rec.key().get())
|
|
|
+ : null;
|
|
|
+ DeserializeResult val = rec.value() != null
|
|
|
+ ? valueDeserializer.deserialize(null, rec.value().get())
|
|
|
+ : null;
|
|
|
+ if (key != null && val != null) {
|
|
|
+ polled.add(Tuples.of(key, val));
|
|
|
+ }
|
|
|
}
|
|
|
assertThat(polled).anyMatch(t -> isCommitMessage(t.getT1(), t.getT2()));
|
|
|
assertThat(polled).anyMatch(t -> isGroupMetadataMessage(t.getT1(), t.getT2()));
|