diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java index 66692894a6..26f5041b5d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java @@ -11,6 +11,7 @@ import com.provectus.kafka.ui.serde.api.PropertyResolver; import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde; import com.provectus.kafka.ui.serdes.builtin.Base64Serde; +import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde; import com.provectus.kafka.ui.serdes.builtin.Int32Serde; import com.provectus.kafka.ui.serdes.builtin.Int64Serde; import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde; @@ -37,6 +38,7 @@ public class SerdesInitializer { public SerdesInitializer() { this( ImmutableMap.>builder() + .put(ConsumerOffsetsSerde.name(), ConsumerOffsetsSerde.class) .put(StringSerde.name(), StringSerde.class) .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class) .put(ProtobufFileSerde.name(), ProtobufFileSerde.class) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java new file mode 100644 index 0000000000..ac222d64e4 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java @@ -0,0 +1,241 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.provectus.kafka.ui.serde.api.DeserializeResult; +import com.provectus.kafka.ui.serde.api.PropertyResolver; +import com.provectus.kafka.ui.serde.api.SchemaDescription; +import com.provectus.kafka.ui.serdes.BuiltInSerde; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import lombok.SneakyThrows; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.BoundField; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; + +public class ConsumerOffsetsSerde implements BuiltInSerde { + + private static final String TOPIC = "__consumer_offsets"; + + public static String name() { + return "__consumer_offsets"; + } + + @Override + public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) { + return true; + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + @Override + public Optional getSchema(String topic, Target type) { + return Optional.empty(); + } + + @Override + public boolean canDeserialize(String topic, Target type) { + return topic.equals(TOPIC); + } + + @Override + public boolean canSerialize(String topic, Target type) { + return false; + } + + @Override + public Serializer serializer(String topic, Target type) { + throw new UnsupportedOperationException(); + } + + @Override + public Deserializer deserializer(String topic, Target type) { + return switch (type) { + case KEY -> keyDeserializer(); + case VALUE -> valueDeserializer(); + }; + } + + private Deserializer keyDeserializer() { + final Schema commitKeySchema = new Schema( + new Field("group", Type.STRING, ""), + new Field("topic", Type.STRING, ""), + new Field("partition", Type.INT32, "") + ); + + final Schema groupMetadataSchema = new Schema( + new Field("group", Type.STRING, "") + ); + + return (headers, data) -> { + var bb = ByteBuffer.wrap(data); + short version = bb.getShort(); + return new DeserializeResult( + toJson( + switch (version) { + case 0, 1 -> commitKeySchema.read(bb); + case 2 -> groupMetadataSchema.read(bb); + default -> throw new IllegalStateException("Unknown group metadata message version: " + version); + } + ), + DeserializeResult.Type.JSON, + Map.of() + ); + }; + } + + private Deserializer valueDeserializer() { + final Schema valueSchemaV0 = + new Schema( + new Field("offset", Type.INT64, ""), + new Field("metadata", Type.STRING, ""), + new Field("commit_timestamp", Type.INT64, "") + ); + + final Schema valueSchemaV1 = + new Schema( + new Field("offset", Type.INT64, ""), + new Field("metadata", Type.STRING, ""), + new Field("commit_timestamp", Type.INT64, ""), + new Field("expire_timestamp", Type.INT64, "") + ); + + final Schema valueSchemaV2 = + new Schema( + new Field("offset", Type.INT64, ""), + new Field("metadata", Type.STRING, ""), + new Field("commit_timestamp", Type.INT64, "") + ); + + final Schema valueSchemaV3 = + new Schema( + new Field("offset", Type.INT64, ""), + new Field("leader_epoch", Type.INT32, ""), + new Field("metadata", Type.STRING, ""), + new Field("commit_timestamp", Type.INT64, "") + ); + + final Schema metadataSchema0 = + new Schema( + new Field("protocol_type", Type.STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.NULLABLE_STRING, ""), + new Field("leader", Type.NULLABLE_STRING, ""), + new Field("members", new ArrayOf(new Schema( + new Field("member_id", Type.STRING, ""), + new Field("client_id", Type.STRING, ""), + new Field("client_host", Type.STRING, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.BYTES, ""), + new Field("assignment", Type.BYTES, "") + )), "") + ); + + final Schema metadataSchema1 = + new Schema( + new Field("protocol_type", Type.STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.NULLABLE_STRING, ""), + new Field("leader", Type.NULLABLE_STRING, ""), + new Field("members", new ArrayOf(new Schema( + new Field("member_id", Type.STRING, ""), + new Field("client_id", Type.STRING, ""), + new Field("client_host", Type.STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.BYTES, ""), + new Field("assignment", Type.BYTES, "") + )), "") + ); + + final Schema metadataSchema2 = + new Schema( + new Field("protocol_type", Type.STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.NULLABLE_STRING, ""), + new Field("leader", Type.NULLABLE_STRING, ""), + new Field("current_state_timestamp", Type.INT64, ""), + new Field("members", new ArrayOf(new Schema( + new Field("member_id", Type.STRING, ""), + new Field("client_id", Type.STRING, ""), + new Field("client_host", Type.STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.BYTES, ""), + new Field("assignment", Type.BYTES, "") + )), "") + ); + + final Schema metadataSchema3 = + new Schema( + new Field("protocol_type", Type.STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.NULLABLE_STRING, ""), + new Field("leader", Type.NULLABLE_STRING, ""), + new Field("current_state_timestamp", Type.INT64, ""), + new Field("members", new ArrayOf(new Schema( + new Field("member_id", Type.STRING, ""), + new Field("group_instance_id", Type.NULLABLE_STRING, ""), + new Field("client_id", Type.STRING, ""), + new Field("client_host", Type.STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.BYTES, ""), + new Field("assignment", Type.BYTES, "") + )), "") + ); + + return (headers, data) -> { + var bb = ByteBuffer.wrap(data); + short version = bb.getShort(); + + String result = null; + try { + result = toJson( + switch (version) { + case 0 -> metadataSchema0.read(bb); + case 1 -> metadataSchema1.read(bb); + case 2 -> metadataSchema2.read(bb); + case 3 -> metadataSchema3.read(bb); + default -> throw new IllegalStateException("Unknown offset message version: " + version); + } + ); + } catch (Throwable e) { + bb = bb.rewind(); + bb.getShort(); //skipping version + result = toJson( + switch (version) { + case 0 -> valueSchemaV0.read(bb); + case 1 -> valueSchemaV1.read(bb); + case 2 -> valueSchemaV2.read(bb); + case 3 -> valueSchemaV3.read(bb); + default -> throw new IllegalStateException("Unknown offset message version: " + version); + } + ); + } + + return new DeserializeResult( + result, + DeserializeResult.Type.STRING, + Map.of() + ); + }; + } + + @SneakyThrows + private String toJson(Struct s) { + Map map = new LinkedHashMap<>(); + for (BoundField field : s.schema().fields()) { + map.put(field.def.name, s.get(field)); + } + return new JsonMapper().writeValueAsString(map); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java index fe2240bd6a..00f872f9a5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java @@ -12,11 +12,17 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import java.io.FileInputStream; import java.security.KeyStore; +import java.time.Duration; +import java.util.List; +import java.util.Properties; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; import lombok.SneakyThrows; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; @@ -131,4 +137,35 @@ public class WebClientConfigurator { public WebClient build() { return builder.build(); } + + @SneakyThrows + public static void main(String[] args) { + for (int i = 0; i < 2; i++) { + int finalI = i; + var t = new Thread(() -> { + Properties props = new Properties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_Members_3"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try { + Thread.sleep(finalI * 10_000); + } catch (Exception e) { + } + + var consumer = new KafkaConsumer(props); + consumer.subscribe(List.of("test")); + while (true) { + consumer.poll(Duration.ofSeconds(5)); + consumer.commitSync(); + } + }); + t.setDaemon(true); + t.start(); + } + + Thread.sleep(600_000); + } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java index 94a377c9c8..c631a96011 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java @@ -61,6 +61,15 @@ class MessageFiltersTest { @Nested class GroovyScriptFilter { + @Test + void test(){ + var test = groovyScriptFilter("value == null") + .test(new TopicMessageDTO() + .timestamp(OffsetDateTime.now()) + .content("\u0000\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0015����\u0000\fAQAAAYBnhg6m\u0000\u0000\u0001�g�\u0016\u000B")); + assertThat(test).isTrue(); + } + @Test void throwsExceptionOnInvalidGroovySyntax() { assertThrows(ValidationException.class, @@ -185,4 +194,4 @@ class MessageFiltersTest { .partition(1); } -} \ No newline at end of file +}