123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- package com.provectus.kafka.ui;
- import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
- import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
- import com.provectus.kafka.ui.model.SchemaSubjectDTO;
- import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
- import com.provectus.kafka.ui.model.SchemaTypeDTO;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Objects;
- import java.util.UUID;
- import lombok.extern.slf4j.Slf4j;
- import lombok.val;
- import org.junit.jupiter.api.Assertions;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.MediaType;
- import org.springframework.test.web.reactive.server.EntityExchangeResult;
- import org.springframework.test.web.reactive.server.WebTestClient;
- import org.springframework.web.reactive.function.BodyInserters;
- import org.testcontainers.shaded.org.hamcrest.MatcherAssert;
- import org.testcontainers.shaded.org.hamcrest.Matchers;
- import reactor.core.publisher.Mono;
- @Slf4j
- class SchemaRegistryServiceTests extends AbstractIntegrationTest {
- @Autowired
- WebTestClient webTestClient;
- String subject;
- @BeforeEach
- public void setUpBefore() {
- this.subject = UUID.randomUUID().toString();
- }
- @Test
- public void should404WhenGetAllSchemasForUnknownCluster() {
- webTestClient
- .get()
- .uri("/api/clusters/unknown-cluster/schemas")
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
- String unknownSchema = "unknown-schema";
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
- .exchange()
- .expectStatus().isNotFound();
- }
- /**
- * It should create a new schema w/o submitting a schemaType field to Schema Registry.
- */
- @Test
- void shouldBeBadRequestIfNoSchemaType() {
- String schema = "{\"subject\":\"%s\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}";
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(String.format(schema, subject)))
- .exchange()
- .expectStatus().isBadRequest();
- }
- @Test
- void shouldNotDoAnythingIfSchemaNotChanged() {
- String schema =
- "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":"
- + "\"{\\\"type\\\": \\\"string\\\"}\"}";
- SchemaSubjectDTO dto = webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(String.format(schema, subject)))
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(SchemaSubjectDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(dto);
- Assertions.assertEquals("1", dto.getVersion());
- dto = webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(String.format(schema, subject)))
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(SchemaSubjectDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(dto);
- Assertions.assertEquals("1", dto.getVersion());
- }
- @Test
- void shouldReturnCorrectMessageWhenIncompatibleSchema() {
- String schema = "{\"subject\":\"%s\",\"schemaType\":\"JSON\",\"schema\":"
- + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": "
- + "{\\\"f1\\\": {\\\"type\\\": \\\"integer\\\"}}}"
- + "\"}";
- String schema2 = "{\"subject\":\"%s\"," + "\"schemaType\":\"JSON\",\"schema\":"
- + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": "
- + "{\\\"f1\\\": {\\\"type\\\": \\\"string\\\"},"
- + "\\\"f2\\\": {" + "\\\"type\\\": \\\"string\\\"}}}"
- + "\"}";
- SchemaSubjectDTO dto =
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(String.format(schema, subject)))
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(SchemaSubjectDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(dto);
- Assertions.assertEquals("1", dto.getVersion());
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(String.format(schema2, subject)))
- .exchange()
- .expectStatus().isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY)
- .expectBody().consumeWith(body -> {
- String responseBody = new String(Objects.requireNonNull(body.getResponseBody()), StandardCharsets.UTF_8);
- MatcherAssert.assertThat("Must return correct message incompatible schema",
- responseBody,
- Matchers.containsString("Schema being registered is incompatible with an earlier schema"));
- });
- dto = webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
- .exchange()
- .expectStatus().isOk()
- .expectBody(SchemaSubjectDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(dto);
- Assertions.assertEquals("1", dto.getVersion());
- }
- @Test
- void shouldCreateNewProtobufSchema() {
- String schema =
- "syntax = \"proto3\";\n\nmessage MyRecord {\n int32 id = 1;\n string name = 2;\n}\n";
- NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
- .schemaType(SchemaTypeDTO.PROTOBUF)
- .subject(subject)
- .schema(schema);
- SchemaSubjectDTO actual = webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(SchemaSubjectDTO.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(actual);
- Assertions.assertEquals(CompatibilityLevelDTO.CompatibilityEnum.BACKWARD.name(),
- actual.getCompatibilityLevel());
- Assertions.assertEquals("1", actual.getVersion());
- Assertions.assertEquals(SchemaTypeDTO.PROTOBUF, actual.getSchemaType());
- Assertions.assertEquals(schema, actual.getSchema());
- }
- @Test
- public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
- .exchange()
- .expectStatus().isOk()
- .expectBody(CompatibilityLevelDTO.class)
- .consumeWith(result -> {
- CompatibilityLevelDTO responseBody = result.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals(CompatibilityLevelDTO.CompatibilityEnum.BACKWARD,
- responseBody.getCompatibility());
- });
- }
- @Test
- public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
- createNewSubjectAndAssert(subject);
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .exchange()
- .expectStatus().isOk()
- .expectBody(SchemaSubjectsResponseDTO.class)
- .consumeWith(result -> {
- SchemaSubjectsResponseDTO responseBody = result.getResponseBody();
- log.info("Response of test schemas: {}", responseBody);
- Assertions.assertNotNull(responseBody);
- Assertions.assertFalse(responseBody.getSchemas().isEmpty());
- SchemaSubjectDTO actualSchemaSubject = responseBody.getSchemas().stream()
- .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
- .findFirst()
- .orElseThrow();
- Assertions.assertNotNull(actualSchemaSubject.getId());
- Assertions.assertNotNull(actualSchemaSubject.getVersion());
- Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
- Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
- });
- }
- @Test
- public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
- createNewSubjectAndAssert(subject);
- //Get the created schema and check its items
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(SchemaSubjectDTO.class)
- .consumeWith(listEntityExchangeResult -> {
- val expectedCompatibility =
- CompatibilityLevelDTO.CompatibilityEnum.BACKWARD;
- assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
- });
- // Now let's change compatibility level of this schema to FULL whereas the global
- // level should be BACKWARD
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
- .exchange()
- .expectStatus().isOk();
- //Get one more time to check the schema compatibility level is changed to FULL
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(SchemaSubjectDTO.class)
- .consumeWith(listEntityExchangeResult -> {
- val expectedCompatibility =
- CompatibilityLevelDTO.CompatibilityEnum.FULL;
- assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
- });
- }
- @Test
- void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
- String schema =
- "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
- + "\"{\\\"type\\\": \\\"string\\\"}\"}";
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(schema))
- .exchange()
- .expectStatus().isOk();
- }
- private void createNewSubjectAndAssert(String subject) {
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(
- String.format(
- "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":"
- + "\"{\\\"type\\\": \\\"string\\\"}\"}",
- subject
- )
- ))
- .exchange()
- .expectStatus().isOk()
- .expectBody(SchemaSubjectDTO.class)
- .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
- }
- private void assertSchemaWhenGetLatest(
- String subject, EntityExchangeResult<List<SchemaSubjectDTO>> listEntityExchangeResult,
- CompatibilityLevelDTO.CompatibilityEnum expectedCompatibility) {
- List<SchemaSubjectDTO> responseBody = listEntityExchangeResult.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals(1, responseBody.size());
- SchemaSubjectDTO actualSchema = responseBody.get(0);
- Assertions.assertNotNull(actualSchema);
- Assertions.assertEquals(subject, actualSchema.getSubject());
- Assertions.assertEquals("\"string\"", actualSchema.getSchema());
- Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
- Assertions.assertEquals(SchemaTypeDTO.AVRO, actualSchema.getSchemaType());
- Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel());
- }
- private void assertResponseBodyWhenCreateNewSchema(
- EntityExchangeResult<SchemaSubjectDTO> exchangeResult) {
- SchemaSubjectDTO responseBody = exchangeResult.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals("1", responseBody.getVersion());
- Assertions.assertNotNull(responseBody.getSchema());
- Assertions.assertNotNull(responseBody.getSubject());
- Assertions.assertNotNull(responseBody.getCompatibilityLevel());
- Assertions.assertEquals(SchemaTypeDTO.AVRO, responseBody.getSchemaType());
- }
- }
|