iliax пре 2 година
родитељ
комит
a8e8637731

+ 23 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -38,7 +38,6 @@ public class SerdesInitializer {
   public SerdesInitializer() {
     this(
         ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
-            .put(ConsumerOffsetsSerde.name(), ConsumerOffsetsSerde.class)
             .put(StringSerde.name(), StringSerde.class)
             .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
             .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
@@ -120,6 +119,8 @@ public class SerdesInitializer {
       }
     });
 
+    registerTopicRelatedSerde(registeredSerdes);
+
     return new ClusterSerdes(
         registeredSerdes,
         Optional.ofNullable(clusterProperties.getDefaultKeySerde())
@@ -134,6 +135,27 @@ public class SerdesInitializer {
     );
   }
 
+  /**
+   * Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
+   */
+  private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
+    registerConsumerOffsetsSerde(serdes);
+  }
+
+  private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
+    var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
+    serdes.put(
+        ConsumerOffsetsSerde.name(),
+        new SerdeInstance(
+            ConsumerOffsetsSerde.name(),
+            new ConsumerOffsetsSerde(),
+            pattern,
+            pattern,
+            null
+        )
+    );
+  }
+
   private SerdeInstance createFallbackSerde() {
     StringSerde serde = new StringSerde();
     serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());

+ 76 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java

@@ -1,31 +1,57 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import lombok.SneakyThrows;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.BoundField;
+import org.apache.kafka.common.protocol.types.CompactArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 
+// Deserialization logic and message's schemas can be found in
+// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
 public class ConsumerOffsetsSerde implements BuiltInSerde {
 
-  private static final String TOPIC = "__consumer_offsets";
+  private static final JsonMapper JSON_MAPPER = createMapper();
+
+  public static final String TOPIC = "__consumer_offsets";
 
   public static String name() {
     return "__consumer_offsets";
   }
 
+  private static JsonMapper createMapper() {
+    var module = new SimpleModule();
+    module.addSerializer(Struct.class, new JsonSerializer<>() {
+      @Override
+      public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+        gen.writeStartObject();
+        for (BoundField field : value.schema().fields()) {
+          var fieldVal = value.get(field);
+          gen.writeObjectField(field.def.name, fieldVal);
+        }
+        gen.writeEndObject();
+      }
+    });
+    var mapper = new JsonMapper();
+    mapper.registerModule(module);
+    return mapper;
+  }
+
   @Override
   public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
     return true;
@@ -93,14 +119,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
   }
 
   private Deserializer valueDeserializer() {
-    final Schema valueSchemaV0 =
+    final Schema commitOffsetSchemaV0 =
         new Schema(
             new Field("offset", Type.INT64, ""),
             new Field("metadata", Type.STRING, ""),
             new Field("commit_timestamp", Type.INT64, "")
         );
 
-    final Schema valueSchemaV1 =
+    final Schema commitOffsetSchemaV1 =
         new Schema(
             new Field("offset", Type.INT64, ""),
             new Field("metadata", Type.STRING, ""),
@@ -108,14 +134,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
             new Field("expire_timestamp", Type.INT64, "")
         );
 
-    final Schema valueSchemaV2 =
+    final Schema commitOffsetSchemaV2 =
         new Schema(
             new Field("offset", Type.INT64, ""),
             new Field("metadata", Type.STRING, ""),
             new Field("commit_timestamp", Type.INT64, "")
         );
 
-    final Schema valueSchemaV3 =
+    final Schema commitOffsetSchemaV3 =
         new Schema(
             new Field("offset", Type.INT64, ""),
             new Field("leader_epoch", Type.INT32, ""),
@@ -123,6 +149,14 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
             new Field("commit_timestamp", Type.INT64, "")
         );
 
+    final Schema commitOffsetSchemaV4Latest = new Schema(
+        new Field("offset", Type.INT64, ""),
+        new Field("leader_epoch", Type.INT32, ""),
+        new Field("metadata", Type.COMPACT_STRING, ""),
+        new Field("commit_timestamp", Type.INT64, ""),
+        Field.TaggedFieldsSection.of()
+    );
+
     final Schema metadataSchema0 =
         new Schema(
             new Field("protocol_type", Type.STRING, ""),
@@ -193,11 +227,31 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
             )), "")
         );
 
+    final Schema metadataSchema4Latest =
+        new Schema(
+            new Field("protocol_type", Type.COMPACT_STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new CompactArrayOf(new Schema(
+                new Field("member_id", Type.COMPACT_STRING, ""),
+                new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
+                new Field("client_id", Type.COMPACT_STRING, ""),
+                new Field("client_host", Type.COMPACT_STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.COMPACT_BYTES, ""),
+                new Field("assignment", Type.COMPACT_BYTES, ""),
+                Field.TaggedFieldsSection.of()
+            )), ""),
+            Field.TaggedFieldsSection.of()
+        );
+
     return (headers, data) -> {
+      String result;
       var bb = ByteBuffer.wrap(data);
       short version = bb.getShort();
-
-      String result = null;
       try {
         result = toJson(
             switch (version) {
@@ -205,26 +259,30 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
               case 1 -> metadataSchema1.read(bb);
               case 2 -> metadataSchema2.read(bb);
               case 3 -> metadataSchema3.read(bb);
-              default -> throw new IllegalStateException("Unknown offset message version: " + version);
+              default -> metadataSchema4Latest.read(bb);
             }
         );
       } catch (Throwable e) {
         bb = bb.rewind();
-        bb.getShort(); //skipping version
+        bb.getShort(); // skipping version
         result = toJson(
             switch (version) {
-              case 0 -> valueSchemaV0.read(bb);
-              case 1 -> valueSchemaV1.read(bb);
-              case 2 -> valueSchemaV2.read(bb);
-              case 3 -> valueSchemaV3.read(bb);
-              default -> throw new IllegalStateException("Unknown offset message version: " + version);
+              case 0 -> commitOffsetSchemaV0.read(bb);
+              case 1 -> commitOffsetSchemaV1.read(bb);
+              case 2 -> commitOffsetSchemaV2.read(bb);
+              case 3 -> commitOffsetSchemaV3.read(bb);
+              default -> commitOffsetSchemaV4Latest.read(bb);
             }
         );
       }
 
+      if (bb.remaining() != 0) {
+        throw new IllegalArgumentException(
+            "Message buffer is not read to the end, which is likely means message is unrecognized");
+      }
       return new DeserializeResult(
           result,
-          DeserializeResult.Type.STRING,
+          DeserializeResult.Type.JSON,
           Map.of()
       );
     };
@@ -232,10 +290,6 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
 
   @SneakyThrows
   private String toJson(Struct s) {
-    Map<String, Object> map = new LinkedHashMap<>();
-    for (BoundField field : s.schema().fields()) {
-      map.put(field.def.name, s.get(field));
-    }
-    return new JsonMapper().writeValueAsString(map);
+    return JSON_MAPPER.writeValueAsString(s);
   }
 }

+ 0 - 40
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java

@@ -5,24 +5,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.exception.ValidationException;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.handler.ssl.JdkSslContext;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
 import java.io.FileInputStream;
 import java.security.KeyStore;
-import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
 import lombok.SneakyThrows;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.http.MediaType;
 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
@@ -137,35 +128,4 @@ public class WebClientConfigurator {
   public WebClient build() {
     return builder.build();
   }
-
-  @SneakyThrows
-  public static void main(String[] args) {
-    for (int i = 0; i < 2; i++) {
-      int finalI = i;
-      var t = new Thread(() -> {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_Members_3");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        try {
-          Thread.sleep(finalI * 10_000);
-        } catch (Exception e) {
-        }
-
-        var consumer = new KafkaConsumer<String, String>(props);
-        consumer.subscribe(List.of("test"));
-        while (true) {
-          consumer.poll(Duration.ofSeconds(5));
-          consumer.commitSync();
-        }
-      });
-      t.setDaemon(true);
-      t.start();
-    }
-
-    Thread.sleep(600_000);
-  }
 }

+ 0 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java

@@ -61,15 +61,6 @@ class MessageFiltersTest {
   @Nested
   class GroovyScriptFilter {
 
-    @Test
-    void  test(){
-      var test = groovyScriptFilter("value == null")
-          .test(new TopicMessageDTO()
-              .timestamp(OffsetDateTime.now())
-              .content("\u0000\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0015����\u0000\fAQAAAYBnhg6m\u0000\u0000\u0001�g�\u0016\u000B"));
-      assertThat(test).isTrue();
-    }
-
     @Test
     void throwsExceptionOnInvalidGroovySyntax() {
       assertThrows(ValidationException.class,