SchemasController.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.SchemasApi;
  3. import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
  4. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  5. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  6. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  7. import com.provectus.kafka.ui.service.SchemaRegistryService;
  8. import javax.validation.Valid;
  9. import lombok.RequiredArgsConstructor;
  10. import lombok.extern.log4j.Log4j2;
  11. import org.springframework.http.ResponseEntity;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import org.springframework.web.server.ServerWebExchange;
  14. import reactor.core.publisher.Flux;
  15. import reactor.core.publisher.Mono;
  16. @RestController
  17. @RequiredArgsConstructor
  18. @Log4j2
  19. public class SchemasController implements SchemasApi {
  20. private final SchemaRegistryService schemaRegistryService;
  21. @Override
  22. public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
  23. String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
  24. ServerWebExchange exchange) {
  25. return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
  26. .map(ResponseEntity::ok);
  27. }
  28. @Override
  29. public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
  30. String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubject,
  31. ServerWebExchange exchange) {
  32. return schemaRegistryService
  33. .registerNewSchema(clusterName, newSchemaSubject)
  34. .map(ResponseEntity::ok);
  35. }
  36. @Override
  37. public Mono<ResponseEntity<Void>> deleteLatestSchema(
  38. String clusterName, String subject, ServerWebExchange exchange) {
  39. return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
  40. }
  41. @Override
  42. public Mono<ResponseEntity<Void>> deleteSchema(
  43. String clusterName, String subjectName, ServerWebExchange exchange) {
  44. return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName);
  45. }
  46. @Override
  47. public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
  48. String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
  49. return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version);
  50. }
  51. @Override
  52. public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
  53. String clusterName, String subjectName, ServerWebExchange exchange) {
  54. Flux<SchemaSubjectDTO> schemas =
  55. schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
  56. return Mono.just(ResponseEntity.ok(schemas));
  57. }
  58. @Override
  59. public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
  60. String clusterName, ServerWebExchange exchange) {
  61. return schemaRegistryService.getGlobalSchemaCompatibilityLevel(clusterName)
  62. .map(ResponseEntity::ok)
  63. .defaultIfEmpty(ResponseEntity.notFound().build());
  64. }
  65. @Override
  66. public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName, String subject,
  67. ServerWebExchange exchange) {
  68. return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject)
  69. .map(ResponseEntity::ok);
  70. }
  71. @Override
  72. public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
  73. String clusterName, String subject, Integer version, ServerWebExchange exchange) {
  74. return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version)
  75. .map(ResponseEntity::ok);
  76. }
  77. @Override
  78. public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getSchemas(String clusterName,
  79. ServerWebExchange exchange) {
  80. Flux<SchemaSubjectDTO> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
  81. return Mono.just(ResponseEntity.ok(subjects));
  82. }
  83. @Override
  84. public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
  85. String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
  86. ServerWebExchange exchange) {
  87. log.info("Updating schema compatibility globally");
  88. return schemaRegistryService.updateSchemaCompatibility(clusterName, compatibilityLevel)
  89. .map(ResponseEntity::ok);
  90. }
  91. @Override
  92. public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
  93. String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevel,
  94. ServerWebExchange exchange) {
  95. log.info("Updating schema compatibility for subject: {}", subject);
  96. return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
  97. .map(ResponseEntity::ok);
  98. }
  99. }