
Json Schema support (#651)

* Json schema support implementation

* comments & checkstyle

* unused mapper removed

* Explicit validation added

Co-authored-by: Ilya Kuramshin <ikuramshin@provectus.com>
Ilya Kuramshin, 4 years ago
commit 73d4e48941

+ 5 - 0
kafka-ui-api/pom.xml

@@ -86,6 +86,11 @@
             <artifactId>kafka-avro-serializer</artifactId>
             <version>${confluent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-json-schema-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.confluent</groupId>
             <artifactId>kafka-protobuf-serializer</artifactId>

+ 9 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java

@@ -7,6 +7,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
@@ -23,8 +24,14 @@ public class AvroMessageReader extends MessageReader<Object> {
   @Override
   protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
     var serializer = new KafkaAvroSerializer(client);
-    // need to call configure to set isKey property
-    serializer.configure(Map.of("schema.registry.url", "wontbeused"), isKey);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
     return serializer;
   }
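
A side note on this change: the same AUTO_REGISTER_SCHEMAS / USE_LATEST_VERSION configuration is applied to the JSON Schema and Protobuf readers below. A minimal standalone sketch of what the flags buy, assuming MockSchemaRegistryClient from the schema-registry client library (the subject, schema, and payload here are made up): with auto-registration disabled the serializer can no longer create subjects on its own, and with use-latest-version enabled it serializes against the latest version already registered for the subject.

    import io.confluent.kafka.schemaregistry.avro.AvroSchema;
    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    import java.util.Map;

    public class AvroSerializerConfigSketch {
      public static void main(String[] args) throws Exception {
        SchemaRegistryClient client = new MockSchemaRegistryClient();
        // pre-register the value schema: with auto-registration disabled the
        // serializer can only use what is already in the registry
        client.register("test-topic-value", new AvroSchema("{\"type\": \"string\"}"));

        KafkaAvroSerializer serializer = new KafkaAvroSerializer(client);
        serializer.configure(
            Map.of(
                "schema.registry.url", "wontbeused",
                KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
                KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
            ),
            false // isKey
        );
        // serializes against the latest registered version of "test-topic-value"
        byte[] bytes = serializer.serialize("test-topic", "hello");
        System.out.println(bytes.length);
      }
    }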
 

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.serde.schemaregistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
+
+public class JsonSchemaMessageFormatter implements MessageFormatter {
+
+  private final KafkaJsonSchemaDeserializer<JsonNode> jsonSchemaDeserializer;
+
+  public JsonSchemaMessageFormatter(SchemaRegistryClient client) {
+    this.jsonSchemaDeserializer = new KafkaJsonSchemaDeserializer<>(client);
+  }
+
+  @Override
+  public String format(String topic, byte[] value) {
+    JsonNode json = jsonSchemaDeserializer.deserialize(topic, value);
+    return json.toString();
+  }
+}
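
A minimal usage sketch for the new formatter (the registry URL and topic are placeholders): KafkaJsonSchemaDeserializer resolves the writer schema from the registry via the schema id embedded in the payload and returns the value as a Jackson JsonNode, which format() renders back to a JSON string.

    import com.provectus.kafka.ui.serde.schemaregistry.JsonSchemaMessageFormatter;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class JsonSchemaFormatterSketch {
      // formats raw Kafka value bytes (magic byte + schema id + JSON payload)
      // into a JSON string using the registry-backed deserializer
      static String formatValue(String topic, byte[] rawValue) {
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 100);
        return new JsonSchemaMessageFormatter(client).format(topic, rawValue);
      }
    }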

+ 81 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java

@@ -0,0 +1,81 @@
+package com.provectus.kafka.ui.serde.schemaregistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class JsonSchemaMessageReader extends MessageReader<JsonNode> {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public JsonSchemaMessageReader(String topic,
+                                 boolean isKey,
+                                 SchemaRegistryClient client,
+                                 SchemaMetadata schema) throws IOException, RestClientException {
+    super(topic, isKey, client, schema);
+  }
+
+  @Override
+  protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
+    var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaJsonSchemaSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
+    return serializer;
+  }
+
+  @Override
+  protected JsonNode read(String value, ParsedSchema schema) {
+    try {
+      JsonNode json = MAPPER.readTree(value);
+      ((JsonSchema) schema).validate(json);
+      return json;
+    } catch (JsonProcessingException e) {
+      throw new ValidationException(String.format("'%s' is not valid json", value));
+    } catch (org.everit.json.schema.ValidationException e) {
+      throw new ValidationException(
+          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
+    }
+  }
+
+  @KafkaClientInternalsDependant
+  private class KafkaJsonSchemaSerializerWithoutSchemaInfer
+      extends KafkaJsonSchemaSerializer<JsonNode> {
+
+    KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
+      super(client);
+    }
+
+    /**
+     * Need to override the original method because it tries to infer the schema from
+     * the input by checking the 'schema' json field or the @Schema annotation on the
+     * input class, which is not possible in our case. So we just skip all the
+     * inference logic and pass the schema directly.
+     */
+    @Override
+    public byte[] serialize(String topic, JsonNode record) {
+      return super.serializeImpl(
+          super.getSubjectName(topic, isKey, record, schema),
+          record,
+          (JsonSchema) schema
+      );
+    }
+  }
+
+}
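
The explicit validation step added here can be exercised on its own. A minimal sketch using the same JsonSchema.validate call as the reader above (the schema and input strings are made up for illustration):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.confluent.kafka.schemaregistry.json.JsonSchema;

    public class JsonSchemaValidationSketch {
      public static void main(String[] args) throws Exception {
        JsonSchema schema = new JsonSchema(
            "{\"type\": \"object\", "
                + "\"properties\": {\"f1\": {\"type\": \"integer\"}}, "
                + "\"additionalProperties\": false}");
        ObjectMapper mapper = new ObjectMapper();

        JsonNode valid = mapper.readTree("{\"f1\": 42}");
        schema.validate(valid); // passes silently

        JsonNode invalid = mapper.readTree("{\"f1\": \"not an int\"}");
        try {
          schema.validate(invalid);
        } catch (org.everit.json.schema.ValidationException e) {
          // e.g. [#/f1: expected type: Integer, found: String]
          System.out.println(e.getAllMessages());
        }
      }
    }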

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java

@@ -3,6 +3,5 @@ package com.provectus.kafka.ui.serde.schemaregistry;
 public enum MessageFormat {
   AVRO,
   JSON,
-  STRING,
   PROTOBUF
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java

@@ -11,7 +11,7 @@ public abstract class MessageReader<T> {
   protected final Serializer<T> serializer;
   protected final String topic;
   protected final boolean isKey;
-  private final ParsedSchema schema;
+  protected final ParsedSchema schema;
 
   protected MessageReader(String topic, boolean isKey, SchemaRegistryClient client,
                           SchemaMetadata schema) throws IOException, RestClientException {

+ 9 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java

@@ -9,6 +9,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
@@ -24,8 +25,14 @@ public class ProtobufMessageReader extends MessageReader<Message> {
   @Override
   protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
     var serializer = new KafkaProtobufSerializer<>(client);
-    // need to call configure to set isKey property
-    serializer.configure(Map.of("schema.registry.url", "wontbeused"), isKey);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaProtobufSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaProtobufSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
     return serializer;
   }
 

+ 13 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java

@@ -16,6 +16,7 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import java.net.URI;
@@ -52,6 +53,9 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   @Nullable
   private final ProtobufMessageFormatter protobufFormatter;
 
+  @Nullable
+  private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter;
+
   private final StringMessageFormatter stringFormatter = new StringMessageFormatter();
   private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
   private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
@@ -60,7 +64,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
     Objects.requireNonNull(cluster.getSchemaRegistry());
     List<SchemaProvider> schemaProviders =
-        List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
+        List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
     //TODO add auth
     return new CachedSchemaRegistryClient(
         Collections.singletonList(cluster.getSchemaRegistry()),
@@ -78,9 +82,11 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     if (schemaRegistryClient != null) {
       this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient);
       this.protobufFormatter = new ProtobufMessageFormatter(schemaRegistryClient);
+      this.jsonSchemaMessageFormatter = new JsonSchemaMessageFormatter(schemaRegistryClient);
     } else {
       this.avroFormatter = null;
       this.protobufFormatter = null;
+      this.jsonSchemaMessageFormatter = null;
     }
   }
 
@@ -128,6 +134,8 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
         reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema);
       } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
         reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema);
+      } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
+        reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema);
       } else {
         throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType());
       }
@@ -218,6 +226,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
             if (tryFormatter(avroFormatter, msg, isKey).isPresent()) {
               return avroFormatter;
             }
+          } else if (type.get().equals(MessageFormat.JSON.name())) {
+            if (tryFormatter(jsonSchemaMessageFormatter, msg, isKey).isPresent()) {
+              return jsonSchemaMessageFormatter;
+            }
           } else {
             throw new IllegalStateException("Unsupported schema type: " + type.get());
           }
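
For reference, a construction sketch mirroring the provider change above (the URL and cache size are placeholders): the client has to be created with a JsonSchemaProvider, otherwise JSON-typed schemas fetched from the registry cannot be parsed into ParsedSchema instances.

    import io.confluent.kafka.schemaregistry.SchemaProvider;
    import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
    import java.util.Collections;
    import java.util.List;

    public class RegistryClientSketch {
      static SchemaRegistryClient create(String registryUrl) {
        List<SchemaProvider> providers = List.of(
            new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
        return new CachedSchemaRegistryClient(
            Collections.singletonList(registryUrl),
            1000, // identity map capacity
            providers,
            Collections.emptyMap());
      }
    }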

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotations/KafkaClientInternalsDependant.java

@@ -0,0 +1,8 @@
+package com.provectus.kafka.ui.util.annotations;
+
+/**
+ * All places in the code that depend on kafka-client internals or implementation-specific
+ * logic should be marked with this annotation to make future updates easier.
+ */
+public @interface KafkaClientInternalsDependant {
+}

+ 102 - 22
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.time.Duration;
 import java.util.Map;
@@ -21,8 +22,6 @@ import java.util.function.Consumer;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.Assert;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
@@ -81,6 +80,33 @@ public class SendAndReadTests extends AbstractBaseTest {
   private static final String PROTOBUF_SCHEMA_JSON_RECORD
       = "{ \"f1\" : \"test str\", \"f2\" : 123 }";
 
+
+  private static final JsonSchema JSON_SCHEMA = new JsonSchema(
+      "{ "
+          + "  \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
+          + "  \"$id\": \"http://example.com/myURI.schema.json\", "
+          + "  \"title\": \"TestRecord\","
+          + "  \"type\": \"object\","
+          + "  \"additionalProperties\": false,"
+          + "  \"properties\": {"
+          + "    \"f1\": {"
+          + "      \"type\": \"integer\""
+          + "    },"
+          + "    \"f2\": {"
+          + "      \"type\": \"string\""
+          + "    },"
+          // this is an important special case, since there is code in KafkaJsonSchemaSerializer
+          // that checks for fields with this name (it has to be worked around)
+          + "    \"schema\": {"
+          + "      \"type\": \"string\""
+          + "    }"
+          + "  }"
+          + "}"
+  );
+
+  private static final String JSON_SCHEMA_RECORD
+      = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
+
   @Autowired
   private ClusterService clusterService;
 
@@ -236,15 +262,14 @@ public class SendAndReadTests extends AbstractBaseTest {
 
   @Test
   void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
-    assertThatThrownBy(() -> {
-      new SendAndReadSpec()
-          .withValueSchema(AVRO_SCHEMA_2)
-          .withMsgToSend(
-              new CreateTopicMessage()
-                  .content("not a json object")
-          )
-          .doAssert(polled -> Assertions.fail());
-    }).hasMessageContaining("Failed to serialize record");
+    new SendAndReadSpec()
+        .withValueSchema(AVRO_SCHEMA_2)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // f2 has type object instead of string
+                .content("{ \"f1\": 111, \"f2\": {} }")
+        )
+        .assertSendThrowsException();
   }
 
   @Test
@@ -281,15 +306,56 @@ public class SendAndReadTests extends AbstractBaseTest {
 
   @Test
   void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
-    assertThatThrownBy(() -> {
-      new SendAndReadSpec()
-          .withValueSchema(PROTOBUF_SCHEMA)
-          .withMsgToSend(
-              new CreateTopicMessage()
-                  .content("not a json object")
-          )
-          .doAssert(polled -> Assertions.fail());
-    }).hasMessageContaining("Failed to serialize record");
+    new SendAndReadSpec()
+        .withValueSchema(PROTOBUF_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // f2 field has type object instead of int
+                .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
+        .assertSendThrowsException();
+  }
+
+  @Test
+  void keyWithProtoSchemaValueWithJsonSchema() {
+    new SendAndReadSpec()
+        .withKeySchema(PROTOBUF_SCHEMA)
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                .key(PROTOBUF_SCHEMA_JSON_RECORD)
+                .content(JSON_SCHEMA_RECORD)
+        )
+        .doAssert(polled -> {
+          assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
+          assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
+        });
+  }
+
+  @Test
+  void keyWithJsonValueWithJsonSchemaKeyValueIsNull() {
+    new SendAndReadSpec()
+        .withKeySchema(JSON_SCHEMA)
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                .key(JSON_SCHEMA_RECORD)
+        )
+        .doAssert(polled -> {
+          assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
+          assertThat(polled.getContent()).isNull();
+        });
+  }
+
+  @Test
+  void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
+    new SendAndReadSpec()
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // the 'f2' field has type object instead of string
+                .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
+        )
+        .assertSendThrowsException();
   }
 
 
@@ -320,7 +386,7 @@ public class SendAndReadTests extends AbstractBaseTest {
     }
 
     @SneakyThrows
-    public void doAssert(Consumer<TopicMessage> msgAssert) {
+    private String createTopicAndCreateSchemas() {
       Objects.requireNonNull(msgToSend);
       String topic = UUID.randomUUID().toString();
       createTopic(new NewTopic(topic, 1, (short) 1));
@@ -330,9 +396,23 @@ public class SendAndReadTests extends AbstractBaseTest {
       if (valueSchema != null) {
         schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
       }
-
       // need to update to see new topic & schemas
       clustersMetricsScheduler.updateMetrics();
+      return topic;
+    }
+
+    public void assertSendThrowsException() {
+      String topic = createTopicAndCreateSchemas();
+      try {
+        assertThatThrownBy(() -> clusterService.sendMessage(LOCAL, topic, msgToSend).block());
+      } finally {
+        deleteTopic(topic);
+      }
+    }
+
+    @SneakyThrows
+    public void doAssert(Consumer<TopicMessage> msgAssert) {
+      String topic = createTopicAndCreateSchemas();
       try {
         clusterService.sendMessage(LOCAL, topic, msgToSend).block();
         TopicMessage polled = clusterService.getMessages(