SchemasController.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.SchemasApi;
  3. import com.provectus.kafka.ui.exception.ValidationException;
  4. import com.provectus.kafka.ui.mapper.KafkaSrMapper;
  5. import com.provectus.kafka.ui.mapper.KafkaSrMapperImpl;
  6. import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
  7. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  10. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  11. import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
  12. import com.provectus.kafka.ui.model.rbac.AccessContext;
  13. import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
  14. import com.provectus.kafka.ui.service.SchemaRegistryService;
  15. import com.provectus.kafka.ui.service.audit.AuditService;
  16. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  17. import java.util.List;
  18. import java.util.Map;
  19. import java.util.stream.Collectors;
  20. import javax.validation.Valid;
  21. import lombok.RequiredArgsConstructor;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.apache.commons.lang3.StringUtils;
  24. import org.springframework.http.ResponseEntity;
  25. import org.springframework.web.bind.annotation.RestController;
  26. import org.springframework.web.server.ServerWebExchange;
  27. import reactor.core.publisher.Flux;
  28. import reactor.core.publisher.Mono;
  29. @RestController
  30. @RequiredArgsConstructor
  31. @Slf4j
  32. public class SchemasController extends AbstractController implements SchemasApi {
  33. private static final Integer DEFAULT_PAGE_SIZE = 25;
  34. private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
  35. private final SchemaRegistryService schemaRegistryService;
  36. private final AccessControlService accessControlService;
  37. private final AuditService auditService;
  38. @Override
  39. protected KafkaCluster getCluster(String clusterName) {
  40. var c = super.getCluster(clusterName);
  41. if (c.getSchemaRegistryClient() == null) {
  42. throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
  43. }
  44. return c;
  45. }
  46. @Override
  47. public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
  48. String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
  49. ServerWebExchange exchange) {
  50. var context = AccessContext.builder()
  51. .cluster(clusterName)
  52. .schema(subject)
  53. .schemaActions(SchemaAction.VIEW)
  54. .operationName("checkSchemaCompatibility")
  55. .build();
  56. return accessControlService.validateAccess(context).then(
  57. newSchemaSubjectMono.flatMap(subjectDTO ->
  58. schemaRegistryService.checksSchemaCompatibility(
  59. getCluster(clusterName),
  60. subject,
  61. kafkaSrMapper.fromDto(subjectDTO)
  62. ))
  63. .map(kafkaSrMapper::toDto)
  64. .map(ResponseEntity::ok)
  65. ).doOnEach(sig -> auditService.audit(context, sig));
  66. }
  67. @Override
  68. public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
  69. String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
  70. ServerWebExchange exchange) {
  71. var context = AccessContext.builder()
  72. .cluster(clusterName)
  73. .schemaActions(SchemaAction.CREATE)
  74. .operationName("createNewSchema")
  75. .build();
  76. return accessControlService.validateAccess(context).then(
  77. newSchemaSubjectMono.flatMap(newSubject ->
  78. schemaRegistryService.registerNewSchema(
  79. getCluster(clusterName),
  80. newSubject.getSubject(),
  81. kafkaSrMapper.fromDto(newSubject)
  82. )
  83. ).map(kafkaSrMapper::toDto)
  84. .map(ResponseEntity::ok)
  85. ).doOnEach(sig -> auditService.audit(context, sig));
  86. }
  87. @Override
  88. public Mono<ResponseEntity<Void>> deleteLatestSchema(
  89. String clusterName, String subject, ServerWebExchange exchange) {
  90. var context = AccessContext.builder()
  91. .cluster(clusterName)
  92. .schema(subject)
  93. .schemaActions(SchemaAction.DELETE)
  94. .operationName("deleteLatestSchema")
  95. .build();
  96. return accessControlService.validateAccess(context).then(
  97. schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
  98. .doOnEach(sig -> auditService.audit(context, sig))
  99. .thenReturn(ResponseEntity.ok().build())
  100. );
  101. }
  102. @Override
  103. public Mono<ResponseEntity<Void>> deleteSchema(
  104. String clusterName, String subject, ServerWebExchange exchange) {
  105. var context = AccessContext.builder()
  106. .cluster(clusterName)
  107. .schema(subject)
  108. .schemaActions(SchemaAction.DELETE)
  109. .operationName("deleteSchema")
  110. .build();
  111. return accessControlService.validateAccess(context).then(
  112. schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
  113. .doOnEach(sig -> auditService.audit(context, sig))
  114. .thenReturn(ResponseEntity.ok().build())
  115. );
  116. }
  117. @Override
  118. public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
  119. String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
  120. var context = AccessContext.builder()
  121. .cluster(clusterName)
  122. .schema(subjectName)
  123. .schemaActions(SchemaAction.DELETE)
  124. .operationName("deleteSchemaByVersion")
  125. .build();
  126. return accessControlService.validateAccess(context).then(
  127. schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
  128. .doOnEach(sig -> auditService.audit(context, sig))
  129. .thenReturn(ResponseEntity.ok().build())
  130. );
  131. }
  132. @Override
  133. public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
  134. String clusterName, String subjectName, ServerWebExchange exchange) {
  135. var context = AccessContext.builder()
  136. .cluster(clusterName)
  137. .schema(subjectName)
  138. .schemaActions(SchemaAction.VIEW)
  139. .operationName("getAllVersionsBySubject")
  140. .build();
  141. Flux<SchemaSubjectDTO> schemas =
  142. schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
  143. .map(kafkaSrMapper::toDto);
  144. return accessControlService.validateAccess(context)
  145. .thenReturn(ResponseEntity.ok(schemas))
  146. .doOnEach(sig -> auditService.audit(context, sig));
  147. }
  148. @Override
  149. public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
  150. String clusterName, ServerWebExchange exchange) {
  151. return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
  152. .map(c -> new CompatibilityLevelDTO().compatibility(kafkaSrMapper.toDto(c)))
  153. .map(ResponseEntity::ok)
  154. .defaultIfEmpty(ResponseEntity.notFound().build());
  155. }
  156. @Override
  157. public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName,
  158. String subject,
  159. ServerWebExchange exchange) {
  160. var context = AccessContext.builder()
  161. .cluster(clusterName)
  162. .schema(subject)
  163. .schemaActions(SchemaAction.VIEW)
  164. .operationName("getLatestSchema")
  165. .build();
  166. return accessControlService.validateAccess(context).then(
  167. schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
  168. .map(kafkaSrMapper::toDto)
  169. .map(ResponseEntity::ok)
  170. ).doOnEach(sig -> auditService.audit(context, sig));
  171. }
  172. @Override
  173. public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
  174. String clusterName, String subject, Integer version, ServerWebExchange exchange) {
  175. var context = AccessContext.builder()
  176. .cluster(clusterName)
  177. .schema(subject)
  178. .schemaActions(SchemaAction.VIEW)
  179. .operationName("getSchemaByVersion")
  180. .operationParams(Map.of("subject", subject, "version", version))
  181. .build();
  182. return accessControlService.validateAccess(context).then(
  183. schemaRegistryService.getSchemaSubjectByVersion(
  184. getCluster(clusterName), subject, version)
  185. .map(kafkaSrMapper::toDto)
  186. .map(ResponseEntity::ok)
  187. ).doOnEach(sig -> auditService.audit(context, sig));
  188. }
  189. @Override
  190. public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String clusterName,
  191. @Valid Integer pageNum,
  192. @Valid Integer perPage,
  193. @Valid String search,
  194. ServerWebExchange serverWebExchange) {
  195. var context = AccessContext.builder()
  196. .cluster(clusterName)
  197. .operationName("getSchemas")
  198. .build();
  199. return schemaRegistryService
  200. .getAllSubjectNames(getCluster(clusterName))
  201. .flatMapIterable(l -> l)
  202. .filterWhen(schema -> accessControlService.isSchemaAccessible(schema, clusterName))
  203. .collectList()
  204. .flatMap(subjects -> {
  205. int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
  206. int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
  207. List<String> filteredSubjects = subjects
  208. .stream()
  209. .filter(subj -> search == null || StringUtils.containsIgnoreCase(subj, search))
  210. .sorted().toList();
  211. var totalPages = (filteredSubjects.size() / pageSize)
  212. + (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
  213. List<String> subjectsToRender = filteredSubjects.stream()
  214. .skip(subjectToSkip)
  215. .limit(pageSize)
  216. .collect(Collectors.toList());
  217. return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
  218. .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
  219. .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
  220. }).map(ResponseEntity::ok)
  221. .doOnEach(sig -> auditService.audit(context, sig));
  222. }
  223. @Override
  224. public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
  225. String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
  226. ServerWebExchange exchange) {
  227. var context = AccessContext.builder()
  228. .cluster(clusterName)
  229. .schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
  230. .operationName("updateGlobalSchemaCompatibilityLevel")
  231. .build();
  232. return accessControlService.validateAccess(context).then(
  233. compatibilityLevelMono
  234. .flatMap(compatibilityLevelDTO ->
  235. schemaRegistryService.updateGlobalSchemaCompatibility(
  236. getCluster(clusterName),
  237. kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
  238. ))
  239. .doOnEach(sig -> auditService.audit(context, sig))
  240. .thenReturn(ResponseEntity.ok().build())
  241. );
  242. }
  243. @Override
  244. public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
  245. String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
  246. ServerWebExchange exchange) {
  247. var context = AccessContext.builder()
  248. .cluster(clusterName)
  249. .schemaActions(SchemaAction.EDIT)
  250. .operationName("updateSchemaCompatibilityLevel")
  251. .operationParams(Map.of("subject", subject))
  252. .build();
  253. return accessControlService.validateAccess(context).then(
  254. compatibilityLevelMono
  255. .flatMap(compatibilityLevelDTO ->
  256. schemaRegistryService.updateSchemaCompatibility(
  257. getCluster(clusterName),
  258. subject,
  259. kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
  260. ))
  261. .doOnEach(sig -> auditService.audit(context, sig))
  262. .thenReturn(ResponseEntity.ok().build())
  263. );
  264. }
  265. }