diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index c658e6a23c..41ca96f744 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -86,6 +86,11 @@
             <artifactId>kafka-avro-serializer</artifactId>
             <version>${confluent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-json-schema-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.confluent</groupId>
             <artifactId>kafka-protobuf-serializer</artifactId>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java
index d89792159f..fcf5173a27 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java
@@ -7,6 +7,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
@@ -23,8 +24,14 @@ public class AvroMessageReader extends MessageReader<Object> {

   @Override
   protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
     var serializer = new KafkaAvroSerializer(client);
-    // need to call configure to set isKey property
-    serializer.configure(Map.of("schema.registry.url", "wontbeused"), isKey);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
     return serializer;
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java
new file mode 100644
index 0000000000..3435851b4e
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java
@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.serde.schemaregistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
+
+public class JsonSchemaMessageFormatter implements MessageFormatter {
+
+  private final KafkaJsonSchemaDeserializer<JsonNode> jsonSchemaDeserializer;
+
+  public JsonSchemaMessageFormatter(SchemaRegistryClient client) {
+    this.jsonSchemaDeserializer = new KafkaJsonSchemaDeserializer<>(client);
+  }
+
+  @Override
+  public String format(String topic, byte[] value) {
+    JsonNode json = jsonSchemaDeserializer.deserialize(topic, value);
+    return json.toString();
+  }
+}
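Note on the configure() change above: beyond setting the isKey flag, AUTO_REGISTER_SCHEMAS=false stops UI produce calls from registering or mutating subjects as a side effect, and USE_LATEST_VERSION=true pins serialization to the subject's latest registered version instead of the schema derived from the input. The same pattern is repeated for the Protobuf and JSON Schema readers below. A minimal standalone sketch of the same two flags (the endpoint URL and wrapper class are illustrative, not part of this patch):

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    import java.util.Map;

    class SerializerConfigSketch {
      static KafkaAvroSerializer valueSerializer() {
        var serializer = new KafkaAvroSerializer();
        serializer.configure(
            Map.of(
                // real endpoint here; the UI passes "wontbeused" because it injects its own client
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
                // fail fast if the subject is missing instead of registering it
                KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
                // serialize against the latest registered schema version
                KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
            ),
            false // isKey
        );
        return serializer;
      }
    }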
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java
new file mode 100644
index 0000000000..1cecdf7fd5
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java
@@ -0,0 +1,81 @@
+package com.provectus.kafka.ui.serde.schemaregistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class JsonSchemaMessageReader extends MessageReader<JsonNode> {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public JsonSchemaMessageReader(String topic,
+                                 boolean isKey,
+                                 SchemaRegistryClient client,
+                                 SchemaMetadata schema) throws IOException, RestClientException {
+    super(topic, isKey, client, schema);
+  }
+
+  @Override
+  protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
+    var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaJsonSchemaSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
+    return serializer;
+  }
+
+  @Override
+  protected JsonNode read(String value, ParsedSchema schema) {
+    try {
+      JsonNode json = MAPPER.readTree(value);
+      ((JsonSchema) schema).validate(json);
+      return json;
+    } catch (JsonProcessingException e) {
+      throw new ValidationException(String.format("'%s' is not valid json", value));
+    } catch (org.everit.json.schema.ValidationException e) {
+      throw new ValidationException(
+          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
+    }
+  }
+
+  @KafkaClientInternalsDependant
+  private class KafkaJsonSchemaSerializerWithoutSchemaInfer
+      extends KafkaJsonSchemaSerializer<JsonNode> {
+
+    KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
+      super(client);
+    }
+
+    /**
+     * We need to override the original method because it tries to infer the schema from input
+     * by checking the 'schema' json field or @Schema annotation on the input class, which is
+     * not possible in our case. So we just skip the infer logic and pass the schema directly.
+     */
+    @Override
+    public byte[] serialize(String topic, JsonNode record) {
+      return super.serializeImpl(
+          super.getSubjectName(topic, isKey, record, schema),
+          record,
+          (JsonSchema) schema
+      );
+    }
+  }
+
+}
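The read() override above is the validation gate: user input is parsed with Jackson and checked against the subject's JSON Schema before serialization, so callers get a clear ValidationException rather than a generic serializer error. A self-contained sketch of that validation step, using the same JsonSchema.validate call the reader relies on (the toy schema and values here are illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.confluent.kafka.schemaregistry.json.JsonSchema;

    class ValidationSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // toy schema: a single integer field
        JsonSchema schema = new JsonSchema(
            "{ \"type\": \"object\", \"properties\": { \"f1\": { \"type\": \"integer\" } } }");

        JsonNode ok = mapper.readTree("{ \"f1\": 42 }");
        schema.validate(ok); // passes silently

        JsonNode bad = mapper.readTree("{ \"f1\": \"not an int\" }");
        schema.validate(bad); // throws org.everit.json.schema.ValidationException
      }
    }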
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java
index a14bbcb650..b1e875b760 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java
@@ -3,6 +3,5 @@ package com.provectus.kafka.ui.serde.schemaregistry;
 public enum MessageFormat {
   AVRO,
   JSON,
-  STRING,
   PROTOBUF
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
index 334e824200..c6cb1e4606 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
@@ -11,7 +11,7 @@ public abstract class MessageReader<T> {
   protected final Serializer<T> serializer;
   protected final String topic;
   protected final boolean isKey;
-  private final ParsedSchema schema;
+  protected final ParsedSchema schema;

   protected MessageReader(String topic, boolean isKey, SchemaRegistryClient client,
                           SchemaMetadata schema) throws IOException, RestClientException {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
index 6a59bce800..ce3150467c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
@@ -9,6 +9,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
@@ -24,8 +25,14 @@ public class ProtobufMessageReader extends MessageReader<Message> {

   @Override
   protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
     var serializer = new KafkaProtobufSerializer<>(client);
-    // need to call configure to set isKey property
-    serializer.configure(Map.of("schema.registry.url", "wontbeused"), isKey);
+    serializer.configure(
+        Map.of(
+            "schema.registry.url", "wontbeused",
+            KafkaProtobufSerializerConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaProtobufSerializerConfig.USE_LATEST_VERSION, true
+        ),
+        isKey
+    );
     return serializer;
   }
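The one-word change in MessageReader (private to protected on schema) is what allows the inner serializer subclass in JsonSchemaMessageReader to hand the parsed schema straight to serializeImpl. For orientation, a presumed shape of the abstract base after this change, reconstructed from the fields and signatures visible in the hunks (not verbatim source):

    import io.confluent.kafka.schemaregistry.ParsedSchema;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import java.io.IOException;
    import org.apache.kafka.common.serialization.Serializer;

    public abstract class MessageReader<T> {
      protected final Serializer<T> serializer;
      protected final String topic;
      protected final boolean isKey;
      protected final ParsedSchema schema; // was private; now visible to subclasses

      protected MessageReader(String topic, boolean isKey, SchemaRegistryClient client,
                              SchemaMetadata schema) throws IOException, RestClientException {
        this.topic = topic;
        this.isKey = isKey;
        this.serializer = createSerializer(client);
        this.schema = client.getSchemaById(schema.getId()); // assumed lookup; ctor body is not in the diff
      }

      // subclasses supply a configured serializer and parse/validate the user's JSON input
      protected abstract Serializer<T> createSerializer(SchemaRegistryClient client);

      protected abstract T read(String value, ParsedSchema schema);
    }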
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
index 68185ddc99..08aef7455f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
@@ -16,6 +16,7 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import java.net.URI;
@@ -52,6 +53,9 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   @Nullable
   private final ProtobufMessageFormatter protobufFormatter;

+  @Nullable
+  private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter;
+
   private final StringMessageFormatter stringFormatter = new StringMessageFormatter();
   private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
   private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
@@ -60,7 +64,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
     Objects.requireNonNull(cluster.getSchemaRegistry());
     List<SchemaProvider> schemaProviders =
-        List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
+        List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
     //TODO add auth
     return new CachedSchemaRegistryClient(
         Collections.singletonList(cluster.getSchemaRegistry()),
@@ -78,9 +82,11 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     if (schemaRegistryClient != null) {
       this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient);
       this.protobufFormatter = new ProtobufMessageFormatter(schemaRegistryClient);
+      this.jsonSchemaMessageFormatter = new JsonSchemaMessageFormatter(schemaRegistryClient);
     } else {
       this.avroFormatter = null;
       this.protobufFormatter = null;
+      this.jsonSchemaMessageFormatter = null;
     }
   }
@@ -128,6 +134,8 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
       reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema);
     } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
       reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema);
+    } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
+      reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema);
     } else {
       throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType());
     }
@@ -218,6 +226,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
       if (tryFormatter(avroFormatter, msg, isKey).isPresent()) {
         return avroFormatter;
       }
+    } else if (type.get().equals(MessageFormat.JSON.name())) {
+      if (tryFormatter(jsonSchemaMessageFormatter, msg, isKey).isPresent()) {
+        return jsonSchemaMessageFormatter;
+      }
     } else {
       throw new IllegalStateException("Unsupported schema type: " + type.get());
     }
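Registering JsonSchemaProvider alongside the Avro and Protobuf providers is what makes the new dispatch branches reachable: CachedSchemaRegistryClient can only materialize schema types it has a provider for, so without it a subject whose latest version has schemaType JSON would fail at schema retrieval before any reader or formatter is chosen. A standalone sketch of such a client (endpoint and cache size are placeholders, not from this patch):

    import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
    import java.util.Collections;
    import java.util.List;

    class RegistryClientSketch {
      static SchemaRegistryClient create() {
        return new CachedSchemaRegistryClient(
            Collections.singletonList("http://localhost:8081"), // placeholder endpoint
            1000,                                               // identity-map (cache) capacity
            List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider()),
            Collections.emptyMap()                              // no extra client configs
        );
      }
    }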
@@ -0,0 +1,8 @@
+package com.provectus.kafka.ui.util.annotations;
+
+/**
+ * All code that depends on kafka-client internals or implementation-specific logic
+ * should be marked with this annotation to make future updates easier.
+ */
+public @interface KafkaClientInternalsDependant {
+}
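KafkaClientInternalsDependant is deliberately a bare marker: with no meta-annotations it defaults to CLASS retention and may be placed on any declaration. If stricter semantics were ever wanted, one hypothetical elaboration (not part of this patch) could pin that down explicitly:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Retention(RetentionPolicy.SOURCE) // documentation-only marker, discarded at compile time
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR})
    public @interface KafkaClientInternalsDependant {
    }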
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
index 526e01f6ff..f3b1988f5c 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.time.Duration;
 import java.util.Map;
@@ -21,8 +22,6 @@ import java.util.function.Consumer;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.Assert;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;

@@ -81,6 +80,33 @@ public class SendAndReadTests extends AbstractBaseTest {
   private static final String PROTOBUF_SCHEMA_JSON_RECORD
       = "{ \"f1\" : \"test str\", \"f2\" : 123 }";

+  private static final JsonSchema JSON_SCHEMA = new JsonSchema(
+      "{ "
+          + "  \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
+          + "  \"$id\": \"http://example.com/myURI.schema.json\", "
+          + "  \"title\": \"TestRecord\","
+          + "  \"type\": \"object\","
+          + "  \"additionalProperties\": false,"
+          + "  \"properties\": {"
+          + "    \"f1\": {"
+          + "      \"type\": \"integer\""
+          + "    },"
+          + "    \"f2\": {"
+          + "      \"type\": \"string\""
+          + "    },"
+          // this is an important special case, since there is code in KafkaJsonSchemaSerializer
+          // that checks fields with this name (it has to be worked around)
+          + "    \"schema\": {"
+          + "      \"type\": \"string\""
+          + "    }"
+          + "  }"
+          + "}"
+  );
+
+  private static final String JSON_SCHEMA_RECORD
+      = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
+
   @Autowired
   private ClusterService clusterService;
@@ -236,15 +262,14 @@ public class SendAndReadTests extends AbstractBaseTest {

   @Test
   void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
-    assertThatThrownBy(() -> {
-      new SendAndReadSpec()
-          .withValueSchema(AVRO_SCHEMA_2)
-          .withMsgToSend(
-              new CreateTopicMessage()
-                  .content("not a json object")
-          )
-          .doAssert(polled -> Assertions.fail());
-    }).hasMessageContaining("Failed to serialize record");
+    new SendAndReadSpec()
+        .withValueSchema(AVRO_SCHEMA_2)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // f2 has type object instead of string
+                .content("{ \"f1\": 111, \"f2\": {} }")
+        )
+        .assertSendThrowsException();
   }
@@ -281,15 +306,56 @@ public class SendAndReadTests extends AbstractBaseTest {

   @Test
   void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
-    assertThatThrownBy(() -> {
-      new SendAndReadSpec()
-          .withValueSchema(PROTOBUF_SCHEMA)
-          .withMsgToSend(
-              new CreateTopicMessage()
-                  .content("not a json object")
-          )
-          .doAssert(polled -> Assertions.fail());
-    }).hasMessageContaining("Failed to serialize record");
+    new SendAndReadSpec()
+        .withValueSchema(PROTOBUF_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // f2 field has type object instead of int
+                .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
+        .assertSendThrowsException();
+  }
+
+  @Test
+  void keyWithProtoSchemaValueWithJsonSchema() {
+    new SendAndReadSpec()
+        .withKeySchema(PROTOBUF_SCHEMA)
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                .key(PROTOBUF_SCHEMA_JSON_RECORD)
+                .content(JSON_SCHEMA_RECORD)
+        )
+        .doAssert(polled -> {
+          assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
+          assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
+        });
+  }
+
+  @Test
+  void keyWithJsonValueWithJsonSchemaKeyValueIsNull() {
+    new SendAndReadSpec()
+        .withKeySchema(JSON_SCHEMA)
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                .key(JSON_SCHEMA_RECORD)
+        )
+        .doAssert(polled -> {
+          assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
+          assertThat(polled.getContent()).isNull();
+        });
+  }
+
+  @Test
+  void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
+    new SendAndReadSpec()
+        .withValueSchema(JSON_SCHEMA)
+        .withMsgToSend(
+            new CreateTopicMessage()
+                // 'f2' field has type object instead of string
+                .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
+        )
+        .assertSendThrowsException();
   }
@@ -320,7 +386,7 @@ public class SendAndReadTests extends AbstractBaseTest {
     }

     @SneakyThrows
-    public void doAssert(Consumer<TopicMessage> msgAssert) {
+    private String createTopicAndCreateSchemas() {
       Objects.requireNonNull(msgToSend);
       String topic = UUID.randomUUID().toString();
       createTopic(new NewTopic(topic, 1, (short) 1));
@@ -330,9 +396,23 @@ public class SendAndReadTests extends AbstractBaseTest {
       if (valueSchema != null) {
         schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
       }
-      // need to update to see new topic & schemas
       clustersMetricsScheduler.updateMetrics();
+      return topic;
+    }
+
+    public void assertSendThrowsException() {
+      String topic = createTopicAndCreateSchemas();
+      try {
+        assertThatThrownBy(() -> clusterService.sendMessage(LOCAL, topic, msgToSend).block());
+      } finally {
+        deleteTopic(topic);
+      }
+    }
+
+    @SneakyThrows
+    public void doAssert(Consumer<TopicMessage> msgAssert) {
+      String topic = createTopicAndCreateSchemas();
       try {
         clusterService.sendMessage(LOCAL, topic, msgToSend).block();
         TopicMessage polled = clusterService.getMessages(