iliax 2 years ago
parent
commit
f52442e574

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
+import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -37,6 +38,7 @@ public class SerdesInitializer {
   public SerdesInitializer() {
     this(
         ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
+            .put(ConsumerOffsetsSerde.name(), ConsumerOffsetsSerde.class)
             .put(StringSerde.name(), StringSerde.class)
             .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
             .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)

+ 241 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java

@@ -0,0 +1,241 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.BoundField;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+public class ConsumerOffsetsSerde implements BuiltInSerde {
+
+  private static final String TOPIC = "__consumer_offsets";
+
+  public static String name() {
+    return "__consumer_offsets";
+  }
+
+  @Override
+  public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
+    return true;
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return topic.equals(TOPIC);
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return switch (type) {
+      case KEY -> keyDeserializer();
+      case VALUE -> valueDeserializer();
+    };
+  }
+
+  private Deserializer keyDeserializer() {
+    final Schema commitKeySchema = new Schema(
+        new Field("group", Type.STRING, ""),
+        new Field("topic", Type.STRING, ""),
+        new Field("partition", Type.INT32, "")
+    );
+
+    final Schema groupMetadataSchema = new Schema(
+        new Field("group", Type.STRING, "")
+    );
+
+    return (headers, data) -> {
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+      return new DeserializeResult(
+          toJson(
+              switch (version) {
+                case 0, 1 -> commitKeySchema.read(bb);
+                case 2 -> groupMetadataSchema.read(bb);
+                default -> throw new IllegalStateException("Unknown group metadata message version: " + version);
+              }
+          ),
+          DeserializeResult.Type.JSON,
+          Map.of()
+      );
+    };
+  }
+
+  private Deserializer valueDeserializer() {
+    final Schema valueSchemaV0 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema valueSchemaV1 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, ""),
+            new Field("expire_timestamp", Type.INT64, "")
+        );
+
+    final Schema valueSchemaV2 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema valueSchemaV3 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("leader_epoch", Type.INT32, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema metadataSchema0 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema1 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema2 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema3 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("group_instance_id", Type.NULLABLE_STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    return (headers, data) -> {
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+
+      String result = null;
+      try {
+        result = toJson(
+            switch (version) {
+              case 0 -> metadataSchema0.read(bb);
+              case 1 -> metadataSchema1.read(bb);
+              case 2 -> metadataSchema2.read(bb);
+              case 3 -> metadataSchema3.read(bb);
+              default -> throw new IllegalStateException("Unknown offset message version: " + version);
+            }
+        );
+      } catch (Throwable e) {
+        bb = bb.rewind();
+        bb.getShort(); //skipping version
+        result = toJson(
+            switch (version) {
+              case 0 -> valueSchemaV0.read(bb);
+              case 1 -> valueSchemaV1.read(bb);
+              case 2 -> valueSchemaV2.read(bb);
+              case 3 -> valueSchemaV3.read(bb);
+              default -> throw new IllegalStateException("Unknown offset message version: " + version);
+            }
+        );
+      }
+
+      return new DeserializeResult(
+          result,
+          DeserializeResult.Type.STRING,
+          Map.of()
+      );
+    };
+  }
+
+  @SneakyThrows
+  private String toJson(Struct s) {
+    Map<String, Object> map = new LinkedHashMap<>();
+    for (BoundField field : s.schema().fields()) {
+      map.put(field.def.name, s.get(field));
+    }
+    return new JsonMapper().writeValueAsString(map);
+  }
+}

+ 37 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java

@@ -12,11 +12,17 @@ import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import java.io.FileInputStream;
 import java.security.KeyStore;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
 import lombok.SneakyThrows;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.http.MediaType;
 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
@@ -131,4 +137,35 @@ public class WebClientConfigurator {
   public WebClient build() {
     return builder.build();
   }
+
+  @SneakyThrows
+  public static void main(String[] args) {
+    for (int i = 0; i < 2; i++) {
+      int finalI = i;
+      var t = new Thread(() -> {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_Members_3");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try {
+          Thread.sleep(finalI * 10_000);
+        } catch (Exception e) {
+        }
+
+        var consumer = new KafkaConsumer<String, String>(props);
+        consumer.subscribe(List.of("test"));
+        while (true) {
+          consumer.poll(Duration.ofSeconds(5));
+          consumer.commitSync();
+        }
+      });
+      t.setDaemon(true);
+      t.start();
+    }
+
+    Thread.sleep(600_000);
+  }
 }

+ 10 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java

@@ -61,6 +61,15 @@ class MessageFiltersTest {
   @Nested
   class GroovyScriptFilter {
 
+    @Test
+    void  test(){
+      var test = groovyScriptFilter("value == null")
+          .test(new TopicMessageDTO()
+              .timestamp(OffsetDateTime.now())
+              .content("\u0000\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0015����\u0000\fAQAAAYBnhg6m\u0000\u0000\u0001�g�\u0016\u000B"));
+      assertThat(test).isTrue();
+    }
+
     @Test
     void throwsExceptionOnInvalidGroovySyntax() {
       assertThrows(ValidationException.class,
@@ -185,4 +194,4 @@ class MessageFiltersTest {
         .partition(1);
   }
 
-}
+}