|
@@ -21,10 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
-import org.junit.jupiter.api.AfterEach;
|
|
|
|
-import org.junit.jupiter.api.BeforeEach;
|
|
|
|
|
|
+import org.junit.jupiter.api.AfterAll;
|
|
|
|
+import org.junit.jupiter.api.BeforeAll;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.junit.jupiter.api.Test;
|
|
-import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableBoolean;
|
|
|
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
|
import reactor.util.function.Tuple2;
|
|
import reactor.util.function.Tuple2;
|
|
import reactor.util.function.Tuples;
|
|
import reactor.util.function.Tuples;
|
|
@@ -33,11 +32,11 @@ class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
|
|
|
|
|
|
private static final int MSGS_TO_GENERATE = 10;
|
|
private static final int MSGS_TO_GENERATE = 10;
|
|
|
|
|
|
- private String consumerGroupName;
|
|
|
|
- private String committedTopic;
|
|
|
|
|
|
+ private static String consumerGroupName;
|
|
|
|
+ private static String committedTopic;
|
|
|
|
|
|
- @BeforeEach
|
|
|
|
- void createTopicAndCommitItsOffset() {
|
|
|
|
|
|
+ @BeforeAll
|
|
|
|
+ static void createTopicAndCommitItsOffset() {
|
|
committedTopic = ConsumerOffsetsSerdeTest.class.getName() + "-" + UUID.randomUUID();
|
|
committedTopic = ConsumerOffsetsSerdeTest.class.getName() + "-" + UUID.randomUUID();
|
|
consumerGroupName = committedTopic + "-group";
|
|
consumerGroupName = committedTopic + "-group";
|
|
createTopic(new NewTopic(committedTopic, 1, (short) 1));
|
|
createTopic(new NewTopic(committedTopic, 1, (short) 1));
|
|
@@ -57,19 +56,28 @@ class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @AfterEach
|
|
|
|
- void cleanUp() {
|
|
|
|
|
|
+ @AfterAll
|
|
|
|
+ static void cleanUp() {
|
|
deleteTopic(committedTopic);
|
|
deleteTopic(committedTopic);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void test() {
|
|
|
|
|
|
+ void canOnlyDeserializeConsumerOffsetsTopic() {
|
|
|
|
+ var serde = new ConsumerOffsetsSerde();
|
|
|
|
+ assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.KEY)).isTrue();
|
|
|
|
+ assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.VALUE)).isTrue();
|
|
|
|
+ assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.KEY)).isFalse();
|
|
|
|
+ assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.VALUE)).isFalse();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ void deserializesMessagesMadeByConsumerActivity() {
|
|
var serde = new ConsumerOffsetsSerde();
|
|
var serde = new ConsumerOffsetsSerde();
|
|
var keyDeserializer = serde.deserializer(TOPIC, Serde.Target.KEY);
|
|
var keyDeserializer = serde.deserializer(TOPIC, Serde.Target.KEY);
|
|
var valueDeserializer = serde.deserializer(TOPIC, Serde.Target.VALUE);
|
|
var valueDeserializer = serde.deserializer(TOPIC, Serde.Target.VALUE);
|
|
|
|
|
|
try (var consumer = createConsumer(consumerGroupName + "-check")) {
|
|
try (var consumer = createConsumer(consumerGroupName + "-check")) {
|
|
- consumer.subscribe(List.of(TOPIC));
|
|
|
|
|
|
+ consumer.subscribe(List.of(ConsumerOffsetsSerde.TOPIC));
|
|
List<Tuple2<DeserializeResult, DeserializeResult>> polled = new ArrayList<>();
|
|
List<Tuple2<DeserializeResult, DeserializeResult>> polled = new ArrayList<>();
|
|
|
|
|
|
Awaitility.await()
|
|
Awaitility.await()
|
|
@@ -158,7 +166,7 @@ class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
|
|
return new JsonMapper().readValue(result.getResult(), Map.class);
|
|
return new JsonMapper().readValue(result.getResult(), Map.class);
|
|
}
|
|
}
|
|
|
|
|
|
- private KafkaConsumer<Bytes, Bytes> createConsumer(String groupId) {
|
|
|
|
|
|
+ private static KafkaConsumer<Bytes, Bytes> createConsumer(String groupId) {
|
|
Properties props = new Properties();
|
|
Properties props = new Properties();
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
|
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
|