ソースを参照

Minor serdes fixes (#3857)

1. wrapping all serde methods with Exception catching to prevent serde list generation failure
2. for SR serde checking if schema exists by id added
Ilya Kuramshin 2 年 前
コミット
7e47906d88

+ 24 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdeInstance.java

@@ -42,19 +42,39 @@ public class SerdeInstance implements Closeable {
   }
 
   public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
-    return wrapWithClassloader(() -> serde.getSchema(topic, type));
+    try {
+      return wrapWithClassloader(() -> serde.getSchema(topic, type));
+    } catch (Exception e) {
+      log.warn("Error getting schema for '{}'({}) with serde '{}'", topic, type, name, e);
+      return Optional.empty();
+    }
   }
 
   public Optional<String> description() {
-    return wrapWithClassloader(serde::getDescription);
+    try {
+      return wrapWithClassloader(serde::getDescription);
+    } catch (Exception e) {
+      log.warn("Error getting description serde '{}'", name, e);
+      return Optional.empty();
+    }
   }
 
   public boolean canSerialize(String topic, Serde.Target type) {
-    return wrapWithClassloader(() -> serde.canSerialize(topic, type));
+    try {
+      return wrapWithClassloader(() -> serde.canSerialize(topic, type));
+    } catch (Exception e) {
+      log.warn("Error calling canSerialize for '{}'({}) with serde '{}'", topic, type, name, e);
+      return false;
+    }
   }
 
   public boolean canDeserialize(String topic, Serde.Target type) {
-    return wrapWithClassloader(() -> serde.canDeserialize(topic, type));
+    try {
+      return wrapWithClassloader(() -> serde.canDeserialize(topic, type));
+    } catch (Exception e) {
+      log.warn("Error calling canDeserialize for '{}'({}) with serde '{}'", topic, type, name, e);
+      return false;
+    }
   }
 
   public Serde.Serializer serializer(String topic, Serde.Target type) {

+ 33 - 37
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -189,39 +189,40 @@ public class SchemaRegistrySerde implements BuiltInSerde {
   public Optional<SchemaDescription> getSchema(String topic, Target type) {
     String subject = schemaSubject(topic, type);
     return getSchemaBySubject(subject)
-        .map(schemaMetadata ->
-            new SchemaDescription(
-                convertSchema(schemaMetadata),
-                Map.of(
-                    "subject", subject,
-                    "schemaId", schemaMetadata.getId(),
-                    "latestVersion", schemaMetadata.getVersion(),
-                    "type", schemaMetadata.getSchemaType() // AVRO / PROTOBUF / JSON
-                )
-            ));
+        .flatMap(schemaMetadata ->
+            //schema can be not-found, when schema contexts configured improperly
+            getSchemaById(schemaMetadata.getId())
+                .map(parsedSchema ->
+                    new SchemaDescription(
+                        convertSchema(schemaMetadata, parsedSchema),
+                        Map.of(
+                            "subject", subject,
+                            "schemaId", schemaMetadata.getId(),
+                            "latestVersion", schemaMetadata.getVersion(),
+                            "type", schemaMetadata.getSchemaType() // AVRO / PROTOBUF / JSON
+                        )
+                    )));
   }
 
   @SneakyThrows
-  private String convertSchema(SchemaMetadata schema) {
+  private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
     URI basePath = new URI(schemaRegistryUrls.get(0))
         .resolve(Integer.toString(schema.getId()));
-    ParsedSchema schemaById = schemaRegistryClient.getSchemaById(schema.getId());
     SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
         .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
-    switch (schemaType) {
-      case PROTOBUF:
-        return new ProtobufSchemaConverter()
-            .convert(basePath, ((ProtobufSchema) schemaById).toDescriptor())
-            .toJson();
-      case AVRO:
-        return new AvroJsonSchemaConverter()
-            .convert(basePath, ((AvroSchema) schemaById).rawSchema())
-            .toJson();
-      case JSON:
-        return schema.getSchema();
-      default:
-        throw new IllegalStateException();
-    }
+    return switch (schemaType) {
+      case PROTOBUF -> new ProtobufSchemaConverter()
+          .convert(basePath, ((ProtobufSchema) parsedSchema).toDescriptor())
+          .toJson();
+      case AVRO -> new AvroJsonSchemaConverter()
+          .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
+          .toJson();
+      case JSON -> schema.getSchema();
+    };
+  }
+
+  private Optional<ParsedSchema> getSchemaById(int id) {
+    return wrapWith404Handler(() -> schemaRegistryClient.getSchemaById(id));
   }
 
   private Optional<SchemaMetadata> getSchemaBySubject(String subject) {
@@ -253,16 +254,11 @@ public class SchemaRegistrySerde implements BuiltInSerde {
     boolean isKey = type == Target.KEY;
     SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
         .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
-    switch (schemaType) {
-      case PROTOBUF:
-        return new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case AVRO:
-        return new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case JSON:
-        return new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      default:
-        throw new IllegalStateException();
-    }
+    return switch (schemaType) {
+      case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+      case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+      case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+    };
   }
 
   @Override
@@ -297,7 +293,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
   }
 
   private SchemaType getMessageFormatBySchemaId(int schemaId) {
-    return wrapWith404Handler(() -> schemaRegistryClient.getSchemaById(schemaId))
+    return getSchemaById(schemaId)
         .map(ParsedSchema::schemaType)
         .flatMap(SchemaType::fromString)
         .orElseThrow(() -> new ValidationException(String.format("Schema for id '%d' not found ", schemaId)));