
BE: Serde: Implement a serde for consumer_offsets topic (#3771)

Ilya Kuramshin 2 years ago
parent
commit
8337c9c183
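
This commit adds a built-in, read-only serde for Kafka's internal __consumer_offsets topic, so offset-commit and group-metadata records can be rendered as JSON in the UI. A minimal usage sketch (illustrative only, not part of the commit; rawKeyBytes is a placeholder for a raw record key read from the topic, and the API calls mirror those used in the test further below):

    var serde = new ConsumerOffsetsSerde();
    // Deserialize-only: canSerialize() returns false and only the hard-coded topic is accepted
    var keyDeserializer = serde.deserializer(ConsumerOffsetsSerde.TOPIC, Serde.Target.KEY);
    // rawKeyBytes: placeholder for a record key read from __consumer_offsets
    DeserializeResult key = keyDeserializer.deserialize(null, rawKeyBytes);
    key.getResult(); // JSON string, e.g. {"group":"my-group","topic":"my-topic","partition":0}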

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
+import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -118,6 +119,8 @@ public class SerdesInitializer {
       }
     });
 
+    registerTopicRelatedSerde(registeredSerdes);
+
     return new ClusterSerdes(
         registeredSerdes,
         Optional.ofNullable(clusterProperties.getDefaultKeySerde())
@@ -132,6 +135,27 @@ public class SerdesInitializer {
     );
   }
 
+  /**
+   * Registers serdes that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
+   */
+  private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
+    registerConsumerOffsetsSerde(serdes);
+  }
+
+  private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
+    var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
+    serdes.put(
+        ConsumerOffsetsSerde.name(),
+        new SerdeInstance(
+            ConsumerOffsetsSerde.name(),
+            new ConsumerOffsetsSerde(),
+            pattern,
+            pattern,
+            null
+        )
+    );
+  }
+
   private SerdeInstance createFallbackSerde() {
     StringSerde serde = new StringSerde();
     serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
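
The registration above compiles the topic name into a Pattern that is passed as both the key- and value-topic pattern of the SerdeInstance, so the serde is only offered for __consumer_offsets. A small illustration of the intended matching (a sketch under the assumption that the patterns are matched against topic names; the underscores are literal characters in the regex, so only the exact topic name matches):

    Pattern pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC); // "__consumer_offsets"
    pattern.matcher("__consumer_offsets").matches(); // true  -> serde applies
    pattern.matcher("orders").matches();             // false -> serde not suggested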

+ 294 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java

@@ -0,0 +1,294 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.BoundField;
+import org.apache.kafka.common.protocol.types.CompactArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+// Deserialization logic and message schemas can be found in
+// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
+public class ConsumerOffsetsSerde implements BuiltInSerde {
+
+  private static final JsonMapper JSON_MAPPER = createMapper();
+
+  public static final String TOPIC = "__consumer_offsets";
+
+  public static String name() {
+    return "__consumer_offsets";
+  }
+
+  private static JsonMapper createMapper() {
+    var module = new SimpleModule();
+    module.addSerializer(Struct.class, new JsonSerializer<>() {
+      @Override
+      public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+        gen.writeStartObject();
+        for (BoundField field : value.schema().fields()) {
+          var fieldVal = value.get(field);
+          gen.writeObjectField(field.def.name, fieldVal);
+        }
+        gen.writeEndObject();
+      }
+    });
+    var mapper = new JsonMapper();
+    mapper.registerModule(module);
+    return mapper;
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return topic.equals(TOPIC);
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return switch (type) {
+      case KEY -> keyDeserializer();
+      case VALUE -> valueDeserializer();
+    };
+  }
+
+  private Deserializer keyDeserializer() {
+    final Schema commitKeySchema = new Schema(
+        new Field("group", Type.STRING, ""),
+        new Field("topic", Type.STRING, ""),
+        new Field("partition", Type.INT32, "")
+    );
+
+    final Schema groupMetadataSchema = new Schema(
+        new Field("group", Type.STRING, "")
+    );
+
+    return (headers, data) -> {
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+      return new DeserializeResult(
+          toJson(
+              switch (version) {
+                case 0, 1 -> commitKeySchema.read(bb);
+                case 2 -> groupMetadataSchema.read(bb);
+                default -> throw new IllegalStateException("Unknown group metadata message version: " + version);
+              }
+          ),
+          DeserializeResult.Type.JSON,
+          Map.of()
+      );
+    };
+  }
+
+  private Deserializer valueDeserializer() {
+    final Schema commitOffsetSchemaV0 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV1 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, ""),
+            new Field("expire_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV2 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV3 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("leader_epoch", Type.INT32, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV4 = new Schema(
+        new Field("offset", Type.INT64, ""),
+        new Field("leader_epoch", Type.INT32, ""),
+        new Field("metadata", Type.COMPACT_STRING, ""),
+        new Field("commit_timestamp", Type.INT64, ""),
+        Field.TaggedFieldsSection.of()
+    );
+
+    final Schema metadataSchema0 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema1 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema2 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema3 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("group_instance_id", Type.NULLABLE_STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema4 =
+        new Schema(
+            new Field("protocol_type", Type.COMPACT_STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new CompactArrayOf(new Schema(
+                new Field("member_id", Type.COMPACT_STRING, ""),
+                new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
+                new Field("client_id", Type.COMPACT_STRING, ""),
+                new Field("client_host", Type.COMPACT_STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.COMPACT_BYTES, ""),
+                new Field("assignment", Type.COMPACT_BYTES, ""),
+                Field.TaggedFieldsSection.of()
+            )), ""),
+            Field.TaggedFieldsSection.of()
+        );
+
+    return (headers, data) -> {
+      String result;
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+      // Ideally we would distinguish whether the value is an offset commit or group metadata
+      // by inspecting the record's key, but the current serde structure doesn't allow that,
+      // so we try to parse the value as group metadata first and fall back to an offset-commit message.
+      try {
+        result = toJson(
+            switch (version) {
+              case 0 -> metadataSchema0.read(bb);
+              case 1 -> metadataSchema1.read(bb);
+              case 2 -> metadataSchema2.read(bb);
+              case 3 -> metadataSchema3.read(bb);
+              case 4 -> metadataSchema4.read(bb);
+              default -> throw new IllegalArgumentException("Unrecognized version: " + version);
+            }
+        );
+      } catch (Throwable e) {
+        bb = bb.rewind();
+        bb.getShort(); // skipping version
+        result = toJson(
+            switch (version) {
+              case 0 -> commitOffsetSchemaV0.read(bb);
+              case 1 -> commitOffsetSchemaV1.read(bb);
+              case 2 -> commitOffsetSchemaV2.read(bb);
+              case 3 -> commitOffsetSchemaV3.read(bb);
+              case 4 -> commitOffsetSchemaV4.read(bb);
+              default -> throw new IllegalArgumentException("Unrecognized version: " + version);
+            }
+        );
+      }
+
+      if (bb.remaining() != 0) {
+        throw new IllegalArgumentException(
+            "Message buffer is not read to the end, which is likely means message is unrecognized");
+      }
+      return new DeserializeResult(
+          result,
+          DeserializeResult.Type.JSON,
+          Map.of()
+      );
+    };
+  }
+
+  @SneakyThrows
+  private String toJson(Struct s) {
+    return JSON_MAPPER.writeValueAsString(s);
+  }
+}
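
Both keys and values on __consumer_offsets start with a two-byte schema version that selects which Schema parses the remaining bytes; for values, offset-commit and group-metadata messages share that version space, hence the metadata-first-with-fallback parsing above. A condensed sketch of the value flow (illustrative only; it assumes access to the schemas defined locally in valueDeserializer() and to the class's toJson helper, and data is a placeholder for raw value bytes):

    var bb = ByteBuffer.wrap(data);             // data: placeholder for raw value bytes
    short version = bb.getShort();              // shared version prefix
    Struct struct;
    try {
      struct = metadataSchema3.read(bb);        // e.g. version 3: try group metadata first
    } catch (Throwable t) {
      bb.rewind();
      bb.getShort();                            // skip the version again
      struct = commitOffsetSchemaV3.read(bb);   // fall back to the offset-commit schema
    }
    String json = toJson(struct);               // rendered via the custom Struct serializer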

+ 185 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java

@@ -0,0 +1,185 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
+
+  private static final int MSGS_TO_GENERATE = 10;
+
+  private static String consumerGroupName;
+  private static String committedTopic;
+
+  @BeforeAll
+  static void createTopicAndCommitItsOffset() {
+    committedTopic = ConsumerOffsetsSerdeTest.class.getSimpleName() + "-" + UUID.randomUUID();
+    consumerGroupName = committedTopic + "-group";
+    createTopic(new NewTopic(committedTopic, 1, (short) 1));
+
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < MSGS_TO_GENERATE; i++) {
+        producer.send(committedTopic, "i=" + i);
+      }
+    }
+    try (var consumer = createConsumer(consumerGroupName)) {
+      consumer.subscribe(List.of(committedTopic));
+      int polled = 0;
+      while (polled < MSGS_TO_GENERATE) {
+        polled += consumer.poll(Duration.ofMillis(100)).count();
+      }
+      consumer.commitSync();
+    }
+  }
+
+  @AfterAll
+  static void cleanUp() {
+    deleteTopic(committedTopic);
+  }
+
+  @Test
+  void canOnlyDeserializeConsumerOffsetsTopic() {
+    var serde = new ConsumerOffsetsSerde();
+    assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.KEY)).isTrue();
+    assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.VALUE)).isTrue();
+    assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.KEY)).isFalse();
+    assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.VALUE)).isFalse();
+  }
+
+  @Test
+  void deserializesMessagesMadeByConsumerActivity() {
+    var serde = new ConsumerOffsetsSerde();
+    var keyDeserializer = serde.deserializer(TOPIC, Serde.Target.KEY);
+    var valueDeserializer = serde.deserializer(TOPIC, Serde.Target.VALUE);
+
+    try (var consumer = createConsumer(consumerGroupName + "-check")) {
+      consumer.subscribe(List.of(ConsumerOffsetsSerde.TOPIC));
+      List<Tuple2<DeserializeResult, DeserializeResult>> polled = new ArrayList<>();
+
+      Awaitility.await()
+          .pollInSameThread()
+          .atMost(Duration.ofMinutes(1))
+          .untilAsserted(() -> {
+            for (var rec : consumer.poll(Duration.ofMillis(200))) {
+              DeserializeResult key = rec.key() != null
+                  ? keyDeserializer.deserialize(null, rec.key().get())
+                  : null;
+              DeserializeResult val = rec.value() != null
+                  ? valueDeserializer.deserialize(null, rec.value().get())
+                  : null;
+              if (key != null && val != null) {
+                polled.add(Tuples.of(key, val));
+              }
+            }
+            assertThat(polled).anyMatch(t -> isCommitMessage(t.getT1(), t.getT2()));
+            assertThat(polled).anyMatch(t -> isGroupMetadataMessage(t.getT1(), t.getT2()));
+          });
+    }
+  }
+
+  // Sample commit record:
+  //
+  // key: {
+  //  "group": "test_Members_3",
+  //  "topic": "test",
+  //  "partition": 0
+  // }
+  //
+  // value:
+  // {
+  //  "offset": 2,
+  //  "leader_epoch": 0,
+  //  "metadata": "",
+  //  "commit_timestamp": 1683112980588
+  // }
+  private boolean isCommitMessage(DeserializeResult key, DeserializeResult value) {
+    var keyJson = toMapFromJson(key);
+    boolean keyIsOk = consumerGroupName.equals(keyJson.get("group"))
+        && committedTopic.equals(keyJson.get("topic"))
+        && ((Integer) 0).equals(keyJson.get("partition"));
+
+    var valueJson = toMapFromJson(value);
+    boolean valueIsOk = valueJson.containsKey("offset")
+        && valueJson.get("offset").equals(MSGS_TO_GENERATE)
+        && valueJson.containsKey("commit_timestamp");
+
+    return keyIsOk && valueIsOk;
+  }
+
+  // Sample group metadata record:
+  //
+  // key: {
+  //  "group": "test_Members_3"
+  // }
+  //
+  // value:
+  // {
+  //  "protocol_type": "consumer",
+  //  "generation": 1,
+  //  "protocol": "range",
+  //  "leader": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd",
+  //  "current_state_timestamp": 1683112974561,
+  //  "members": [
+  //    {
+  //      "member_id": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd",
+  //      "group_instance_id": null,
+  //      "client_id": "consumer-test_Members_3-1",
+  //      "client_host": "/192.168.16.1",
+  //      "rebalance_timeout": 300000,
+  //      "session_timeout": 45000,
+  //      "subscription": "AAEAAAABAAR0ZXN0/////wAAAAA=",
+  //      "assignment": "AAEAAAABAAR0ZXN0AAAAAQAAAAD/////"
+  //    }
+  //  ]
+  // }
+  private boolean isGroupMetadataMessage(DeserializeResult key, DeserializeResult value) {
+    var keyJson = toMapFromJson(key);
+    boolean keyIsOk = consumerGroupName.equals(keyJson.get("group")) && keyJson.size() == 1;
+
+    var valueJson = toMapFromJson(value);
+    boolean valueIsOk = valueJson.keySet()
+        .containsAll(Set.of("protocol_type", "generation", "leader", "members"));
+
+    return keyIsOk && valueIsOk;
+  }
+
+  @SneakyThrows
+  private Map<String, Object> toMapFromJsom(DeserializeResult result) {
+    return new JsonMapper().readValue(result.getResult(), Map.class);
+  }
+
+  private static KafkaConsumer<Bytes, Bytes> createConsumer(String groupId) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    return new KafkaConsumer<>(props);
+  }
+}