// SchemaRegistryService.java
package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.exception.SchemaCompatibilityException;
import com.provectus.kafka.ui.exception.SchemaNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.Compatibility;
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.sr.model.CompatibilityConfig;
import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
  29. @Service
  30. @Slf4j
  31. @RequiredArgsConstructor
  32. public class SchemaRegistryService {
  33. private static final String LATEST = "latest";
  34. @AllArgsConstructor
  35. public static class SubjectWithCompatibilityLevel {
  36. @Delegate
  37. SchemaSubject subject;
  38. @Getter
  39. Compatibility compatibility;
  40. }
  41. private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
  42. return cluster.getSchemaRegistryClient();
  43. }
  44. public Mono<List<SubjectWithCompatibilityLevel>> getAllLatestVersionSchemas(KafkaCluster cluster,
  45. List<String> subjects) {
  46. return Flux.fromIterable(subjects)
  47. .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
  48. .collect(Collectors.toList());
  49. }
  50. public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
  51. return api(cluster)
  52. .mono(c -> c.getAllSubjectNames(null, false))
  53. .flatMapIterable(this::parseSubjectListString)
  54. .collectList();
  55. }
  56. @SneakyThrows
  57. private List<String> parseSubjectListString(String subjectNamesStr) {
  58. //workaround for https://github.com/spring-projects/spring-framework/issues/24734
  59. return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
  60. });
  61. }
  62. public Flux<SubjectWithCompatibilityLevel> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
  63. Flux<Integer> versions = getSubjectVersions(cluster, subject);
  64. return versions.flatMap(version -> getSchemaSubjectByVersion(cluster, subject, version));
  65. }
  66. private Flux<Integer> getSubjectVersions(KafkaCluster cluster, String schemaName) {
  67. return api(cluster).flux(c -> c.getSubjectVersions(schemaName));
  68. }
  69. public Mono<SubjectWithCompatibilityLevel> getSchemaSubjectByVersion(KafkaCluster cluster,
  70. String schemaName,
  71. Integer version) {
  72. return getSchemaSubject(cluster, schemaName, String.valueOf(version));
  73. }
  74. public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(KafkaCluster cluster,
  75. String schemaName) {
  76. return getSchemaSubject(cluster, schemaName, LATEST);
  77. }
  78. private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
  79. String version) {
  80. return api(cluster)
  81. .mono(c -> c.getSubjectVersion(schemaName, version))
  82. .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
  83. .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
  84. .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
  85. }
  86. public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster, String schemaName, Integer version) {
  87. return deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
  88. }
  89. public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster, String schemaName) {
  90. return deleteSchemaSubject(cluster, schemaName, LATEST);
  91. }
  92. private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName, String version) {
  93. return api(cluster).mono(c -> c.deleteSubjectVersion(schemaName, version, false));
  94. }
  95. public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster, String schemaName) {
  96. return api(cluster).mono(c -> c.deleteAllSubjectVersions(schemaName, false));
  97. }
  98. /**
  99. * Checks whether the provided schema duplicates the previous or not, creates a new schema
  100. * and then returns the whole content by requesting its latest version.
  101. */
  102. public Mono<SubjectWithCompatibilityLevel> registerNewSchema(KafkaCluster cluster,
  103. String subject,
  104. NewSubject newSchemaSubject) {
  105. return api(cluster)
  106. .mono(c -> c.registerNewSchema(subject, newSchemaSubject))
  107. .onErrorMap(WebClientResponseException.Conflict.class,
  108. th -> new SchemaCompatibilityException())
  109. .onErrorMap(WebClientResponseException.UnprocessableEntity.class,
  110. th -> new ValidationException("Invalid schema"))
  111. .then(getLatestSchemaVersionBySubject(cluster, subject));
  112. }
  113. public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
  114. String schemaName,
  115. Compatibility compatibility) {
  116. return api(cluster)
  117. .mono(c -> c.updateSubjectCompatibilityLevel(
  118. schemaName, new CompatibilityLevelChange().compatibility(compatibility)))
  119. .then();
  120. }
  121. public Mono<Void> updateGlobalSchemaCompatibility(KafkaCluster cluster,
  122. Compatibility compatibility) {
  123. return api(cluster)
  124. .mono(c -> c.updateGlobalCompatibilityLevel(new CompatibilityLevelChange().compatibility(compatibility)))
  125. .then();
  126. }
  127. public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
  128. String schemaName) {
  129. return api(cluster)
  130. .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
  131. .map(CompatibilityConfig::getCompatibilityLevel)
  132. .onErrorResume(error -> Mono.empty());
  133. }
  134. public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
  135. return api(cluster)
  136. .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
  137. .map(CompatibilityConfig::getCompatibilityLevel);
  138. }
  139. private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
  140. String schemaName) {
  141. return getSchemaCompatibilityLevel(cluster, schemaName)
  142. .switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
  143. }
  144. public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster cluster,
  145. String schemaName,
  146. NewSubject newSchemaSubject) {
  147. return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
  148. }
  149. }