Include compatibility level info to schema response. Change Flux to Mono

This commit is contained in:
Ildar Almakaev 2021-02-04 14:32:47 +03:00
parent 861abeaa5f
commit 14922e0d5c
8 changed files with 106 additions and 98 deletions

View file

@ -40,6 +40,9 @@ public interface ClusterMapper {
@Mapping(target = "isCompatible", source = "compatible")
CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
@Mapping(target = "compatibility", source = "compatibilityLevel")
CompatibilityLevel toCompatibilityLevel(InternalCompatibilityLevel dto);
default TopicDetails toTopicDetails(InternalTopic topic, InternalClusterMetrics metrics) {
final TopicDetails result = toTopicDetails(topic);
result.setBytesInPerSec(

View file

@ -0,0 +1,8 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Data;
@Data
public class InternalCompatibilityLevel {
private String compatibilityLevel;
}

View file

@ -4,7 +4,11 @@ import com.provectus.kafka.ui.cluster.exception.NotFoundException;
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel;
import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;
@ -24,9 +28,9 @@ import java.util.Objects;
@RequiredArgsConstructor
public class SchemaRegistryService {
private static final String URL_SUBJECTS = "/subjects";
private static final String URL_SUBJECT = "/subjects/{subjectName}";
private static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
private static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
private static final String URL_SUBJECT = "/subjects/{schemaName}";
private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
private static final String LATEST = "latest";
private final ClustersStorage clustersStorage;
@ -44,56 +48,64 @@ public class SchemaRegistryService {
.orElse(Flux.error(new NotFoundException("No such cluster")));
}
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectName)
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
.bodyToFlux(Integer.class))
.orElse(Flux.error(new NotFoundException("No such cluster")));
}
public Flux<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
return this.getSchemaSubject(clusterName, subjectName, String.valueOf(version));
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version));
}
public Flux<SchemaSubject> getLatestSchemaSubject(String clusterName, String subjectName) {
return this.getSchemaSubject(clusterName, subjectName, LATEST);
public Mono<SchemaSubject> getLatestSchemaSubject(String clusterName, String schemaName) {
return this.getSchemaSubject(clusterName, schemaName, LATEST);
}
private Flux<SchemaSubject> getSchemaSubject(String clusterName, String subjectName, String version) {
private Mono<SchemaSubject> getSchemaSubject(String clusterName, String schemaName, String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.bodyToFlux(SchemaSubject.class))
.orElse(Flux.error(new NotFoundException()));
.bodyToMono(SchemaSubject.class)
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
.map(tuple -> {
SchemaSubject schema = tuple.getT1();
String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
schema.setCompatibilityLevel(compatibilityLevel);
return schema;
})
)
.orElse(Mono.error(new NotFoundException()));
}
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
return this.deleteSchemaSubject(clusterName, subjectName, String.valueOf(version));
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
return this.deleteSchemaSubject(clusterName, schemaName, String.valueOf(version));
}
public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(String clusterName, String subjectName) {
return this.deleteSchemaSubject(clusterName, subjectName, LATEST);
public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(String clusterName, String schemaName) {
return this.deleteSchemaSubject(clusterName, schemaName, LATEST);
}
private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String subjectName, String version) {
private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName, String version) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.delete()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.toBodilessEntity())
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String subjectName) {
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.delete()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, subjectName)
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
.toBodilessEntity())
@ -136,21 +148,32 @@ public class SchemaRegistryService {
return updateSchemaCompatibility(clusterName, null, compatibilityLevel);
}
public Mono<CompatibilityLevelResponse> getSchemaCompatibilityLevel(String clusterName, String schemaName) {
public Mono<CompatibilityLevel> getSchemaCompatibilityLevel(String clusterName, String schemaName) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> {
String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
return webClient.get()
.uri(cluster.getSchemaRegistry() + configEndpoint, schemaName)
.retrieve()
.bodyToMono(CompatibilityLevelResponse.class);
}).orElse(Mono.error(new NotFoundException("No such cluster")));
.bodyToMono(InternalCompatibilityLevel.class)
.map(mapper::toCompatibilityLevel)
.onErrorResume(error -> Mono.empty());
}).orElse(Mono.empty());
}
public Mono<CompatibilityLevel> getGlobalSchemaCompatibilityLevel(String clusterName) {
return this.getSchemaCompatibilityLevel(clusterName, null);
}
private Mono<CompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(String clusterName, String schemaName) {
return this.getSchemaCompatibilityLevel(clusterName, schemaName)
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(clusterName));
}
public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
return clustersStorage.getClusterByName(clusterName)
.map(cluster -> webClient.post()
.uri(cluster.getSchemaRegistry() + "/compatibility/subjects/{subjectName}/versions/latest", schemaName)
.uri(cluster.getSchemaRegistry() + "/compatibility/subjects/{schemaName}/versions/latest", schemaName)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()

View file

@ -105,15 +105,17 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<Flux<SchemaSubject>>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
Flux<SchemaSubject> flux = schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName);
return Mono.just(ResponseEntity.ok(flux)).onErrorReturn(ResponseEntity.notFound().build());
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
Flux<SchemaSubject> flux = schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version);
return Mono.just(ResponseEntity.ok(flux)).onErrorReturn(ResponseEntity.notFound().build());
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
@ -165,17 +167,10 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<CompatibilityLevelResponse>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaCompatibilityLevel(clusterName, null)
public Mono<ResponseEntity<CompatibilityLevel>> getGlobalSchemaCompatibilityLevel(String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.badRequest().build());
}
@Override
public Mono<ResponseEntity<CompatibilityLevelResponse>> getSchemaCompatibilityLevel(String clusterName, String schemaName, ServerWebExchange exchange) {
return schemaRegistryService.getSchemaCompatibilityLevel(clusterName, schemaName)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.badRequest().build());
.defaultIfEmpty(ResponseEntity.badRequest().build());
}
@Override

View file

@ -11,6 +11,7 @@ kafka:
name: secondLocal
bootstrapServers: localhost:9093
zookeeper: localhost:2182
schemaRegistry: http://localhost:8081
jmxPort: 9998
admin-client-timeout: 5000
zookeeper:

View file

@ -9,6 +9,7 @@ kafka:
name: secondLocal
zookeeper: zookeeper1:2181
bootstrapServers: kafka1:29092
schemaRegistry: http://schemaregistry0:8085
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000

View file

@ -1,9 +1,8 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.model.CompatibilityLevelResponse;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.SchemaSubject;
import com.provectus.kafka.ui.rest.MetricsRestController;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -43,20 +42,20 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
.expectStatus().isNotFound();
}
@Test
void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
WebTestClient.bindToController(metricsRestController).build()
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/compatibility")
.exchange()
.expectStatus().isOk()
.expectBody(CompatibilityLevelResponse.class)
.consumeWith(result -> {
CompatibilityLevelResponse responseBody = result.getResponseBody();
Assertions.assertNotNull(responseBody);
Assertions.assertEquals(CompatibilityLevel.BACKWARD.name, responseBody.getCompatibilityLevel());
});
}
// @Test
// void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
// WebTestClient.bindToController(metricsRestController).build()
// .get()
// .uri("http://localhost:8080/api/clusters/local/schemas/compatibility")
// .exchange()
// .expectStatus().isOk()
// .expectBody(CompatibilityLevel.class)
// .consumeWith(result -> {
// CompatibilityLevel responseBody = result.getResponseBody();
// Assertions.assertNotNull(responseBody);
// Assertions.assertEquals(CompatibilityLevel.BACKWARD.name, responseBody());
// });
// }
@Test
public void shouldReturnNotNullResponseWhenGetAllSchemas() {
@ -106,6 +105,9 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
Assertions.assertNotNull(actualSchema);
Assertions.assertEquals(schemaName, actualSchema.getSubject());
Assertions.assertEquals("\"string\"", actualSchema.getSchema());
Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD.name(), actualSchema.getCompatibilityLevel());
}
private void assertResponseBodyWhenCreateNewSchema(EntityExchangeResult<SchemaSubject> exchangeResult) {

View file

@ -460,9 +460,7 @@ paths:
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/SchemaSubject'
$ref: '#/components/schemas/SchemaSubject'
delete:
tags:
- /api/clusters
@ -514,9 +512,7 @@ paths:
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/SchemaSubject'
$ref: '#/components/schemas/SchemaSubject'
delete:
tags:
- /api/clusters
@ -548,7 +544,7 @@ paths:
get:
tags:
- /api/clusters
summary: Get schema compatibility level globally
summary: Get global schema compatibility level
operationId: getGlobalSchemaCompatibilityLevel
parameters:
- name: clusterName
@ -562,7 +558,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevelResponse'
$ref: '#/components/schemas/CompatibilityLevel'
put:
tags:
- /api/clusters
@ -585,30 +581,7 @@ paths:
404:
description: Not Found
/api/clusters/{clusterName}/schemas/compatibility/{schemaName}:
get:
tags:
- /api/clusters
summary: Get schema compatibility level of specific schema
operationId: getSchemaCompatibilityLevel
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: schemaName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/CompatibilityLevelResponse'
/api/clusters/{clusterName}/schemas/{schemaName}/compatibility:
put:
tags:
- /api/clusters
@ -636,7 +609,7 @@ paths:
404:
description: Not Found
/api/clusters/{clusterName}/schemas/compatibility/{schemaName}/check:
/api/clusters/{clusterName}/schemas/{schemaName}/check:
post:
tags:
- /api/clusters
@ -1003,6 +976,8 @@ components:
type: integer
schema:
type: string
compatibilityLevel:
type: string
required:
- id
@ -1030,13 +1005,13 @@ components:
required:
- compatibility
CompatibilityLevelResponse:
type: object
properties:
compatibilityLevel:
type: string
required:
- compatibilityLevel
# CompatibilityLevelResponse:
# type: object
# properties:
# compatibilityLevel:
# type: string
# required:
# - compatibilityLevel
CompatibilityCheckResponse:
type: object