From b19b9d82e921cf0c2fb3e3fb6517c79d0b4053e8 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Mon, 3 Oct 2022 17:16:55 +0400 Subject: [PATCH] Minor backend fixes (#2696) Minor backend fixes: 1. InternalTopic.leader fiels filling fix 2. bytesIn/OutPerSec fields set to TopicDTO 3. "subject" property added to SR SchemaDescription --- .../kafka/ui/model/InternalTopic.java | 9 +++-- .../builtin/sr/SchemaRegistrySerde.java | 1 + .../builtin/sr/SchemaRegistrySerdeTest.java | 35 ++++++++++++------- .../main/resources/swagger/kafka-ui-api.yaml | 4 +++ 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java index c6cc4b1b08..206e3a83fb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java @@ -57,9 +57,12 @@ public class InternalTopic { partitionDto.inSyncReplicasCount(partition.isr().size()); partitionDto.replicasCount(partition.replicas().size()); List replicas = partition.replicas().stream() - .map(r -> new InternalReplica(r.id(), - partition.leader() != null && partition.leader().id() != r.id(), - partition.isr().contains(r))) + .map(r -> + InternalReplica.builder() + .broker(r.id()) + .inSync(partition.isr().contains(r)) + .leader(partition.leader() != null && partition.leader().id() == r.id()) + .build()) .collect(Collectors.toList()); partitionDto.replicas(replicas); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java index eb67ee7643..787bec2808 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -142,6 +142,7 @@ public class SchemaRegistrySerde implements BuiltInSerde { new SchemaDescription( convertSchema(schemaMetadata), Map.of( + "subject", subject, "schemaId", schemaMetadata.getId(), "latestVersion", schemaMetadata.getVersion(), "type", schemaMetadata.getSchemaType() // AVRO / PROTOBUF / JSON diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index 7e4e0145e5..c965930916 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -23,6 +23,8 @@ import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; class SchemaRegistrySerdeTest { @@ -36,21 +38,28 @@ class SchemaRegistrySerdeTest { serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value"); } - @Test - void returnsSchemaDescriptionIfSchemaRegisteredInSR() throws RestClientException, IOException { - String topic = "test"; - registryClient.register(topic + "-key", new AvroSchema("{ \"type\": \"int\" }")); - registryClient.register(topic + "-value", new AvroSchema("{ \"type\": \"float\" }")); + @ParameterizedTest + @CsvSource({ + "test_topic, test_topic-key, KEY", + "test_topic, test_topic-value, VALUE" + }) + @SneakyThrows + void returnsSchemaDescriptionIfSchemaRegisteredInSR(String topic, String subject, Serde.Target target) { + int schemaId = registryClient.register(subject, new AvroSchema("{ \"type\": \"int\" }")); + int registeredVersion = registryClient.getLatestSchemaMetadata(subject).getVersion(); - var keySchemaOptional = serde.getSchema(topic, Serde.Target.KEY); - assertThat(keySchemaOptional) - .map(SchemaDescription::getSchema) + var schemaOptional = serde.getSchema(topic, target); + assertThat(schemaOptional).isPresent(); + + SchemaDescription schemaDescription = schemaOptional.get(); + assertThat(schemaDescription.getSchema()) .contains("{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}"); - - var valueSchemaOptional = serde.getSchema(topic, Serde.Target.VALUE); - assertThat(valueSchemaOptional) - .map(SchemaDescription::getSchema) - .contains("{\"$id\":\"float\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"number\"}"); + assertThat(schemaDescription.getAdditionalProperties()) + .containsOnlyKeys("subject", "schemaId", "latestVersion", "type") + .containsEntry("subject", subject) + .containsEntry("schemaId", schemaId) + .containsEntry("latestVersion", registeredVersion) + .containsEntry("type", "AVRO"); } @Test diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e06c453781..86e939562a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2035,6 +2035,10 @@ components: format: int64 segmentCount: type: integer + bytesInPerSec: + type: number + bytesOutPerSec: + type: number underReplicatedPartitions: type: integer cleanUpPolicy: