SchemaRegistryService.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. package com.provectus.kafka.ui.service;
  2. import static org.springframework.http.HttpStatus.CONFLICT;
  3. import static org.springframework.http.HttpStatus.NOT_FOUND;
  4. import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
  5. import com.provectus.kafka.ui.exception.SchemaCompatibilityException;
  6. import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException;
  7. import com.provectus.kafka.ui.exception.SchemaNotFoundException;
  8. import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException;
  9. import com.provectus.kafka.ui.exception.UnprocessableEntityException;
  10. import com.provectus.kafka.ui.exception.ValidationException;
  11. import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
  12. import com.provectus.kafka.ui.model.InternalSchemaRegistry;
  13. import com.provectus.kafka.ui.model.KafkaCluster;
  14. import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
  15. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  16. import com.provectus.kafka.ui.model.SchemaTypeDTO;
  17. import com.provectus.kafka.ui.model.schemaregistry.ErrorResponse;
  18. import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
  19. import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
  20. import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
  21. import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
  22. import com.provectus.kafka.ui.util.SecuredWebClient;
  23. import java.io.IOException;
  24. import java.net.URI;
  25. import java.util.Arrays;
  26. import java.util.Collections;
  27. import java.util.Formatter;
  28. import java.util.List;
  29. import java.util.Objects;
  30. import java.util.Optional;
  31. import java.util.function.Function;
  32. import java.util.function.Supplier;
  33. import java.util.stream.Collectors;
  34. import lombok.RequiredArgsConstructor;
  35. import lombok.extern.slf4j.Slf4j;
  36. import org.jetbrains.annotations.NotNull;
  37. import org.jetbrains.annotations.Nullable;
  38. import org.springframework.http.HttpHeaders;
  39. import org.springframework.http.HttpMethod;
  40. import org.springframework.http.HttpStatus;
  41. import org.springframework.http.MediaType;
  42. import org.springframework.stereotype.Service;
  43. import org.springframework.util.LinkedMultiValueMap;
  44. import org.springframework.util.MultiValueMap;
  45. import org.springframework.web.reactive.function.BodyInserters;
  46. import org.springframework.web.reactive.function.client.ClientResponse;
  47. import org.springframework.web.reactive.function.client.WebClient;
  48. import org.springframework.web.reactive.function.client.WebClientRequestException;
  49. import org.springframework.web.util.UriComponentsBuilder;
  50. import reactor.core.publisher.Flux;
  51. import reactor.core.publisher.Mono;
  52. @Service
  53. @Slf4j
  54. @RequiredArgsConstructor
  55. public class SchemaRegistryService {
  56. public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
  57. public static final String NO_SUCH_SCHEMA = "No such schema %s";
  58. private static final String URL_SUBJECTS = "/subjects";
  59. private static final String URL_SUBJECT = "/subjects/{schemaName}";
  60. private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
  61. private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
  62. private static final String LATEST = "latest";
  63. private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
  64. private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
  65. private static final String INVALID_SCHEMA = "Invalid Schema";
  66. public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
  67. List<String> subjects) {
  68. return Flux.fromIterable(subjects)
  69. .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
  70. .collect(Collectors.toList());
  71. }
  72. public Mono<String[]> getAllSubjectNames(KafkaCluster cluster) {
  73. return configuredWebClient(
  74. cluster,
  75. HttpMethod.GET,
  76. URL_SUBJECTS)
  77. .retrieve()
  78. .bodyToMono(String[].class)
  79. .doOnError(e -> log.error("Unexpected error", e))
  80. .as(m -> failoverAble(m,
  81. new FailoverMono<>(cluster.getSchemaRegistry(), () -> this.getAllSubjectNames(cluster))));
  82. }
  83. public Flux<SchemaSubjectDTO> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
  84. Flux<Integer> versions = getSubjectVersions(cluster, subject);
  85. return versions.flatMap(version -> getSchemaSubjectByVersion(cluster, subject, version));
  86. }
  87. private Flux<Integer> getSubjectVersions(KafkaCluster cluster, String schemaName) {
  88. return configuredWebClient(
  89. cluster,
  90. HttpMethod.GET,
  91. URL_SUBJECT_VERSIONS,
  92. schemaName)
  93. .retrieve()
  94. .onStatus(NOT_FOUND::equals,
  95. throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
  96. .bodyToFlux(Integer.class)
  97. .as(f -> failoverAble(f, new FailoverFlux<>(cluster.getSchemaRegistry(),
  98. () -> this.getSubjectVersions(cluster, schemaName))));
  99. }
  100. public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(KafkaCluster cluster, String schemaName,
  101. Integer version) {
  102. return this.getSchemaSubject(cluster, schemaName, String.valueOf(version));
  103. }
  104. public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(KafkaCluster cluster,
  105. String schemaName) {
  106. return this.getSchemaSubject(cluster, schemaName, LATEST);
  107. }
  108. private Mono<SchemaSubjectDTO> getSchemaSubject(KafkaCluster cluster, String schemaName,
  109. String version) {
  110. return configuredWebClient(
  111. cluster,
  112. HttpMethod.GET,
  113. SchemaRegistryService.URL_SUBJECT_BY_VERSION,
  114. List.of(schemaName, version))
  115. .retrieve()
  116. .onStatus(NOT_FOUND::equals,
  117. throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
  118. )
  119. .bodyToMono(SchemaSubjectDTO.class)
  120. .map(this::withSchemaType)
  121. .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
  122. .map(tuple -> {
  123. SchemaSubjectDTO schema = tuple.getT1();
  124. String compatibilityLevel = tuple.getT2().getCompatibilityLevel();
  125. schema.setCompatibilityLevel(compatibilityLevel);
  126. return schema;
  127. })
  128. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  129. () -> this.getSchemaSubject(cluster, schemaName, version))));
  130. }
  131. /**
  132. * If {@link SchemaSubjectDTO#getSchemaType()} is null, then AVRO, otherwise,
  133. * adds the schema type as is.
  134. */
  135. @NotNull
  136. private SchemaSubjectDTO withSchemaType(SchemaSubjectDTO s) {
  137. return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaTypeDTO.AVRO));
  138. }
  139. public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster,
  140. String schemaName,
  141. Integer version) {
  142. return this.deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
  143. }
  144. public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster,
  145. String schemaName) {
  146. return this.deleteSchemaSubject(cluster, schemaName, LATEST);
  147. }
  148. private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
  149. String version) {
  150. return configuredWebClient(
  151. cluster,
  152. HttpMethod.DELETE,
  153. SchemaRegistryService.URL_SUBJECT_BY_VERSION,
  154. List.of(schemaName, version))
  155. .retrieve()
  156. .onStatus(NOT_FOUND::equals,
  157. throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
  158. )
  159. .toBodilessEntity()
  160. .then()
  161. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  162. () -> this.deleteSchemaSubject(cluster, schemaName, version))));
  163. }
  164. public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
  165. String schemaName) {
  166. return configuredWebClient(
  167. cluster,
  168. HttpMethod.DELETE,
  169. URL_SUBJECT,
  170. schemaName)
  171. .retrieve()
  172. .onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
  173. .toBodilessEntity()
  174. .then()
  175. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  176. () -> this.deleteSchemaSubjectEntirely(cluster, schemaName))));
  177. }
  178. /**
  179. * Checks whether the provided schema duplicates the previous or not, creates a new schema
  180. * and then returns the whole content by requesting its latest version.
  181. */
  182. public Mono<SchemaSubjectDTO> registerNewSchema(KafkaCluster cluster,
  183. NewSchemaSubjectDTO dto) {
  184. SchemaTypeDTO schemaType = SchemaTypeDTO.AVRO == dto.getSchemaType() ? null : dto.getSchemaType();
  185. Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(dto.getSchema(), schemaType));
  186. String subject = dto.getSubject();
  187. return submitNewSchema(subject, newSchema, cluster)
  188. .flatMap(resp -> getLatestSchemaVersionBySubject(cluster, subject));
  189. }
  190. @NotNull
  191. private Mono<SubjectIdResponse> submitNewSchema(String subject,
  192. Mono<InternalNewSchema> newSchemaSubject,
  193. KafkaCluster cluster) {
  194. return configuredWebClient(
  195. cluster,
  196. HttpMethod.POST,
  197. URL_SUBJECT_VERSIONS, subject)
  198. .contentType(MediaType.APPLICATION_JSON)
  199. .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
  200. .retrieve()
  201. .onStatus(status -> UNPROCESSABLE_ENTITY.equals(status) || CONFLICT.equals(status),
  202. r -> r.bodyToMono(ErrorResponse.class)
  203. .flatMap(this::getMonoError))
  204. .bodyToMono(SubjectIdResponse.class)
  205. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  206. () -> submitNewSchema(subject, newSchemaSubject, cluster))));
  207. }
  208. @NotNull
  209. private Mono<Throwable> getMonoError(ErrorResponse x) {
  210. if (isUnrecognizedFieldSchemaTypeMessage(x.getMessage())) {
  211. return Mono.error(new SchemaTypeNotSupportedException());
  212. } else if (isIncompatibleSchemaMessage(x.getMessage())) {
  213. return Mono.error(new SchemaCompatibilityException(x.getMessage()));
  214. } else {
  215. log.error(x.getMessage());
  216. return Mono.error(new UnprocessableEntityException(INVALID_SCHEMA));
  217. }
  218. }
  219. @NotNull
  220. private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(
  221. String formatted) {
  222. return resp -> Mono.error(new SchemaNotFoundException(formatted));
  223. }
  224. /**
  225. * Updates a compatibility level for a <code>schemaName</code>.
  226. *
  227. * @param schemaName is a schema subject name
  228. * @see com.provectus.kafka.ui.model.CompatibilityLevelDTO.CompatibilityEnum
  229. */
  230. public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster, @Nullable String schemaName,
  231. Mono<CompatibilityLevelDTO> compatibilityLevel) {
  232. String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
  233. return configuredWebClient(
  234. cluster,
  235. HttpMethod.PUT,
  236. configEndpoint,
  237. schemaName)
  238. .contentType(MediaType.APPLICATION_JSON)
  239. .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
  240. .retrieve()
  241. .onStatus(NOT_FOUND::equals,
  242. throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
  243. .bodyToMono(Void.class)
  244. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  245. () -> this.updateSchemaCompatibility(cluster, schemaName, compatibilityLevel))));
  246. }
  247. public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
  248. Mono<CompatibilityLevelDTO> compatibilityLevel) {
  249. return updateSchemaCompatibility(cluster, null, compatibilityLevel);
  250. }
  251. public Mono<InternalCompatibilityLevel> getSchemaCompatibilityLevel(KafkaCluster cluster,
  252. String schemaName) {
  253. String globalConfig = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
  254. final var values = new LinkedMultiValueMap<String, String>();
  255. values.add("defaultToGlobal", "true");
  256. return configuredWebClient(
  257. cluster,
  258. HttpMethod.GET,
  259. globalConfig,
  260. (schemaName == null ? Collections.emptyList() : List.of(schemaName)),
  261. values)
  262. .retrieve()
  263. .bodyToMono(InternalCompatibilityLevel.class)
  264. .onErrorResume(error -> Mono.empty());
  265. }
  266. public Mono<InternalCompatibilityLevel> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
  267. return this.getSchemaCompatibilityLevel(cluster, null);
  268. }
  269. private Mono<InternalCompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
  270. String schemaName) {
  271. return this.getSchemaCompatibilityLevel(cluster, schemaName)
  272. .switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
  273. }
  274. public Mono<InternalCompatibilityCheck> checksSchemaCompatibility(
  275. KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
  276. return configuredWebClient(
  277. cluster,
  278. HttpMethod.POST,
  279. "/compatibility/subjects/{schemaName}/versions/latest",
  280. schemaName)
  281. .contentType(MediaType.APPLICATION_JSON)
  282. .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
  283. .retrieve()
  284. .onStatus(NOT_FOUND::equals,
  285. throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
  286. .bodyToMono(InternalCompatibilityCheck.class)
  287. .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
  288. () -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
  289. }
  290. public String formatted(String str, Object... args) {
  291. try (Formatter formatter = new Formatter()) {
  292. return formatter.format(str, args).toString();
  293. }
  294. }
  295. private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders headers) {
  296. if (schemaRegistry.getUsername() != null && schemaRegistry.getPassword() != null) {
  297. headers.setBasicAuth(
  298. schemaRegistry.getUsername(),
  299. schemaRegistry.getPassword()
  300. );
  301. } else if (schemaRegistry.getUsername() != null) {
  302. throw new ValidationException(
  303. "You specified username but did not specify password");
  304. } else if (schemaRegistry.getPassword() != null) {
  305. throw new ValidationException(
  306. "You specified password but did not specify username");
  307. }
  308. }
  309. private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
  310. return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
  311. }
  312. private boolean isIncompatibleSchemaMessage(String message) {
  313. return message.contains(INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA);
  314. }
  315. private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
  316. String uri) {
  317. return configuredWebClient(cluster, method, uri, Collections.emptyList(),
  318. new LinkedMultiValueMap<>());
  319. }
  320. private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
  321. String uri, List<String> uriVariables) {
  322. return configuredWebClient(cluster, method, uri, uriVariables, new LinkedMultiValueMap<>());
  323. }
  324. private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
  325. String uri, @Nullable String uriVariable) {
  326. List<String> uriVariables = uriVariable == null ? Collections.emptyList() : List.of(uriVariable);
  327. return configuredWebClient(cluster, method, uri, uriVariables, new LinkedMultiValueMap<>());
  328. }
  329. private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster,
  330. HttpMethod method, String path,
  331. List<String> uriVariables,
  332. MultiValueMap<String, String> queryParams) {
  333. final var schemaRegistry = cluster.getSchemaRegistry();
  334. try {
  335. WebClient.Builder schemaRegistryWebClient = SecuredWebClient.configure(
  336. schemaRegistry.getKeystoreLocation(),
  337. schemaRegistry.getKeystorePassword(),
  338. schemaRegistry.getTruststoreLocation(),
  339. schemaRegistry.getTruststorePassword()
  340. );
  341. return schemaRegistryWebClient.build()
  342. .method(method)
  343. .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
  344. .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
  345. } catch (Exception e) {
  346. throw new IllegalStateException(
  347. "cannot create TLS configuration for schema-registry in cluster " + cluster.getName(), e);
  348. }
  349. }
  350. private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List<String> uriVariables,
  351. MultiValueMap<String, String> queryParams) {
  352. final var builder = UriComponentsBuilder
  353. .fromHttpUrl(schemaRegistry.getUri() + path);
  354. builder.queryParams(queryParams);
  355. return builder.build(uriVariables.toArray());
  356. }
  357. private Function<ClientResponse, Mono<? extends Throwable>> errorOnSchemaDeleteFailure(String schemaName) {
  358. return resp -> {
  359. if (NOT_FOUND.equals(resp.statusCode())) {
  360. return Mono.error(new SchemaNotFoundException(schemaName));
  361. }
  362. return Mono.error(new SchemaFailedToDeleteException(schemaName));
  363. };
  364. }
  365. private <T> Mono<T> failoverAble(Mono<T> request, FailoverMono<T> failoverMethod) {
  366. return request.onErrorResume(failoverMethod::failover);
  367. }
  368. private <T> Flux<T> failoverAble(Flux<T> request, FailoverFlux<T> failoverMethod) {
  369. return request.onErrorResume(failoverMethod::failover);
  370. }
  371. private abstract static class Failover<E> {
  372. private final InternalSchemaRegistry schemaRegistry;
  373. private final Supplier<E> failover;
  374. private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failover) {
  375. this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
  376. this.failover = Objects.requireNonNull(failover);
  377. }
  378. abstract E error(Throwable error);
  379. public E failover(Throwable error) {
  380. if (error instanceof WebClientRequestException
  381. && error.getCause() instanceof IOException
  382. && schemaRegistry.isFailoverAvailable()) {
  383. var uri = ((WebClientRequestException) error).getUri();
  384. schemaRegistry.markAsUnavailable(String.format("%s://%s", uri.getScheme(), uri.getAuthority()));
  385. return failover.get();
  386. }
  387. return error(error);
  388. }
  389. }
  390. private static class FailoverMono<T> extends Failover<Mono<T>> {
  391. private FailoverMono(InternalSchemaRegistry schemaRegistry, Supplier<Mono<T>> failover) {
  392. super(schemaRegistry, failover);
  393. }
  394. @Override
  395. Mono<T> error(Throwable error) {
  396. return Mono.error(error);
  397. }
  398. }
  399. private static class FailoverFlux<T> extends Failover<Flux<T>> {
  400. private FailoverFlux(InternalSchemaRegistry schemaRegistry, Supplier<Flux<T>> failover) {
  401. super(schemaRegistry, failover);
  402. }
  403. @Override
  404. Flux<T> error(Throwable error) {
  405. return Flux.error(error);
  406. }
  407. }
  408. }