SchemaRegistryServiceTests.java 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.model.CompatibilityLevel;
  3. import com.provectus.kafka.ui.model.NewSchemaSubject;
  4. import com.provectus.kafka.ui.model.SchemaSubject;
  5. import com.provectus.kafka.ui.model.SchemaType;
  6. import lombok.extern.log4j.Log4j2;
  7. import lombok.val;
  8. import org.junit.jupiter.api.Assertions;
  9. import org.junit.jupiter.api.BeforeEach;
  10. import org.junit.jupiter.api.Test;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  13. import org.springframework.http.HttpStatus;
  14. import org.springframework.http.MediaType;
  15. import org.springframework.test.context.ContextConfiguration;
  16. import org.springframework.test.web.reactive.server.EntityExchangeResult;
  17. import org.springframework.test.web.reactive.server.WebTestClient;
  18. import org.springframework.web.reactive.function.BodyInserters;
  19. import reactor.core.publisher.Mono;
  20. import java.util.List;
  21. import java.util.UUID;
  22. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  23. @Log4j2
  24. @AutoConfigureWebTestClient(timeout = "10000")
  25. class SchemaRegistryServiceTests extends AbstractBaseTest {
  26. @Autowired
  27. WebTestClient webTestClient;
  28. String subject;
  29. @BeforeEach
  30. public void setUpBefore() {
  31. this.subject = UUID.randomUUID().toString();
  32. }
  33. @Test
  34. public void should404WhenGetAllSchemasForUnknownCluster() {
  35. webTestClient
  36. .get()
  37. .uri("/api/clusters/unknown-cluster/schemas")
  38. .exchange()
  39. .expectStatus().isNotFound();
  40. }
  41. @Test
  42. public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
  43. String unknownSchema = "unknown-schema";
  44. webTestClient
  45. .get()
  46. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
  47. .exchange()
  48. .expectStatus().isNotFound();
  49. }
  50. @Test
  51. void shouldReturn409WhenSchemaDuplicatesThePreviousVersion() {
  52. String schema = "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}";
  53. webTestClient
  54. .post()
  55. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  56. .contentType(MediaType.APPLICATION_JSON)
  57. .body(BodyInserters.fromValue(schema.formatted(subject)))
  58. .exchange()
  59. .expectStatus().isEqualTo(HttpStatus.OK);
  60. webTestClient
  61. .post()
  62. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  63. .contentType(MediaType.APPLICATION_JSON)
  64. .body(BodyInserters.fromValue(schema.formatted(subject)))
  65. .exchange()
  66. .expectStatus().isEqualTo(HttpStatus.CONFLICT);
  67. }
  68. @Test
  69. void shouldCreateNewProtobufSchema() {
  70. String schema = "syntax = \"proto3\";\n\nmessage MyRecord {\n int32 id = 1;\n string name = 2;\n}\n";
  71. NewSchemaSubject requestBody = new NewSchemaSubject()
  72. .schemaType(SchemaType.PROTOBUF)
  73. .subject(subject)
  74. .schema(schema);
  75. SchemaSubject actual = webTestClient
  76. .post()
  77. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  78. .contentType(MediaType.APPLICATION_JSON)
  79. .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubject.class))
  80. .exchange()
  81. .expectStatus()
  82. .isOk()
  83. .expectBody(SchemaSubject.class)
  84. .returnResult()
  85. .getResponseBody();
  86. Assertions.assertNotNull(actual);
  87. Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD.name(), actual.getCompatibilityLevel());
  88. Assertions.assertEquals("1", actual.getVersion());
  89. Assertions.assertEquals(SchemaType.PROTOBUF, actual.getSchemaType());
  90. Assertions.assertEquals(schema, actual.getSchema());
  91. }
  92. @Test
  93. public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
  94. webTestClient
  95. .get()
  96. .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
  97. .exchange()
  98. .expectStatus().isOk()
  99. .expectBody(CompatibilityLevel.class)
  100. .consumeWith(result -> {
  101. CompatibilityLevel responseBody = result.getResponseBody();
  102. Assertions.assertNotNull(responseBody);
  103. Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD, responseBody.getCompatibility());
  104. });
  105. }
  106. @Test
  107. public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
  108. createNewSubjectAndAssert(subject);
  109. webTestClient
  110. .get()
  111. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  112. .exchange()
  113. .expectStatus().isOk()
  114. .expectBodyList(SchemaSubject.class)
  115. .consumeWith(result -> {
  116. List<SchemaSubject> responseBody = result.getResponseBody();
  117. log.info("Response of test schemas: {}", responseBody);
  118. Assertions.assertNotNull(responseBody);
  119. Assertions.assertFalse(responseBody.isEmpty());
  120. SchemaSubject actualSchemaSubject = responseBody.stream()
  121. .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
  122. .findFirst()
  123. .orElseThrow();
  124. Assertions.assertNotNull(actualSchemaSubject.getId());
  125. Assertions.assertNotNull(actualSchemaSubject.getVersion());
  126. Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
  127. Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
  128. });
  129. }
  130. @Test
  131. public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
  132. createNewSubjectAndAssert(subject);
  133. //Get the created schema and check its items
  134. webTestClient
  135. .get()
  136. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
  137. .exchange()
  138. .expectStatus().isOk()
  139. .expectBodyList(SchemaSubject.class)
  140. .consumeWith(listEntityExchangeResult -> {
  141. val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
  142. assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
  143. });
  144. //Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
  145. webTestClient.put()
  146. .uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
  147. .contentType(MediaType.APPLICATION_JSON)
  148. .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
  149. .exchange()
  150. .expectStatus().isOk();
  151. //Get one more time to check the schema compatibility level is changed to FULL
  152. webTestClient
  153. .get()
  154. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
  155. .exchange()
  156. .expectStatus().isOk()
  157. .expectBodyList(SchemaSubject.class)
  158. .consumeWith(listEntityExchangeResult -> {
  159. val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
  160. assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
  161. });
  162. }
  163. private void createNewSubjectAndAssert(String subject) {
  164. webTestClient
  165. .post()
  166. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  167. .contentType(MediaType.APPLICATION_JSON)
  168. .body(BodyInserters.fromValue("{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}".formatted(subject)))
  169. .exchange()
  170. .expectStatus().isOk()
  171. .expectBody(SchemaSubject.class)
  172. .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
  173. }
  174. private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
  175. List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
  176. Assertions.assertNotNull(responseBody);
  177. Assertions.assertEquals(1, responseBody.size());
  178. SchemaSubject actualSchema = responseBody.get(0);
  179. Assertions.assertNotNull(actualSchema);
  180. Assertions.assertEquals(subject, actualSchema.getSubject());
  181. Assertions.assertEquals("\"string\"", actualSchema.getSchema());
  182. Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
  183. Assertions.assertEquals(SchemaType.AVRO, actualSchema.getSchemaType());
  184. Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel());
  185. }
  186. private void assertResponseBodyWhenCreateNewSchema(EntityExchangeResult<SchemaSubject> exchangeResult) {
  187. SchemaSubject responseBody = exchangeResult.getResponseBody();
  188. Assertions.assertNotNull(responseBody);
  189. Assertions.assertEquals("1", responseBody.getVersion());
  190. Assertions.assertNotNull(responseBody.getSchema());
  191. Assertions.assertNotNull(responseBody.getSubject());
  192. Assertions.assertNotNull(responseBody.getCompatibilityLevel());
  193. Assertions.assertEquals(SchemaType.AVRO, responseBody.getSchemaType());
  194. }
  195. }