SchemasController.java 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.SchemasApi;
  3. import com.provectus.kafka.ui.exception.ValidationException;
  4. import com.provectus.kafka.ui.mapper.ClusterMapper;
  5. import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
  6. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  7. import com.provectus.kafka.ui.model.KafkaCluster;
  8. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  9. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  10. import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
  11. import com.provectus.kafka.ui.service.SchemaRegistryService;
  12. import java.util.Arrays;
  13. import java.util.List;
  14. import java.util.stream.Collectors;
  15. import javax.validation.Valid;
  16. import lombok.RequiredArgsConstructor;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.apache.commons.lang3.StringUtils;
  19. import org.springframework.http.ResponseEntity;
  20. import org.springframework.web.bind.annotation.RestController;
  21. import org.springframework.web.server.ServerWebExchange;
  22. import reactor.core.publisher.Flux;
  23. import reactor.core.publisher.Mono;
  24. @RestController
  25. @RequiredArgsConstructor
  26. @Slf4j
  27. public class SchemasController extends AbstractController implements SchemasApi {
  28. private static final Integer DEFAULT_PAGE_SIZE = 25;
  29. private final ClusterMapper mapper;
  30. private final SchemaRegistryService schemaRegistryService;
  31. @Override
  32. protected KafkaCluster getCluster(String clusterName) {
  33. var c = super.getCluster(clusterName);
  34. if (c.getSchemaRegistry() == null) {
  35. throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
  36. }
  37. return c;
  38. }
  39. @Override
  40. public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
  41. String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
  42. ServerWebExchange exchange) {
  43. return schemaRegistryService.checksSchemaCompatibility(
  44. getCluster(clusterName), subject, newSchemaSubject)
  45. .map(mapper::toCompatibilityCheckResponse)
  46. .map(ResponseEntity::ok);
  47. }
  48. @Override
  49. public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
  50. String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
  51. ServerWebExchange exchange) {
  52. return schemaRegistryService
  53. .registerNewSchema(getCluster(clusterName), newSchemaSubject)
  54. .map(ResponseEntity::ok);
  55. }
  56. @Override
  57. public Mono<ResponseEntity<Void>> deleteLatestSchema(
  58. String clusterName, String subject, ServerWebExchange exchange) {
  59. return schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
  60. .thenReturn(ResponseEntity.ok().build());
  61. }
  62. @Override
  63. public Mono<ResponseEntity<Void>> deleteSchema(
  64. String clusterName, String subjectName, ServerWebExchange exchange) {
  65. return schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subjectName)
  66. .thenReturn(ResponseEntity.ok().build());
  67. }
  68. @Override
  69. public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
  70. String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
  71. return schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
  72. .thenReturn(ResponseEntity.ok().build());
  73. }
  74. @Override
  75. public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
  76. String clusterName, String subjectName, ServerWebExchange exchange) {
  77. Flux<SchemaSubjectDTO> schemas =
  78. schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName);
  79. return Mono.just(ResponseEntity.ok(schemas));
  80. }
  81. @Override
  82. public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
  83. String clusterName, ServerWebExchange exchange) {
  84. return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
  85. .map(mapper::toCompatibilityLevelDto)
  86. .map(ResponseEntity::ok)
  87. .defaultIfEmpty(ResponseEntity.notFound().build());
  88. }
  89. @Override
  90. public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName, String subject,
  91. ServerWebExchange exchange) {
  92. return schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
  93. .map(ResponseEntity::ok);
  94. }
  95. @Override
  96. public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
  97. String clusterName, String subject, Integer version, ServerWebExchange exchange) {
  98. return schemaRegistryService.getSchemaSubjectByVersion(
  99. getCluster(clusterName), subject, version)
  100. .map(ResponseEntity::ok);
  101. }
  102. @Override
  103. public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String clusterName,
  104. @Valid Integer pageNum,
  105. @Valid Integer perPage,
  106. @Valid String search,
  107. ServerWebExchange serverWebExchange) {
  108. return schemaRegistryService
  109. .getAllSubjectNames(getCluster(clusterName))
  110. .flatMap(subjects -> {
  111. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  112. int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
  113. List<String> filteredSubjects = Arrays.stream(subjects)
  114. .filter(subj -> search == null || StringUtils.containsIgnoreCase(subj, search))
  115. .sorted()
  116. .collect(Collectors.toList());
  117. var totalPages = (filteredSubjects.size() / pageSize)
  118. + (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
  119. List<String> subjectsToRender = filteredSubjects.stream()
  120. .skip(subjectToSkip)
  121. .limit(pageSize)
  122. .collect(Collectors.toList());
  123. return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
  124. .map(a -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(a));
  125. }).map(ResponseEntity::ok);
  126. }
  127. @Override
  128. public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
  129. String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
  130. ServerWebExchange exchange) {
  131. log.info("Updating schema compatibility globally");
  132. return schemaRegistryService.updateSchemaCompatibility(
  133. getCluster(clusterName), compatibilityLevel)
  134. .map(ResponseEntity::ok);
  135. }
  136. @Override
  137. public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
  138. String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
  139. ServerWebExchange exchange) {
  140. log.info("Updating schema compatibility for subject: {}", subject);
  141. return schemaRegistryService.updateSchemaCompatibility(
  142. getCluster(clusterName), subject, compatibilityLevel)
  143. .map(ResponseEntity::ok);
  144. }
  145. }