Feature/10 record deserialization (#57)

* Record deserialization

* Check Avro schema for topic

* Fix sdp docker config

* Code cleanup

* Code review changes

* Move Avro schema name template to cluster-level config
Anton Petrov committed 5 years ago
parent commit b60f2a357e

+ 20 - 1
docker/kafka-clusters-only.yaml

@@ -95,4 +95,23 @@ services:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
     networks:
-     - default
+     - default
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+      - kafka01
+    ports:
+      - 8085:8085
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
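
The service above publishes the registry's REST API on host port 8085. A minimal reachability check, sketched with the same Confluent client library the API module pulls in below (the class name is made up; it assumes the compose stack above is running):

import java.util.Collection;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Hypothetical smoke test: list the subjects held by the schemaregistry0
// container published on localhost:8085.
public class RegistrySmokeTest {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8085", 100);
        Collection<String> subjects = client.getAllSubjects();
        System.out.println("Registered subjects: " + subjects);
    }
}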

+ 17 - 0
docker/kafka-ui.yaml

@@ -12,6 +12,7 @@ services:
       - zookeeper1
       - kafka0
       - kafka1
+      - schemaregistry0
     command: [ "java", "-jar", "kafka-ui-api.jar", "--spring.profiles.active=sdp"]
 
   zookeeper0:
@@ -53,3 +54,19 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

+ 15 - 0
kafka-ui-api/pom.xml

@@ -69,6 +69,21 @@
             <artifactId>mapstruct</artifactId>
             <version>${org.mapstruct.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

@@ -19,5 +19,7 @@ public class ClustersProperties {
         String name;
         String bootstrapServers;
         String zookeeper;
+        String schemaRegistry;
+        String schemaNameTemplate = "%s-value";
     }
 }

+ 43 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java

@@ -0,0 +1,43 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Component
+@RequiredArgsConstructor
+public class DeserializationService {
+
+	private final ClustersStorage clustersStorage;
+	private Map<String, RecordDeserializer> clusterDeserializers;
+
+	@PostConstruct
+	public void init() {
+		this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
+				.collect(Collectors.toMap(
+						KafkaCluster::getName,
+						this::createRecordDeserializerForCluster
+				));
+	}
+
+	private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
+		if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
+			return new SimpleRecordDeserializer();
+		} else {
+			return new SchemaRegistryRecordDeserializer(cluster);
+		}
+	}
+
+	public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
+		return clusterDeserializers.get(cluster.getName());
+	}
+}

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java

@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+public interface RecordDeserializer {
+
+	Object deserialize(ConsumerRecord<Bytes, Bytes> record);
+}

+ 131 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

@@ -0,0 +1,131 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Log4j2
+@RequiredArgsConstructor
+public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
+
+	private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
+
+	private final KafkaCluster cluster;
+	private final SchemaRegistryClient schemaRegistryClient;
+	private KafkaAvroDeserializer avroDeserializer;
+	private ObjectMapper objectMapper;
+	private StringDeserializer stringDeserializer;
+
+	private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
+
+	public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+		this.cluster = cluster;
+
+		List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
+		List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
+		this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
+
+		this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+		this.objectMapper = new ObjectMapper();
+		this.stringDeserializer = new StringDeserializer();
+	}
+
+	public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+		MessageFormat format = getMessageFormat(record);
+
+		try {
+			Object parsedValue;
+			switch (format) {
+				case AVRO:
+					parsedValue = parseAvroRecord(record);
+					break;
+				case JSON:
+					parsedValue = parseJsonRecord(record);
+					break;
+				case STRING:
+					parsedValue = parseStringRecord(record);
+					break;
+				default:
+					throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
+			}
+			return parsedValue;
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
+		}
+	}
+
+	private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
+		return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
+	}
+
+	private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
+		String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
+		try {
+			schemaRegistryClient.getAllVersions(avroSchema);
+			return MessageFormat.AVRO;
+		} catch (RestClientException | IOException e) {
+			log.info("Failed to get Avro schema for topic {}", record.topic());
+		}
+
+		try {
+			parseJsonRecord(record);
+			return MessageFormat.JSON;
+		} catch (IOException e) {
+			log.info("Failed to parse json from topic {}", record.topic());
+		}
+
+		return MessageFormat.STRING;
+	}
+
+	private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+		String topic = record.topic();
+		byte[] valueBytes = record.value().get();
+		GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
+		byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
+		return parseJson(bytes);
+	}
+
+	private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+		byte[] valueBytes = record.value().get();
+		return parseJson(valueBytes);
+	}
+
+	private Object parseJson(byte[] bytes) throws IOException {
+		return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
+		});
+	}
+
+	private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
+		String topic = record.topic();
+		byte[] valueBytes = record.value().get();
+		return stringDeserializer.deserialize(topic, valueBytes);
+	}
+
+	public enum MessageFormat {
+		AVRO,
+		JSON,
+		STRING
+	}
+}
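
Format detection above hinges on Confluent's subject naming convention: with the default template "%s-value", the value schema for topic "orders" is looked up under subject "orders-value". A self-contained sketch of that lookup against an in-memory MockSchemaRegistryClient (topic, schema, and class name are made up; it assumes the 5.5.0 client, where register takes a ParsedSchema):

import java.util.List;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Hypothetical illustration of detectFormat()'s subject lookup: a schema
// registered under "orders-value" marks topic "orders" as AVRO.
public class SubjectNamingSketch {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client = new MockSchemaRegistryClient();
        String schemaJson = "{\"type\":\"record\",\"name\":\"Order\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
        client.register("orders-value", new AvroSchema(schemaJson));

        String subject = String.format("%s-value", "orders");
        List<Integer> versions = client.getAllVersions(subject);
        System.out.println(subject + " -> versions " + versions); // [1]
    }
}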

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SimpleRecordDeserializer.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+public class SimpleRecordDeserializer implements RecordDeserializer {
+
+	private final StringDeserializer stringDeserializer = new StringDeserializer();
+
+	@Override
+	public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+		return stringDeserializer.deserialize(record.topic(), record.value().get());
+	}
+}
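
A quick round-trip sketch of this fallback path (class name and payload are made up; it assumes SimpleRecordDeserializer is importable as above):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

import com.provectus.kafka.ui.cluster.deserialization.SimpleRecordDeserializer;

// Hypothetical round-trip: without a schema registry configured, record
// values come back as plain strings.
public class SimpleDeserializerSketch {
    public static void main(String[] args) {
        Bytes value = Bytes.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ConsumerRecord<Bytes, Bytes> record =
                new ConsumerRecord<>("test-topic", 0, 0L, null, value);
        Object content = new SimpleRecordDeserializer().deserialize(record);
        System.out.println(content); // hello
    }
}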

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -15,11 +15,12 @@ public class KafkaCluster {
     private final String jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;
+    private final String schemaRegistry;
+    private final String schemaNameTemplate;
     private final ServerStatus status;
     private final ServerStatus zookeeperStatus;
     private final InternalClusterMetrics metrics;
     private final Map<String, InternalTopic> topics;
     private final Throwable lastKafkaException;
     private final Throwable lastZookeeperException;
-
 }

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -16,6 +16,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
+import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
+import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
@@ -37,15 +39,17 @@ public class ConsumingService {
 	private static final int MAX_POLLS_COUNT = 30;
 
 	private final KafkaService kafkaService;
+	private final DeserializationService deserializationService;
 
 	public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
 		int recordsLimit = Optional.ofNullable(limit)
 				.map(s -> Math.min(s, MAX_RECORD_LIMIT))
 				.orElse(DEFAULT_RECORD_LIMIT);
 		RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
+		RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster);
 		return Flux.create(emitter::emit)
 				.subscribeOn(Schedulers.boundedElastic())
-				.map(ClusterUtil::mapToTopicMessage)
+				.map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
 				.limitRequest(recordsLimit);
 	}
 

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.cluster.util;
 
 import com.provectus.kafka.ui.cluster.model.*;
+import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.model.*;
 import lombok.extern.slf4j.Slf4j;
 import com.provectus.kafka.ui.model.TopicMessage;
@@ -149,7 +150,7 @@ public class ClusterUtil {
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
     }
 
-    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
+    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord, RecordDeserializer recordDeserializer) {
         OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
         TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
         Map<String, String> headers = new HashMap<>();
@@ -166,7 +167,8 @@ public class ClusterUtil {
             topicMessage.setKey(consumerRecord.key().toString());
         }
         topicMessage.setHeaders(headers);
-        topicMessage.setContent(consumerRecord.value().toString());
+        Object parsedValue = recordDeserializer.deserialize(consumerRecord);
+        topicMessage.setContent(parsedValue);
 
         return topicMessage;
     }

+ 2 - 0
kafka-ui-api/src/main/resources/application-local.yml

@@ -4,6 +4,8 @@ kafka:
       name: local
       bootstrapServers: localhost:29091
       zookeeper: localhost:2183
+      schemaRegistry: http://localhost:8085
+#      schemaNameTemplate: "%s-value"
     -
       name: secondLocal
       bootstrapServers: localhost:29092

+ 1 - 0
kafka-ui-api/src/main/resources/application-sdp.yml

@@ -4,6 +4,7 @@ kafka:
       name: local
       bootstrapServers: kafka0:29092
       zookeeper: zookeeper0:2181
+      schemaRegistry: http://schemaregistry0:8085
     -
       name: secondLocal
       zookeeper: zookeeper1:2181

+ 1 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -458,7 +458,7 @@ components:
           additionalProperties:
             type: string
         content:
-          type: string
+          type: object
       required:
         - partition
         - offset

+ 16 - 0
pom.xml

@@ -28,8 +28,24 @@
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
 		<kafka.version>2.4.1</kafka.version>
+		<avro.version>1.9.2</avro.version>
+		<confluent.version>5.5.0</confluent.version>
 	</properties>
 
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<pluginRepositories>
+		<pluginRepository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</pluginRepository>
+	</pluginRepositories>
+
 	<groupId>com.provectus</groupId>
 	<artifactId>kafka-ui</artifactId>
 	<version>0.0.1-SNAPSHOT</version>