Minor backend fixes (#2696)
Minor backend fixes: 1. InternalTopic.leader fiels filling fix 2. bytesIn/OutPerSec fields set to TopicDTO 3. "subject" property added to SR SchemaDescription
This commit is contained in:
parent
68218668ec
commit
b19b9d82e9
4 changed files with 33 additions and 16 deletions
|
@ -57,9 +57,12 @@ public class InternalTopic {
|
||||||
partitionDto.inSyncReplicasCount(partition.isr().size());
|
partitionDto.inSyncReplicasCount(partition.isr().size());
|
||||||
partitionDto.replicasCount(partition.replicas().size());
|
partitionDto.replicasCount(partition.replicas().size());
|
||||||
List<InternalReplica> replicas = partition.replicas().stream()
|
List<InternalReplica> replicas = partition.replicas().stream()
|
||||||
.map(r -> new InternalReplica(r.id(),
|
.map(r ->
|
||||||
partition.leader() != null && partition.leader().id() != r.id(),
|
InternalReplica.builder()
|
||||||
partition.isr().contains(r)))
|
.broker(r.id())
|
||||||
|
.inSync(partition.isr().contains(r))
|
||||||
|
.leader(partition.leader() != null && partition.leader().id() == r.id())
|
||||||
|
.build())
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
partitionDto.replicas(replicas);
|
partitionDto.replicas(replicas);
|
||||||
|
|
||||||
|
|
|
@ -142,6 +142,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
||||||
new SchemaDescription(
|
new SchemaDescription(
|
||||||
convertSchema(schemaMetadata),
|
convertSchema(schemaMetadata),
|
||||||
Map.of(
|
Map.of(
|
||||||
|
"subject", subject,
|
||||||
"schemaId", schemaMetadata.getId(),
|
"schemaId", schemaMetadata.getId(),
|
||||||
"latestVersion", schemaMetadata.getVersion(),
|
"latestVersion", schemaMetadata.getVersion(),
|
||||||
"type", schemaMetadata.getSchemaType() // AVRO / PROTOBUF / JSON
|
"type", schemaMetadata.getSchemaType() // AVRO / PROTOBUF / JSON
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.avro.io.Encoder;
|
||||||
import org.apache.avro.io.EncoderFactory;
|
import org.apache.avro.io.EncoderFactory;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.CsvSource;
|
||||||
|
|
||||||
class SchemaRegistrySerdeTest {
|
class SchemaRegistrySerdeTest {
|
||||||
|
|
||||||
|
@ -36,21 +38,28 @@ class SchemaRegistrySerdeTest {
|
||||||
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value");
|
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
void returnsSchemaDescriptionIfSchemaRegisteredInSR() throws RestClientException, IOException {
|
@CsvSource({
|
||||||
String topic = "test";
|
"test_topic, test_topic-key, KEY",
|
||||||
registryClient.register(topic + "-key", new AvroSchema("{ \"type\": \"int\" }"));
|
"test_topic, test_topic-value, VALUE"
|
||||||
registryClient.register(topic + "-value", new AvroSchema("{ \"type\": \"float\" }"));
|
})
|
||||||
|
@SneakyThrows
|
||||||
|
void returnsSchemaDescriptionIfSchemaRegisteredInSR(String topic, String subject, Serde.Target target) {
|
||||||
|
int schemaId = registryClient.register(subject, new AvroSchema("{ \"type\": \"int\" }"));
|
||||||
|
int registeredVersion = registryClient.getLatestSchemaMetadata(subject).getVersion();
|
||||||
|
|
||||||
var keySchemaOptional = serde.getSchema(topic, Serde.Target.KEY);
|
var schemaOptional = serde.getSchema(topic, target);
|
||||||
assertThat(keySchemaOptional)
|
assertThat(schemaOptional).isPresent();
|
||||||
.map(SchemaDescription::getSchema)
|
|
||||||
|
SchemaDescription schemaDescription = schemaOptional.get();
|
||||||
|
assertThat(schemaDescription.getSchema())
|
||||||
.contains("{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
|
.contains("{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
|
||||||
|
assertThat(schemaDescription.getAdditionalProperties())
|
||||||
var valueSchemaOptional = serde.getSchema(topic, Serde.Target.VALUE);
|
.containsOnlyKeys("subject", "schemaId", "latestVersion", "type")
|
||||||
assertThat(valueSchemaOptional)
|
.containsEntry("subject", subject)
|
||||||
.map(SchemaDescription::getSchema)
|
.containsEntry("schemaId", schemaId)
|
||||||
.contains("{\"$id\":\"float\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"number\"}");
|
.containsEntry("latestVersion", registeredVersion)
|
||||||
|
.containsEntry("type", "AVRO");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -2035,6 +2035,10 @@ components:
|
||||||
format: int64
|
format: int64
|
||||||
segmentCount:
|
segmentCount:
|
||||||
type: integer
|
type: integer
|
||||||
|
bytesInPerSec:
|
||||||
|
type: number
|
||||||
|
bytesOutPerSec:
|
||||||
|
type: number
|
||||||
underReplicatedPartitions:
|
underReplicatedPartitions:
|
||||||
type: integer
|
type: integer
|
||||||
cleanUpPolicy:
|
cleanUpPolicy:
|
||||||
|
|
Loading…
Add table
Reference in a new issue