123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package com.provectus.kafka.ui;
- import com.provectus.kafka.ui.model.CompatibilityLevel;
- import com.provectus.kafka.ui.model.NewSchemaSubject;
- import com.provectus.kafka.ui.model.SchemaSubject;
- import com.provectus.kafka.ui.model.SchemaType;
- import lombok.extern.log4j.Log4j2;
- import lombok.val;
- import org.junit.jupiter.api.Assertions;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.MediaType;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.web.reactive.server.EntityExchangeResult;
- import org.springframework.test.web.reactive.server.WebTestClient;
- import org.springframework.web.reactive.function.BodyInserters;
- import reactor.core.publisher.Mono;
- import java.util.List;
- import java.util.UUID;
- @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
- @Log4j2
- @AutoConfigureWebTestClient(timeout = "10000")
- class SchemaRegistryServiceTests extends AbstractBaseTest {
- @Autowired
- WebTestClient webTestClient;
- String subject;
- @BeforeEach
- public void setUpBefore() {
- this.subject = UUID.randomUUID().toString();
- }
- @Test
- public void should404WhenGetAllSchemasForUnknownCluster() {
- webTestClient
- .get()
- .uri("/api/clusters/unknown-cluster/schemas")
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
- String unknownSchema = "unknown-schema";
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- void shouldReturn409WhenSchemaDuplicatesThePreviousVersion() {
- String schema = "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}";
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(schema.formatted(subject)))
- .exchange()
- .expectStatus().isEqualTo(HttpStatus.OK);
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue(schema.formatted(subject)))
- .exchange()
- .expectStatus().isEqualTo(HttpStatus.CONFLICT);
- }
- @Test
- void shouldCreateNewProtobufSchema() {
- String schema = "syntax = \"proto3\";\n\nmessage MyRecord {\n int32 id = 1;\n string name = 2;\n}\n";
- NewSchemaSubject requestBody = new NewSchemaSubject()
- .schemaType(SchemaType.PROTOBUF)
- .subject(subject)
- .schema(schema);
- SchemaSubject actual = webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubject.class))
- .exchange()
- .expectStatus()
- .isOk()
- .expectBody(SchemaSubject.class)
- .returnResult()
- .getResponseBody();
- Assertions.assertNotNull(actual);
- Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD.name(), actual.getCompatibilityLevel());
- Assertions.assertEquals("1", actual.getVersion());
- Assertions.assertEquals(SchemaType.PROTOBUF, actual.getSchemaType());
- Assertions.assertEquals(schema, actual.getSchema());
- }
- @Test
- public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
- .exchange()
- .expectStatus().isOk()
- .expectBody(CompatibilityLevel.class)
- .consumeWith(result -> {
- CompatibilityLevel responseBody = result.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD, responseBody.getCompatibility());
- });
- }
- @Test
- public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
- createNewSubjectAndAssert(subject);
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(SchemaSubject.class)
- .consumeWith(result -> {
- List<SchemaSubject> responseBody = result.getResponseBody();
- log.info("Response of test schemas: {}", responseBody);
- Assertions.assertNotNull(responseBody);
- Assertions.assertFalse(responseBody.isEmpty());
- SchemaSubject actualSchemaSubject = responseBody.stream()
- .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
- .findFirst()
- .orElseThrow();
- Assertions.assertNotNull(actualSchemaSubject.getId());
- Assertions.assertNotNull(actualSchemaSubject.getVersion());
- Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
- Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
- });
- }
- @Test
- public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
- createNewSubjectAndAssert(subject);
- //Get the created schema and check its items
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(SchemaSubject.class)
- .consumeWith(listEntityExchangeResult -> {
- val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
- assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
- });
- //Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
- .exchange()
- .expectStatus().isOk();
- //Get one more time to check the schema compatibility level is changed to FULL
- webTestClient
- .get()
- .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(SchemaSubject.class)
- .consumeWith(listEntityExchangeResult -> {
- val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
- assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
- });
- }
- private void createNewSubjectAndAssert(String subject) {
- webTestClient
- .post()
- .uri("/api/clusters/{clusterName}/schemas", LOCAL)
- .contentType(MediaType.APPLICATION_JSON)
- .body(BodyInserters.fromValue("{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}".formatted(subject)))
- .exchange()
- .expectStatus().isOk()
- .expectBody(SchemaSubject.class)
- .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
- }
- private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
- List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals(1, responseBody.size());
- SchemaSubject actualSchema = responseBody.get(0);
- Assertions.assertNotNull(actualSchema);
- Assertions.assertEquals(subject, actualSchema.getSubject());
- Assertions.assertEquals("\"string\"", actualSchema.getSchema());
- Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
- Assertions.assertEquals(SchemaType.AVRO, actualSchema.getSchemaType());
- Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel());
- }
- private void assertResponseBodyWhenCreateNewSchema(EntityExchangeResult<SchemaSubject> exchangeResult) {
- SchemaSubject responseBody = exchangeResult.getResponseBody();
- Assertions.assertNotNull(responseBody);
- Assertions.assertEquals("1", responseBody.getVersion());
- Assertions.assertNotNull(responseBody.getSchema());
- Assertions.assertNotNull(responseBody.getSubject());
- Assertions.assertNotNull(responseBody.getCompatibilityLevel());
- Assertions.assertEquals(SchemaType.AVRO, responseBody.getSchemaType());
- }
- }
|