Merge branch 'master' of github.com:provectus/kafka-ui into metrics_ph2

Conflicts:
	kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

Commit 79ba8313f7
33 changed files with 362 additions and 116 deletions

.github/workflows/backend.yml (vendored): 5 changes

@@ -3,11 +3,14 @@ on:
   push:
     branches:
       - master
-  pull_request:
+  pull_request_target:
     types: ["opened", "edited", "reopened", "synchronize"]
     paths:
       - "kafka-ui-api/**"
       - "pom.xml"
+permissions:
+  checks: write
+  pull-requests: write
 jobs:
   build-and-test:
     runs-on: ubuntu-latest

.github/workflows/e2e-checks.yaml (vendored): 8 changes

@@ -1,6 +1,6 @@
 name: "E2E: PR healthcheck"
 on:
-  pull_request:
+  pull_request_target:
     types: [ "opened", "edited", "reopened", "synchronize" ]
     paths:
       - "kafka-ui-api/**"
@@ -8,6 +8,8 @@ on:
       - "kafka-ui-react-app/**"
       - "kafka-ui-e2e-checks/**"
      - "pom.xml"
+permissions:
+  statuses: write
 jobs:
   build-and-test:
     runs-on: ubuntu-latest
@@ -18,8 +20,8 @@ jobs:
       - name: Configure AWS credentials
         uses: aws-actions/configure-aws-credentials@v2
         with:
-          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
           aws-region: eu-central-1
       - name: Set up environment
         id: set_env_values

.github/workflows/frontend.yaml (vendored): 7 changes

@@ -3,11 +3,14 @@ on:
   push:
     branches:
       - master
-  pull_request:
+  pull_request_target:
     types: ["opened", "edited", "reopened", "synchronize"]
     paths:
       - "kafka-ui-contract/**"
       - "kafka-ui-react-app/**"
+permissions:
+  checks: write
+  pull-requests: write
 jobs:
   build-and-test:
     env:
@@ -24,7 +27,7 @@ jobs:
         with:
           version: 7.4.0
       - name: Install node
-        uses: actions/setup-node@v3.6.0
+        uses: actions/setup-node@v3.7.0
         with:
           node-version: "16.15.0"
           cache: "pnpm"

.github/workflows/pr-checks.yaml (vendored): 5 changes

@@ -1,8 +1,9 @@
 name: "PR: Checklist linter"
 on:
-  pull_request:
+  pull_request_target:
     types: [opened, edited, synchronize, reopened]
+permissions:
+  checks: write
 jobs:
   task-check:
     runs-on: ubuntu-latest

@@ -1,13 +1,15 @@
 name: Welcome first time contributors

 on:
-  pull_request:
+  pull_request_target:
     types:
       - opened
   issues:
     types:
       - opened
+permissions:
+  issues: write
+  pull-requests: write
 jobs:
   welcome:
     runs-on: ubuntu-latest

@@ -91,7 +91,7 @@
     <dependency>
       <groupId>software.amazon.msk</groupId>
       <artifactId>aws-msk-iam-auth</artifactId>
-      <version>1.1.6</version>
+      <version>1.1.7</version>
     </dependency>

     <dependency>

@@ -123,9 +123,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("deleteTopic")
         .build();

-    return accessControlService.validateAccess(context).then(
-        topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    return accessControlService.validateAccess(context)
+        .then(
+            topicsService.deleteTopic(getCluster(clusterName), topicName)
+                .thenReturn(ResponseEntity.ok().<Void>build())
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
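
Note on the deleteTopic change above: deleteTopic(...) presumably completes as an empty Mono<Void>, and map() is never applied to an empty publisher, so the old chain could complete without ever emitting the ResponseEntity; thenReturn() emits its value once the source completes. A minimal, standalone Reactor sketch of that behaviour (not project code):

    import reactor.core.publisher.Mono;

    public class EmptyMonoDemo {
      public static void main(String[] args) {
        Mono<Void> deletion = Mono.empty(); // stands in for a Mono<Void> returned by a delete call

        // map() is skipped on an empty Mono, so nothing would be emitted here:
        deletion.map(v -> "mapped").defaultIfEmpty("map skipped").subscribe(System.out::println);

        // thenReturn() emits its value after the source completes:
        deletion.thenReturn("ok emitted").subscribe(System.out::println);
      }
    }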
|
@ -3,18 +3,21 @@ package com.provectus.kafka.ui.mapper;
|
||||||
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
|
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
|
||||||
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
|
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
|
||||||
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
|
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
|
||||||
|
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
|
||||||
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
|
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
|
||||||
import com.provectus.kafka.ui.model.SchemaTypeDTO;
|
import com.provectus.kafka.ui.model.SchemaTypeDTO;
|
||||||
import com.provectus.kafka.ui.service.SchemaRegistryService;
|
import com.provectus.kafka.ui.service.SchemaRegistryService;
|
||||||
import com.provectus.kafka.ui.sr.model.Compatibility;
|
import com.provectus.kafka.ui.sr.model.Compatibility;
|
||||||
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
|
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
|
||||||
import com.provectus.kafka.ui.sr.model.NewSubject;
|
import com.provectus.kafka.ui.sr.model.NewSubject;
|
||||||
|
import com.provectus.kafka.ui.sr.model.SchemaReference;
|
||||||
import com.provectus.kafka.ui.sr.model.SchemaType;
|
import com.provectus.kafka.ui.sr.model.SchemaType;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import org.mapstruct.Mapper;
|
import org.mapstruct.Mapper;
|
||||||
|
|
||||||
|
|
||||||
@Mapper(componentModel = "spring")
|
@Mapper
|
||||||
public interface KafkaSrMapper {
|
public interface KafkaSrMapper {
|
||||||
|
|
||||||
default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
|
default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
|
||||||
|
@ -24,9 +27,12 @@ public interface KafkaSrMapper {
|
||||||
.subject(s.getSubject())
|
.subject(s.getSubject())
|
||||||
.schema(s.getSchema())
|
.schema(s.getSchema())
|
||||||
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
|
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
|
||||||
|
.references(toDto(s.getReferences()))
|
||||||
.compatibilityLevel(s.getCompatibility().toString());
|
.compatibilityLevel(s.getCompatibility().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<SchemaReferenceDTO> toDto(List<SchemaReference> references);
|
||||||
|
|
||||||
CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
|
CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
|
||||||
|
|
||||||
CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
|
CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
|
||||||
|
|
|
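
With componentModel = "spring" dropped from @Mapper above, MapStruct would no longer register the generated implementation as a Spring bean; callers would then obtain an instance through MapStruct's factory. A sketch of that wiring (an assumption: the corresponding call site is not part of this diff):

    import org.mapstruct.factory.Mappers;

    // Hypothetical call site; KafkaSrMapper is the interface changed above.
    KafkaSrMapper mapper = Mappers.getMapper(KafkaSrMapper.class);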

@@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@@ -217,7 +218,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
       case AVRO -> new AvroJsonSchemaConverter()
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
-      case JSON -> schema.getSchema();
+      case JSON ->
+          //need to use confluent JsonSchema since it includes resolved references
+          ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }

@@ -14,8 +14,7 @@ import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
 import com.provectus.kafka.ui.sr.model.NewSubject;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.ReactiveFailover;
-import com.provectus.kafka.ui.util.WebClientConfigurator;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public class SchemaRegistryService {
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                                String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));

@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import reactor.core.publisher.Mono;
+
+// logic copied from AbstractSchemaProvider:resolveReferences
+// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
+class SchemaReferencesResolver {
+
+  private final KafkaSrClientApi client;
+
+  SchemaReferencesResolver(KafkaSrClientApi client) {
+    this.client = client;
+  }
+
+  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
+    return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
+        .map(Resolving::resolved);
+  }
+
+  private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
+
+    Resolving visit(String name) {
+      return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
+    }
+
+    Resolving resolve(String ref, String schema) {
+      return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
+    }
+  }
+
+  private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
+    Mono<Resolving> result = Mono.just(initState);
+    for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
+      result = result.flatMap(state -> {
+        if (state.visited().contains(reference.getName())) {
+          return Mono.just(state);
+        } else {
+          final var newState = state.visit(reference.getName());
+          return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
+              .flatMap(subj ->
+                  resolveReferences(subj.getReferences(), newState)
+                      .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
+        }
+      });
+    }
+    return result;
+  }
+}
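
A minimal sketch of how the new resolver is driven (it mirrors the TopicsExporter change below); the client and subject name here are placeholders, not part of the commit:

    // Sketch only: fetch the latest version of a subject and resolve its references
    // into an ordered name -> schema map, assuming 'client' is a KafkaSrClientApi.
    Mono<ImmutableMap<String, String>> resolvedRefs(KafkaSrClientApi client, String subject) {
      return client.getSubjectVersion(subject, "latest", false)
          .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences()));
    }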

@@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableMap;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;

 @Slf4j
 @RequiredArgsConstructor
@@ -101,12 +104,20 @@ class TopicsExporter {
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
-    return cluster.getSchemaRegistryClient()
-        .mono(client -> client.getSubjectVersion(subject, "latest"))
-        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+    return getSubjWithResolvedRefs(cluster, subject)
+        .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
         .onErrorMap(WebClientResponseException.class, err ->
             new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
   }

+  private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
+                                                                                   String subjectName) {
+    return cluster.getSchemaRegistryClient()
+        .mono(client ->
+            client.getSubjectVersion(subjectName, "latest", false)
+                .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
+                    .map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
+  }
 }

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd.schema;

 import com.google.common.collect.ImmutableSet;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
   private AvroExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    var schema = new Schema.Parser().parse(subject.getSchema());
+  static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
+    var schema = avroSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

@@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.opendatadiscovery.client.model.DataSetField;
 import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;

 public final class DataSetFieldsExtractors {

-  public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+  public static List<DataSetField> extract(SchemaSubject subject,
+                                           Map<String, String> resolvedRefs,
+                                           KafkaPath topicOddrn,
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");

@@ -28,6 +28,9 @@ import reactor.netty.http.client.HttpClient;
 public class WebClientConfigurator {

   private final WebClient.Builder builder = WebClient.builder();
+  private HttpClient httpClient = HttpClient
+      .create()
+      .proxyWithSystemProperties();

   public WebClientConfigurator() {
     configureObjectMapper(defaultOM());
@@ -90,12 +93,7 @@ public class WebClientConfigurator {
     // Create webclient
     SslContext context = contextBuilder.build();

-    var httpClient = HttpClient
-        .create()
-        .secure(t -> t.sslContext(context))
-        .proxyWithSystemProperties();
-
-    builder.clientConnector(new ReactorClientHttpConnector(httpClient));
+    httpClient = httpClient.secure(t -> t.sslContext(context));
     return this;
   }
@@ -131,6 +129,6 @@ public class WebClientConfigurator {
   }

   public WebClient build() {
-    return builder.build();
+    return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
   }
 }
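
The WebClientConfigurator change above keeps a single HttpClient field that each configuration step refines, and only attaches it to the builder in build(); in the old code the connector (carrying the proxy settings) appears to have been attached only on the SSL path. A standalone sketch of that accumulate-then-attach pattern (names are illustrative, not the project's class):

    import io.netty.handler.ssl.SslContext;
    import org.springframework.http.client.reactive.ReactorClientHttpConnector;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.netty.http.client.HttpClient;

    class HttpClientAccumulator {
      // one client instance, created with system proxy settings up front
      private HttpClient httpClient = HttpClient.create().proxyWithSystemProperties();

      HttpClientAccumulator secure(SslContext context) {
        httpClient = httpClient.secure(spec -> spec.sslContext(context)); // refine, don't replace
        return this;
      }

      WebClient build() {
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient)) // attached once, here
            .build();
      }
    }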

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui;

 import com.provectus.kafka.ui.container.KafkaConnectContainer;
+import com.provectus.kafka.ui.container.KsqlDbContainer;
 import com.provectus.kafka.ui.container.SchemaRegistryContainer;
 import java.nio.file.Path;
 import java.util.List;
@@ -32,7 +33,7 @@ public abstract class AbstractIntegrationTest {
   public static final String LOCAL = "local";
   public static final String SECOND_LOCAL = "secondLocal";

-  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1";
+  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1"; // Append ".arm64" for a local run

   public static final KafkaContainer kafka = new KafkaContainer(
       DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
@@ -49,6 +50,11 @@ public abstract class AbstractIntegrationTest {
       .dependsOn(kafka)
       .dependsOn(schemaRegistry);

+  protected static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
+      DockerImageName.parse("confluentinc/cp-ksqldb-server")
+          .withTag(CONFLUENT_PLATFORM_VERSION))
+      .withKafka(kafka);
+
   @TempDir
   public static Path tmpDir;

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui;

 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
     Assertions.assertEquals(schema, actual.getSchema());
   }

+  @Test
+  void shouldCreateNewProtobufSchemaWithRefs() {
+    NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject + "-ref")
+        .schema("""
+            syntax = "proto3";
+            message MyRecord {
+              int32 id = 1;
+              string name = 2;
+            }
+            """);
+
+    webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject)
+        .schema("""
+            syntax = "proto3";
+            import "MyRecord.proto";
+            message MyRecordWithRef {
+              int32 id = 1;
+              MyRecord my_ref = 2;
+            }
+            """)
+        .references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));
+
+    SchemaSubjectDTO actual = webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBody(SchemaSubjectDTO.class)
+        .returnResult()
+        .getResponseBody();
+
+    Assertions.assertNotNull(actual);
+    Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
+  }
+
   @Test
   public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
     webTestClient
@@ -278,7 +331,7 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
   void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
     String schema =
         "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
            + "\"{\\\"type\\\": \\\"string\\\"}\"}";

     webTestClient
         .post()

@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class SchemaReferencesResolverTest {
+
+  private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);
+
+  private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);
+
+  @Test
+  void resolvesRefsUsingSrClient() {
+    mockSrCall("sub1", 1,
+        new SchemaSubject()
+            .schema("schema1"));
+
+    mockSrCall("sub2", 1,
+        new SchemaSubject()
+            .schema("schema2")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
+                    new SchemaReference().name("ref2_2").subject("sub1").version(1))));
+
+    mockSrCall("sub2_1", 2,
+        new SchemaSubject()
+            .schema("schema2_1")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
+                    new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
+                ))
+    );
+
+    mockSrCall("sub2_1_1", 3,
+        new SchemaSubject()
+            .schema("schema2_1_1"));
+
+    var resolvedRefsMono = schemaReferencesResolver.resolve(
+        List.of(
+            new SchemaReference().name("ref1").subject("sub1").version(1),
+            new SchemaReference().name("ref2").subject("sub2").version(1)));
+
+    StepVerifier.create(resolvedRefsMono)
+        .assertNext(refs ->
+            assertThat(refs)
+                .containsExactlyEntriesOf(
+                    // checking map should be ordered
+                    ImmutableMap.<String, String>builder()
+                        .put("ref1", "schema1")
+                        .put("ref2_1_1", "schema2_1_1")
+                        .put("ref2_1", "schema2_1")
+                        .put("ref2_2", "schema1")
+                        .put("ref2", "schema2")
+                        .build()))
+        .verifyComplete();
+  }
+
+  @Test
+  void returnsEmptyMapOnEmptyInputs() {
+    StepVerifier.create(schemaReferencesResolver.resolve(null))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+
+    StepVerifier.create(schemaReferencesResolver.resolve(List.of()))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+  }
+
+  private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
+    when(srClientMock.getSubjectVersion(subject, version + "", true))
+        .thenReturn(Mono.just(subjectToReturn));
+  }
+
+}

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service.integration.odd;
 import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.TopicState;
 import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.empty;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -57,9 +58,8 @@ class TopicsExporterTest {

   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
-    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
         .thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));

     stats = Statistics.empty()
         .toBuilder()
         .clusterState(
@@ -95,14 +95,14 @@ class TopicsExporterTest {

   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));

-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                     "type": "record",
                     "name": "Message",

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ class JsonSchemaExtractorTest {
         }
         """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@ class ProtoExtractorTest {
         }""";

     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

@@ -3,30 +3,24 @@ package com.provectus.kafka.ui.service.ksql;
 import static org.assertj.core.api.Assertions.assertThat;

 import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
 import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.container.KsqlDbContainer;
+import java.math.BigDecimal;
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import org.junit.Ignore;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
 import reactor.test.StepVerifier;

 @Ignore
 class KsqlApiClientTest extends AbstractIntegrationTest {

-  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
-      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
-      .withKafka(kafka);
-
   @BeforeAll
   static void startContainer() {
     KSQL_DB.start();
@@ -74,7 +68,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
   private void assertLastKsqTutorialQueryResult(KsqlApiClient client) {
     // expected results:
     //{"header":"Schema","columnNames":[...],"values":null}
-    //{"header":"Row","columnNames":null,"values":[[0.0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
+    //{"header":"Row","columnNames":null,"values":[[0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
     //{"header":"Row","columnNames":null,"values":[[10.0,["18f4ea86"],1]]}
     StepVerifier.create(
         client.execute(
@@ -88,34 +82,26 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
           assertThat(header.getValues()).isNull();
         })
         .assertNext(row -> {
-          assertThat(row).isEqualTo(
-              KsqlApiClient.KsqlResponseTable.builder()
-                  .header("Row")
-                  .columnNames(null)
-                  .values(List.of(List.of(
-                      new DoubleNode(0.0),
-                      new ArrayNode(JsonNodeFactory.instance)
-                          .add(new TextNode("4ab5cbad"))
-                          .add(new TextNode("8b6eae59"))
-                          .add(new TextNode("4a7c7b41")),
-                      new IntNode(3)
-                  )))
-                  .build()
-          );
+          var distance = (DecimalNode) row.getValues().get(0).get(0);
+          var riders = (ArrayNode) row.getValues().get(0).get(1);
+          var count = (IntNode) row.getValues().get(0).get(2);
+
+          assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(0)));
+          assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
+              .add(new TextNode("4ab5cbad"))
+              .add(new TextNode("8b6eae59"))
+              .add(new TextNode("4a7c7b41")));
+          assertThat(count).isEqualTo(new IntNode(3));
         })
         .assertNext(row -> {
-          assertThat(row).isEqualTo(
-              KsqlApiClient.KsqlResponseTable.builder()
-                  .header("Row")
-                  .columnNames(null)
-                  .values(List.of(List.of(
-                      new DoubleNode(10.0),
-                      new ArrayNode(JsonNodeFactory.instance)
-                          .add(new TextNode("18f4ea86")),
-                      new IntNode(1)
-                  )))
-                  .build()
-          );
+          var distance = (DecimalNode) row.getValues().get(0).get(0);
+          var riders = (ArrayNode) row.getValues().get(0).get(1);
+          var count = (IntNode) row.getValues().get(0).get(2);
+
+          assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(10)));
+          assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
+              .add(new TextNode("18f4ea86")));
+          assertThat(count).isEqualTo(new IntNode(1));
         })
         .verifyComplete();
   }

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service.ksql;
 import static org.assertj.core.api.Assertions.assertThat;

 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.container.KsqlDbContainer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
@@ -15,14 +14,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.utility.DockerImageName;

 class KsqlServiceV2Test extends AbstractIntegrationTest {

-  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
-      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
-      .withKafka(kafka);
-
   private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
   private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();

@@ -77,6 +77,10 @@ paths:
         required: true
         schema:
           type: string
+      - name: deleted
+        in: query
+        schema:
+          type: boolean
       responses:
         200:
           description: OK
@@ -317,6 +321,10 @@ components:
           type: string
         schemaType:
          $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject

@@ -2976,6 +2976,10 @@ components:
           type: string
         schemaType:
           $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject
@@ -2993,13 +2997,30 @@ components:
         schema:
           type: string
         schemaType:
-          $ref: '#/components/schemas/SchemaType'
-            # upon updating a schema, the type of existing schema can't be changed
+          $ref: '#/components/schemas/SchemaType' # upon updating a schema, the type of existing schema can't be changed
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - subject
         - schema
         - schemaType

+    SchemaReference:
+      type: object
+      properties:
+        name:
+          type: string
+        subject:
+          type: string
+        version:
+          type: integer
+      required:
+        - name
+        - subject
+        - version
+
     CompatibilityLevel:
       type: object
       properties:

@@ -20,7 +20,7 @@
     <selenide.version>6.12.3</selenide.version>
     <testng.version>7.7.1</testng.version>
     <allure.version>2.22.2</allure.version>
-    <qase.io.version>3.0.4</qase.io.version>
+    <qase.io.version>3.0.5</qase.io.version>
     <aspectj.version>1.9.9.1</aspectj.version>
     <assertj.version>3.24.2</assertj.version>
     <hamcrest.version>2.2</hamcrest.version>

@@ -1,8 +1,8 @@
 import { SerdeDescription } from 'generated-sources';
-import { getPrefferedDescription } from 'components/Topics/Topic/SendMessage/utils';
+import { getPreferredDescription } from 'components/Topics/Topic/SendMessage/utils';

 export const getDefaultSerdeName = (serdes: SerdeDescription[]) => {
-  const preffered = getPrefferedDescription(serdes);
+  const preffered = getPreferredDescription(serdes);
   if (preffered) {
     return preffered.name || '';
   }

@@ -118,8 +118,8 @@ const SendMessage: React.FC<{ closeSidebar: () => void }> = ({
       valueSerde,
     });
     if (!keepContents) {
-      setValue('key', '');
-      setValue('content', '');
+      setValue('key', defaultValues.key || '');
+      setValue('content', defaultValues.content || '');
       closeSidebar();
     }
   } catch (e) {

@@ -13,21 +13,21 @@ jsf.option('fillProperties', false);
 jsf.option('alwaysFakeOptionals', true);
 jsf.option('failOnInvalidFormat', false);

-const generateValueFromSchema = (preffered?: SerdeDescription) => {
-  if (!preffered?.schema) {
+const generateValueFromSchema = (preferred?: SerdeDescription) => {
+  if (!preferred?.schema) {
     return undefined;
   }
-  const parsedSchema = JSON.parse(preffered.schema);
+  const parsedSchema = JSON.parse(preferred.schema);
   const value = jsf.generate(parsedSchema);
   return JSON.stringify(value);
 };

-export const getPrefferedDescription = (serdes: SerdeDescription[]) =>
+export const getPreferredDescription = (serdes: SerdeDescription[]) =>
   serdes.find((s) => s.preferred);

 export const getDefaultValues = (serdes: TopicSerdeSuggestion) => {
-  const keySerde = getPrefferedDescription(serdes.key || []);
-  const valueSerde = getPrefferedDescription(serdes.value || []);
+  const keySerde = getPreferredDescription(serdes.key || []);
+  const valueSerde = getPreferredDescription(serdes.value || []);

   return {
     key: generateValueFromSchema(keySerde),
@@ -65,15 +65,15 @@ export const validateBySchema = (
     return errors;
   }

-  let parcedSchema;
+  let parsedSchema;
   let parsedValue;

   try {
-    parcedSchema = JSON.parse(schema);
+    parsedSchema = JSON.parse(schema);
   } catch (e) {
     return [`Error in parsing the "${type}" field schema`];
   }
-  if (parcedSchema.type === 'string') {
+  if (parsedSchema.type === 'string') {
     return [];
   }
   try {
@@ -84,7 +84,7 @@ export const validateBySchema = (
   try {
     const ajv = new Ajv();
     addFormats(ajv);
-    const validate = ajv.compile(parcedSchema);
+    const validate = ajv.compile(parsedSchema);
     validate(parsedValue);
     if (validate.errors) {
       errors = validate.errors.map(

pom.xml: 6 changes

@@ -26,17 +26,17 @@
     <assertj.version>3.19.0</assertj.version>
     <avro.version>1.11.1</avro.version>
     <byte-buddy.version>1.12.19</byte-buddy.version>
-    <confluent.version>7.3.3</confluent.version>
+    <confluent.version>7.4.0</confluent.version>
     <datasketches-java.version>3.1.0</datasketches-java.version>
     <groovy.version>3.0.13</groovy.version>
     <jackson.version>2.14.0</jackson.version>
-    <kafka-clients.version>3.3.1</kafka-clients.version>
+    <kafka-clients.version>3.5.0</kafka-clients.version>
     <org.mapstruct.version>1.5.5.Final</org.mapstruct.version>
     <org.projectlombok.version>1.18.24</org.projectlombok.version>
     <protobuf-java.version>3.23.3</protobuf-java.version>
     <scala-lang.library.version>2.13.9</scala-lang.library.version>
     <snakeyaml.version>2.0</snakeyaml.version>
-    <spring-boot.version>3.0.6</spring-boot.version>
+    <spring-boot.version>3.1.1</spring-boot.version>
     <kafka-ui-serde-api.version>1.0.0</kafka-ui-serde-api.version>
     <odd-oddrn-generator.version>0.1.17</odd-oddrn-generator.version>
     <odd-oddrn-client.version>0.1.23</odd-oddrn-client.version>