diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java index 26f5041b5d..ac3c2241cf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java @@ -38,7 +38,6 @@ public class SerdesInitializer { public SerdesInitializer() { this( ImmutableMap.>builder() - .put(ConsumerOffsetsSerde.name(), ConsumerOffsetsSerde.class) .put(StringSerde.name(), StringSerde.class) .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class) .put(ProtobufFileSerde.name(), ProtobufFileSerde.class) @@ -120,6 +119,8 @@ public class SerdesInitializer { } }); + registerTopicRelatedSerde(registeredSerdes); + return new ClusterSerdes( registeredSerdes, Optional.ofNullable(clusterProperties.getDefaultKeySerde()) @@ -134,6 +135,27 @@ public class SerdesInitializer { ); } + /** + * Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde. + */ + private void registerTopicRelatedSerde(Map serdes) { + registerConsumerOffsetsSerde(serdes); + } + + private void registerConsumerOffsetsSerde(Map serdes) { + var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC); + serdes.put( + ConsumerOffsetsSerde.name(), + new SerdeInstance( + ConsumerOffsetsSerde.name(), + new ConsumerOffsetsSerde(), + pattern, + pattern, + null + ) + ); + } + private SerdeInstance createFallbackSerde() { StringSerde serde = new StringSerde(); serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java index ac222d64e4..2941ae8e9b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java @@ -1,31 +1,57 @@ package com.provectus.kafka.ui.serdes.builtin; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.provectus.kafka.ui.serde.api.DeserializeResult; import com.provectus.kafka.ui.serde.api.PropertyResolver; import com.provectus.kafka.ui.serde.api.SchemaDescription; import com.provectus.kafka.ui.serdes.BuiltInSerde; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import lombok.SneakyThrows; -import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.BoundField; +import org.apache.kafka.common.protocol.types.CompactArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; +// Deserialization logic and message's schemas can be found in +// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue) public class ConsumerOffsetsSerde implements BuiltInSerde { - private static final String TOPIC = "__consumer_offsets"; + private static final JsonMapper JSON_MAPPER = createMapper(); + + public static final String TOPIC = "__consumer_offsets"; public static String name() { return "__consumer_offsets"; } + private static JsonMapper createMapper() { + var module = new SimpleModule(); + module.addSerializer(Struct.class, new JsonSerializer<>() { + @Override + public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + for (BoundField field : value.schema().fields()) { + var fieldVal = value.get(field); + gen.writeObjectField(field.def.name, fieldVal); + } + gen.writeEndObject(); + } + }); + var mapper = new JsonMapper(); + mapper.registerModule(module); + return mapper; + } + @Override public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) { return true; @@ -93,14 +119,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { } private Deserializer valueDeserializer() { - final Schema valueSchemaV0 = + final Schema commitOffsetSchemaV0 = new Schema( new Field("offset", Type.INT64, ""), new Field("metadata", Type.STRING, ""), new Field("commit_timestamp", Type.INT64, "") ); - final Schema valueSchemaV1 = + final Schema commitOffsetSchemaV1 = new Schema( new Field("offset", Type.INT64, ""), new Field("metadata", Type.STRING, ""), @@ -108,14 +134,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { new Field("expire_timestamp", Type.INT64, "") ); - final Schema valueSchemaV2 = + final Schema commitOffsetSchemaV2 = new Schema( new Field("offset", Type.INT64, ""), new Field("metadata", Type.STRING, ""), new Field("commit_timestamp", Type.INT64, "") ); - final Schema valueSchemaV3 = + final Schema commitOffsetSchemaV3 = new Schema( new Field("offset", Type.INT64, ""), new Field("leader_epoch", Type.INT32, ""), @@ -123,6 +149,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { new Field("commit_timestamp", Type.INT64, "") ); + final Schema commitOffsetSchemaV4Latest = new Schema( + new Field("offset", Type.INT64, ""), + new Field("leader_epoch", Type.INT32, ""), + new Field("metadata", Type.COMPACT_STRING, ""), + new Field("commit_timestamp", Type.INT64, ""), + Field.TaggedFieldsSection.of() + ); + final Schema metadataSchema0 = new Schema( new Field("protocol_type", Type.STRING, ""), @@ -193,11 +227,31 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { )), "") ); + final Schema metadataSchema4Latest = + new Schema( + new Field("protocol_type", Type.COMPACT_STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""), + new Field("leader", Type.COMPACT_NULLABLE_STRING, ""), + new Field("current_state_timestamp", Type.INT64, ""), + new Field("members", new CompactArrayOf(new Schema( + new Field("member_id", Type.COMPACT_STRING, ""), + new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""), + new Field("client_id", Type.COMPACT_STRING, ""), + new Field("client_host", Type.COMPACT_STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.COMPACT_BYTES, ""), + new Field("assignment", Type.COMPACT_BYTES, ""), + Field.TaggedFieldsSection.of() + )), ""), + Field.TaggedFieldsSection.of() + ); + return (headers, data) -> { + String result; var bb = ByteBuffer.wrap(data); short version = bb.getShort(); - - String result = null; try { result = toJson( switch (version) { @@ -205,26 +259,30 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { case 1 -> metadataSchema1.read(bb); case 2 -> metadataSchema2.read(bb); case 3 -> metadataSchema3.read(bb); - default -> throw new IllegalStateException("Unknown offset message version: " + version); + default -> metadataSchema4Latest.read(bb); } ); } catch (Throwable e) { bb = bb.rewind(); - bb.getShort(); //skipping version + bb.getShort(); // skipping version result = toJson( switch (version) { - case 0 -> valueSchemaV0.read(bb); - case 1 -> valueSchemaV1.read(bb); - case 2 -> valueSchemaV2.read(bb); - case 3 -> valueSchemaV3.read(bb); - default -> throw new IllegalStateException("Unknown offset message version: " + version); + case 0 -> commitOffsetSchemaV0.read(bb); + case 1 -> commitOffsetSchemaV1.read(bb); + case 2 -> commitOffsetSchemaV2.read(bb); + case 3 -> commitOffsetSchemaV3.read(bb); + default -> commitOffsetSchemaV4Latest.read(bb); } ); } + if (bb.remaining() != 0) { + throw new IllegalArgumentException( + "Message buffer is not read to the end, which is likely means message is unrecognized"); + } return new DeserializeResult( result, - DeserializeResult.Type.STRING, + DeserializeResult.Type.JSON, Map.of() ); }; @@ -232,10 +290,6 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { @SneakyThrows private String toJson(Struct s) { - Map map = new LinkedHashMap<>(); - for (BoundField field : s.schema().fields()) { - map.put(field.def.name, s.get(field)); - } - return new JsonMapper().writeValueAsString(map); + return JSON_MAPPER.writeValueAsString(s); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java index 00f872f9a5..65e089da27 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java @@ -5,24 +5,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.exception.ValidationException; -import io.netty.buffer.ByteBufAllocator; -import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; import java.io.FileInputStream; import java.security.KeyStore; -import java.time.Duration; -import java.util.List; -import java.util.Properties; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; import lombok.SneakyThrows; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.BytesDeserializer; import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; @@ -137,35 +128,4 @@ public class WebClientConfigurator { public WebClient build() { return builder.build(); } - - @SneakyThrows - public static void main(String[] args) { - for (int i = 0; i < 2; i++) { - int finalI = i; - var t = new Thread(() -> { - Properties props = new Properties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_Members_3"); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - try { - Thread.sleep(finalI * 10_000); - } catch (Exception e) { - } - - var consumer = new KafkaConsumer(props); - consumer.subscribe(List.of("test")); - while (true) { - consumer.poll(Duration.ofSeconds(5)); - consumer.commitSync(); - } - }); - t.setDaemon(true); - t.start(); - } - - Thread.sleep(600_000); - } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java index c631a96011..aea2db378f 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java @@ -61,15 +61,6 @@ class MessageFiltersTest { @Nested class GroovyScriptFilter { - @Test - void test(){ - var test = groovyScriptFilter("value == null") - .test(new TopicMessageDTO() - .timestamp(OffsetDateTime.now()) - .content("\u0000\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0015����\u0000\fAQAAAYBnhg6m\u0000\u0000\u0001�g�\u0016\u000B")); - assertThat(test).isTrue(); - } - @Test void throwsExceptionOnInvalidGroovySyntax() { assertThrows(ValidationException.class,