From 912a387d587095cc583e566924d5232921a3b9a2 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 3 May 2023 16:59:32 +0400 Subject: [PATCH] serde test added --- .../builtin/ConsumerOffsetsSerdeTest.java | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java new file mode 100644 index 0000000000..11c2cab779 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java @@ -0,0 +1,171 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import static com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde.TOPIC; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.producer.KafkaTestProducer; +import com.provectus.kafka.ui.serde.api.DeserializeResult; +import com.provectus.kafka.ui.serde.api.Serde; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import lombok.SneakyThrows; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableBoolean; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest { + + private static final int MSGS_TO_GENERATE = 10; + + private String consumerGroupName; + private String committedTopic; + + @BeforeEach + void createTopicAndCommitItsOffset() { + committedTopic = ConsumerOffsetsSerdeTest.class.getName() + "-" + UUID.randomUUID(); + consumerGroupName = committedTopic + "-group"; + createTopic(new NewTopic(committedTopic, 1, (short) 1)); + + try (var producer = KafkaTestProducer.forKafka(kafka)) { + for (int i = 0; i < 10; i++) { + producer.send(committedTopic, "i=" + i); + } + } + try (var consumer = createConsumer(consumerGroupName)) { + consumer.subscribe(List.of(committedTopic)); + int polled = 0; + while (polled < MSGS_TO_GENERATE) { + polled += consumer.poll(Duration.ofMillis(100)).count(); + } + consumer.commitSync(); + } + } + + @AfterEach + void cleanUp() { + deleteTopic(committedTopic); + } + + @Test + void test() { + var serde = new ConsumerOffsetsSerde(); + var keyDeserializer = serde.deserializer(TOPIC, Serde.Target.KEY); + var valueDeserializer = serde.deserializer(TOPIC, Serde.Target.VALUE); + + try (var consumer = createConsumer(consumerGroupName + "-check")) { + consumer.subscribe(List.of(TOPIC)); + List> polled = new ArrayList<>(); + + Awaitility.await() + .pollInSameThread() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + for (var rec : consumer.poll(Duration.ofMillis(200))) { + DeserializeResult key = keyDeserializer.deserialize(null, rec.key().get()); + DeserializeResult val = valueDeserializer.deserialize(null, rec.value().get()); + polled.add(Tuples.of(key, val)); + } + assertThat(polled).anyMatch(t -> isCommitMessage(t.getT1(), t.getT2())); + assertThat(polled).anyMatch(t -> isGroupMetadataMessage(t.getT1(), t.getT2())); + }); + } + } + + // Sample commit record: + // + // key: { + // "group": "test_Members_3", + // "topic": "test", + // "partition": 0 + // } + // + // value: + // { + // "offset": 2, + // "leader_epoch": 0, + // "metadata": "", + // "commit_timestamp": 1683112980588 + // } + private boolean isCommitMessage(DeserializeResult key, DeserializeResult value) { + var keyJson = toMapFromJsom(key); + boolean keyIsOk = consumerGroupName.equals(keyJson.get("group")) + && committedTopic.equals(keyJson.get("topic")) + && ((Integer) 0).equals(keyJson.get("partition")); + + var valueJson = toMapFromJsom(value); + boolean valueIsOk = valueJson.containsKey("offset") + && valueJson.get("offset").equals(MSGS_TO_GENERATE) + && valueJson.containsKey("commit_timestamp"); + + return keyIsOk && valueIsOk; + } + + // Sample group metadata record: + // + // key: { + // "group": "test_Members_3" + // } + // + // value: + // { + // "protocol_type": "consumer", + // "generation": 1, + // "protocol": "range", + // "leader": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd", + // "current_state_timestamp": 1683112974561, + // "members": [ + // { + // "member_id": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd", + // "group_instance_id": null, + // "client_id": "consumer-test_Members_3-1", + // "client_host": "/192.168.16.1", + // "rebalance_timeout": 300000, + // "session_timeout": 45000, + // "subscription": "AAEAAAABAAR0ZXN0/////wAAAAA=", + // "assignment": "AAEAAAABAAR0ZXN0AAAAAQAAAAD/////" + // } + // ] + // } + private boolean isGroupMetadataMessage(DeserializeResult key, DeserializeResult value) { + var keyJson = toMapFromJsom(key); + boolean keyIsOk = consumerGroupName.equals(keyJson.get("group")) && keyJson.size() == 1; + + var valueJson = toMapFromJsom(value); + boolean valueIsOk = valueJson.keySet() + .containsAll(Set.of("protocol_type", "generation", "leader", "members")); + + return keyIsOk && valueIsOk; + } + + @SneakyThrows + private Map toMapFromJsom(DeserializeResult result) { + return new JsonMapper().readValue(result.getResult(), Map.class); + } + + private KafkaConsumer createConsumer(String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } +}