SchemasController.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.SchemasApi;
  3. import com.provectus.kafka.ui.exception.ValidationException;
  4. import com.provectus.kafka.ui.mapper.KafkaSrMapper;
  5. import com.provectus.kafka.ui.mapper.KafkaSrMapperImpl;
  6. import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
  7. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  10. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  11. import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
  12. import com.provectus.kafka.ui.model.rbac.AccessContext;
  13. import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
  14. import com.provectus.kafka.ui.service.SchemaRegistryService;
  15. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  16. import java.util.List;
  17. import java.util.stream.Collectors;
  18. import javax.validation.Valid;
  19. import lombok.RequiredArgsConstructor;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.apache.commons.lang3.StringUtils;
  22. import org.springframework.http.ResponseEntity;
  23. import org.springframework.web.bind.annotation.RestController;
  24. import org.springframework.web.server.ServerWebExchange;
  25. import reactor.core.publisher.Flux;
  26. import reactor.core.publisher.Mono;
  27. @RestController
  28. @RequiredArgsConstructor
  29. @Slf4j
  30. public class SchemasController extends AbstractController implements SchemasApi {
  31. private static final Integer DEFAULT_PAGE_SIZE = 25;
  32. private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
  33. private final SchemaRegistryService schemaRegistryService;
  34. private final AccessControlService accessControlService;
  35. @Override
  36. protected KafkaCluster getCluster(String clusterName) {
  37. var c = super.getCluster(clusterName);
  38. if (c.getSchemaRegistryClient() == null) {
  39. throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
  40. }
  41. return c;
  42. }
  43. @Override
  44. public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
  45. String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
  46. ServerWebExchange exchange) {
  47. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  48. .cluster(clusterName)
  49. .schema(subject)
  50. .schemaActions(SchemaAction.VIEW)
  51. .build());
  52. return validateAccess.then(
  53. newSchemaSubjectMono.flatMap(subjectDTO ->
  54. schemaRegistryService.checksSchemaCompatibility(
  55. getCluster(clusterName),
  56. subject,
  57. kafkaSrMapper.fromDto(subjectDTO)
  58. ))
  59. .map(kafkaSrMapper::toDto)
  60. .map(ResponseEntity::ok)
  61. );
  62. }
  63. @Override
  64. public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
  65. String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
  66. ServerWebExchange exchange) {
  67. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  68. .cluster(clusterName)
  69. .schemaActions(SchemaAction.CREATE)
  70. .build());
  71. return validateAccess.then(
  72. newSchemaSubjectMono.flatMap(newSubject ->
  73. schemaRegistryService.registerNewSchema(
  74. getCluster(clusterName),
  75. newSubject.getSubject(),
  76. kafkaSrMapper.fromDto(newSubject)
  77. )
  78. ).map(kafkaSrMapper::toDto)
  79. .map(ResponseEntity::ok)
  80. );
  81. }
  82. @Override
  83. public Mono<ResponseEntity<Void>> deleteLatestSchema(
  84. String clusterName, String subject, ServerWebExchange exchange) {
  85. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  86. .cluster(clusterName)
  87. .schema(subject)
  88. .schemaActions(SchemaAction.DELETE)
  89. .build());
  90. return validateAccess.then(
  91. schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
  92. .thenReturn(ResponseEntity.ok().build())
  93. );
  94. }
  95. @Override
  96. public Mono<ResponseEntity<Void>> deleteSchema(
  97. String clusterName, String subject, ServerWebExchange exchange) {
  98. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  99. .cluster(clusterName)
  100. .schema(subject)
  101. .schemaActions(SchemaAction.DELETE)
  102. .build());
  103. return validateAccess.then(
  104. schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
  105. .thenReturn(ResponseEntity.ok().build())
  106. );
  107. }
  108. @Override
  109. public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
  110. String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
  111. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  112. .cluster(clusterName)
  113. .schema(subjectName)
  114. .schemaActions(SchemaAction.DELETE)
  115. .build());
  116. return validateAccess.then(
  117. schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
  118. .thenReturn(ResponseEntity.ok().build())
  119. );
  120. }
  121. @Override
  122. public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
  123. String clusterName, String subjectName, ServerWebExchange exchange) {
  124. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  125. .cluster(clusterName)
  126. .schema(subjectName)
  127. .schemaActions(SchemaAction.VIEW)
  128. .build());
  129. Flux<SchemaSubjectDTO> schemas =
  130. schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
  131. .map(kafkaSrMapper::toDto);
  132. return validateAccess.thenReturn(ResponseEntity.ok(schemas));
  133. }
  134. @Override
  135. public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
  136. String clusterName, ServerWebExchange exchange) {
  137. return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
  138. .map(c -> new CompatibilityLevelDTO().compatibility(kafkaSrMapper.toDto(c)))
  139. .map(ResponseEntity::ok)
  140. .defaultIfEmpty(ResponseEntity.notFound().build());
  141. }
  142. @Override
  143. public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName,
  144. String subject,
  145. ServerWebExchange exchange) {
  146. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  147. .cluster(clusterName)
  148. .schema(subject)
  149. .schemaActions(SchemaAction.VIEW)
  150. .build());
  151. return validateAccess.then(
  152. schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
  153. .map(kafkaSrMapper::toDto)
  154. .map(ResponseEntity::ok)
  155. );
  156. }
  157. @Override
  158. public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
  159. String clusterName, String subject, Integer version, ServerWebExchange exchange) {
  160. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  161. .cluster(clusterName)
  162. .schema(subject)
  163. .schemaActions(SchemaAction.VIEW)
  164. .build());
  165. return validateAccess.then(
  166. schemaRegistryService.getSchemaSubjectByVersion(
  167. getCluster(clusterName), subject, version)
  168. .map(kafkaSrMapper::toDto)
  169. .map(ResponseEntity::ok)
  170. );
  171. }
  172. @Override
  173. public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String clusterName,
  174. @Valid Integer pageNum,
  175. @Valid Integer perPage,
  176. @Valid String search,
  177. ServerWebExchange serverWebExchange) {
  178. return schemaRegistryService
  179. .getAllSubjectNames(getCluster(clusterName))
  180. .flatMapIterable(l -> l)
  181. .filterWhen(schema -> accessControlService.isSchemaAccessible(schema, clusterName))
  182. .collectList()
  183. .flatMap(subjects -> {
  184. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  185. int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
  186. List<String> filteredSubjects = subjects
  187. .stream()
  188. .filter(subj -> search == null || StringUtils.containsIgnoreCase(subj, search))
  189. .sorted().toList();
  190. var totalPages = (filteredSubjects.size() / pageSize)
  191. + (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
  192. List<String> subjectsToRender = filteredSubjects.stream()
  193. .skip(subjectToSkip)
  194. .limit(pageSize)
  195. .collect(Collectors.toList());
  196. return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
  197. .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
  198. .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
  199. }).map(ResponseEntity::ok);
  200. }
  201. @Override
  202. public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
  203. String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
  204. ServerWebExchange exchange) {
  205. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  206. .cluster(clusterName)
  207. .schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
  208. .build());
  209. return validateAccess.then(
  210. compatibilityLevelMono
  211. .flatMap(compatibilityLevelDTO ->
  212. schemaRegistryService.updateGlobalSchemaCompatibility(
  213. getCluster(clusterName),
  214. kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
  215. ))
  216. .thenReturn(ResponseEntity.ok().build())
  217. );
  218. }
  219. @Override
  220. public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
  221. String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
  222. ServerWebExchange exchange) {
  223. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  224. .cluster(clusterName)
  225. .schemaActions(SchemaAction.EDIT)
  226. .build());
  227. return validateAccess.then(
  228. compatibilityLevelMono
  229. .flatMap(compatibilityLevelDTO ->
  230. schemaRegistryService.updateSchemaCompatibility(
  231. getCluster(clusterName),
  232. subject,
  233. kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
  234. ))
  235. .thenReturn(ResponseEntity.ok().build())
  236. );
  237. }
  238. }