SchemaRegistryServiceTests.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  3. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  4. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  5. import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
  6. import com.provectus.kafka.ui.model.SchemaTypeDTO;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.List;
  9. import java.util.Objects;
  10. import java.util.UUID;
  11. import lombok.extern.slf4j.Slf4j;
  12. import lombok.val;
  13. import org.junit.jupiter.api.Assertions;
  14. import org.junit.jupiter.api.BeforeEach;
  15. import org.junit.jupiter.api.Test;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.http.HttpStatus;
  18. import org.springframework.http.MediaType;
  19. import org.springframework.test.web.reactive.server.EntityExchangeResult;
  20. import org.springframework.test.web.reactive.server.WebTestClient;
  21. import org.springframework.web.reactive.function.BodyInserters;
  22. import org.testcontainers.shaded.org.hamcrest.MatcherAssert;
  23. import org.testcontainers.shaded.org.hamcrest.Matchers;
  24. import reactor.core.publisher.Mono;
  25. @Slf4j
  26. class SchemaRegistryServiceTests extends AbstractIntegrationTest {
  27. @Autowired
  28. WebTestClient webTestClient;
  29. String subject;
  30. @BeforeEach
  31. public void setUpBefore() {
  32. this.subject = UUID.randomUUID().toString();
  33. }
  34. @Test
  35. public void should404WhenGetAllSchemasForUnknownCluster() {
  36. webTestClient
  37. .get()
  38. .uri("/api/clusters/unknown-cluster/schemas")
  39. .exchange()
  40. .expectStatus().isNotFound();
  41. }
  42. @Test
  43. public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
  44. String unknownSchema = "unknown-schema";
  45. webTestClient
  46. .get()
  47. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
  48. .exchange()
  49. .expectStatus().isNotFound();
  50. }
  51. /**
  52. * It should create a new schema w/o submitting a schemaType field to Schema Registry.
  53. */
  54. @Test
  55. void shouldBeBadRequestIfNoSchemaType() {
  56. String schema = "{\"subject\":\"%s\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}";
  57. webTestClient
  58. .post()
  59. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  60. .contentType(MediaType.APPLICATION_JSON)
  61. .body(BodyInserters.fromValue(String.format(schema, subject)))
  62. .exchange()
  63. .expectStatus().isBadRequest();
  64. }
  65. @Test
  66. void shouldNotDoAnythingIfSchemaNotChanged() {
  67. String schema =
  68. "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":"
  69. + "\"{\\\"type\\\": \\\"string\\\"}\"}";
  70. SchemaSubjectDTO dto = webTestClient
  71. .post()
  72. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  73. .contentType(MediaType.APPLICATION_JSON)
  74. .body(BodyInserters.fromValue(String.format(schema, subject)))
  75. .exchange()
  76. .expectStatus()
  77. .isOk()
  78. .expectBody(SchemaSubjectDTO.class)
  79. .returnResult()
  80. .getResponseBody();
  81. Assertions.assertNotNull(dto);
  82. Assertions.assertEquals("1", dto.getVersion());
  83. dto = webTestClient
  84. .post()
  85. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  86. .contentType(MediaType.APPLICATION_JSON)
  87. .body(BodyInserters.fromValue(String.format(schema, subject)))
  88. .exchange()
  89. .expectStatus()
  90. .isOk()
  91. .expectBody(SchemaSubjectDTO.class)
  92. .returnResult()
  93. .getResponseBody();
  94. Assertions.assertNotNull(dto);
  95. Assertions.assertEquals("1", dto.getVersion());
  96. }
  97. @Test
  98. void shouldReturnCorrectMessageWhenIncompatibleSchema() {
  99. String schema = "{\"subject\":\"%s\",\"schemaType\":\"JSON\",\"schema\":"
  100. + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": "
  101. + "{\\\"f1\\\": {\\\"type\\\": \\\"integer\\\"}}}"
  102. + "\"}";
  103. String schema2 = "{\"subject\":\"%s\"," + "\"schemaType\":\"JSON\",\"schema\":"
  104. + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": "
  105. + "{\\\"f1\\\": {\\\"type\\\": \\\"string\\\"},"
  106. + "\\\"f2\\\": {" + "\\\"type\\\": \\\"string\\\"}}}"
  107. + "\"}";
  108. SchemaSubjectDTO dto =
  109. webTestClient
  110. .post()
  111. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  112. .contentType(MediaType.APPLICATION_JSON)
  113. .body(BodyInserters.fromValue(String.format(schema, subject)))
  114. .exchange()
  115. .expectStatus()
  116. .isOk()
  117. .expectBody(SchemaSubjectDTO.class)
  118. .returnResult()
  119. .getResponseBody();
  120. Assertions.assertNotNull(dto);
  121. Assertions.assertEquals("1", dto.getVersion());
  122. webTestClient
  123. .post()
  124. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  125. .contentType(MediaType.APPLICATION_JSON)
  126. .body(BodyInserters.fromValue(String.format(schema2, subject)))
  127. .exchange()
  128. .expectStatus().isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY)
  129. .expectBody().consumeWith(body -> {
  130. String responseBody = new String(Objects.requireNonNull(body.getResponseBody()), StandardCharsets.UTF_8);
  131. MatcherAssert.assertThat("Must return correct message incompatible schema",
  132. responseBody,
  133. Matchers.containsString("Schema being registered is incompatible with an earlier schema"));
  134. });
  135. dto = webTestClient
  136. .get()
  137. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
  138. .exchange()
  139. .expectStatus().isOk()
  140. .expectBody(SchemaSubjectDTO.class)
  141. .returnResult()
  142. .getResponseBody();
  143. Assertions.assertNotNull(dto);
  144. Assertions.assertEquals("1", dto.getVersion());
  145. }
  146. @Test
  147. void shouldCreateNewProtobufSchema() {
  148. String schema =
  149. "syntax = \"proto3\";\n\nmessage MyRecord {\n int32 id = 1;\n string name = 2;\n}\n";
  150. NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
  151. .schemaType(SchemaTypeDTO.PROTOBUF)
  152. .subject(subject)
  153. .schema(schema);
  154. SchemaSubjectDTO actual = webTestClient
  155. .post()
  156. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  157. .contentType(MediaType.APPLICATION_JSON)
  158. .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
  159. .exchange()
  160. .expectStatus()
  161. .isOk()
  162. .expectBody(SchemaSubjectDTO.class)
  163. .returnResult()
  164. .getResponseBody();
  165. Assertions.assertNotNull(actual);
  166. Assertions.assertEquals(CompatibilityLevelDTO.CompatibilityEnum.BACKWARD.name(),
  167. actual.getCompatibilityLevel());
  168. Assertions.assertEquals("1", actual.getVersion());
  169. Assertions.assertEquals(SchemaTypeDTO.PROTOBUF, actual.getSchemaType());
  170. Assertions.assertEquals(schema, actual.getSchema());
  171. }
  172. @Test
  173. public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
  174. webTestClient
  175. .get()
  176. .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
  177. .exchange()
  178. .expectStatus().isOk()
  179. .expectBody(CompatibilityLevelDTO.class)
  180. .consumeWith(result -> {
  181. CompatibilityLevelDTO responseBody = result.getResponseBody();
  182. Assertions.assertNotNull(responseBody);
  183. Assertions.assertEquals(CompatibilityLevelDTO.CompatibilityEnum.BACKWARD,
  184. responseBody.getCompatibility());
  185. });
  186. }
  187. @Test
  188. public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
  189. createNewSubjectAndAssert(subject);
  190. webTestClient
  191. .get()
  192. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  193. .exchange()
  194. .expectStatus().isOk()
  195. .expectBody(SchemaSubjectsResponseDTO.class)
  196. .consumeWith(result -> {
  197. SchemaSubjectsResponseDTO responseBody = result.getResponseBody();
  198. log.info("Response of test schemas: {}", responseBody);
  199. Assertions.assertNotNull(responseBody);
  200. Assertions.assertFalse(responseBody.getSchemas().isEmpty());
  201. SchemaSubjectDTO actualSchemaSubject = responseBody.getSchemas().stream()
  202. .filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
  203. .findFirst()
  204. .orElseThrow();
  205. Assertions.assertNotNull(actualSchemaSubject.getId());
  206. Assertions.assertNotNull(actualSchemaSubject.getVersion());
  207. Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
  208. Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
  209. });
  210. }
  211. @Test
  212. public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
  213. createNewSubjectAndAssert(subject);
  214. //Get the created schema and check its items
  215. webTestClient
  216. .get()
  217. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
  218. .exchange()
  219. .expectStatus().isOk()
  220. .expectBodyList(SchemaSubjectDTO.class)
  221. .consumeWith(listEntityExchangeResult -> {
  222. val expectedCompatibility =
  223. CompatibilityLevelDTO.CompatibilityEnum.BACKWARD;
  224. assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
  225. });
  226. // Now let's change compatibility level of this schema to FULL whereas the global
  227. // level should be BACKWARD
  228. webTestClient.put()
  229. .uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
  230. .contentType(MediaType.APPLICATION_JSON)
  231. .body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
  232. .exchange()
  233. .expectStatus().isOk();
  234. //Get one more time to check the schema compatibility level is changed to FULL
  235. webTestClient
  236. .get()
  237. .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
  238. .exchange()
  239. .expectStatus().isOk()
  240. .expectBodyList(SchemaSubjectDTO.class)
  241. .consumeWith(listEntityExchangeResult -> {
  242. val expectedCompatibility =
  243. CompatibilityLevelDTO.CompatibilityEnum.FULL;
  244. assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
  245. });
  246. }
  247. @Test
  248. void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
  249. String schema =
  250. "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
  251. + "\"{\\\"type\\\": \\\"string\\\"}\"}";
  252. webTestClient
  253. .post()
  254. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  255. .contentType(MediaType.APPLICATION_JSON)
  256. .body(BodyInserters.fromValue(schema))
  257. .exchange()
  258. .expectStatus().isOk();
  259. }
  260. private void createNewSubjectAndAssert(String subject) {
  261. webTestClient
  262. .post()
  263. .uri("/api/clusters/{clusterName}/schemas", LOCAL)
  264. .contentType(MediaType.APPLICATION_JSON)
  265. .body(BodyInserters.fromValue(
  266. String.format(
  267. "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":"
  268. + "\"{\\\"type\\\": \\\"string\\\"}\"}",
  269. subject
  270. )
  271. ))
  272. .exchange()
  273. .expectStatus().isOk()
  274. .expectBody(SchemaSubjectDTO.class)
  275. .consumeWith(this::assertResponseBodyWhenCreateNewSchema);
  276. }
  277. private void assertSchemaWhenGetLatest(
  278. String subject, EntityExchangeResult<List<SchemaSubjectDTO>> listEntityExchangeResult,
  279. CompatibilityLevelDTO.CompatibilityEnum expectedCompatibility) {
  280. List<SchemaSubjectDTO> responseBody = listEntityExchangeResult.getResponseBody();
  281. Assertions.assertNotNull(responseBody);
  282. Assertions.assertEquals(1, responseBody.size());
  283. SchemaSubjectDTO actualSchema = responseBody.get(0);
  284. Assertions.assertNotNull(actualSchema);
  285. Assertions.assertEquals(subject, actualSchema.getSubject());
  286. Assertions.assertEquals("\"string\"", actualSchema.getSchema());
  287. Assertions.assertNotNull(actualSchema.getCompatibilityLevel());
  288. Assertions.assertEquals(SchemaTypeDTO.AVRO, actualSchema.getSchemaType());
  289. Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel());
  290. }
  291. private void assertResponseBodyWhenCreateNewSchema(
  292. EntityExchangeResult<SchemaSubjectDTO> exchangeResult) {
  293. SchemaSubjectDTO responseBody = exchangeResult.getResponseBody();
  294. Assertions.assertNotNull(responseBody);
  295. Assertions.assertEquals("1", responseBody.getVersion());
  296. Assertions.assertNotNull(responseBody.getSchema());
  297. Assertions.assertNotNull(responseBody.getSubject());
  298. Assertions.assertNotNull(responseBody.getCompatibilityLevel());
  299. Assertions.assertEquals(SchemaTypeDTO.AVRO, responseBody.getSchemaType());
  300. }
  301. }