KafkaServicesValidation.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package com.provectus.kafka.ui.util;
  2. import static com.provectus.kafka.ui.config.ClustersProperties.TruststoreConfig;
  3. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  4. import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
  5. import com.provectus.kafka.ui.service.ReactiveAdminClient;
  6. import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
  7. import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
  8. import java.io.FileInputStream;
  9. import java.security.KeyStore;
  10. import java.util.Map;
  11. import java.util.Optional;
  12. import java.util.Properties;
  13. import java.util.function.Supplier;
  14. import javax.annotation.Nullable;
  15. import javax.net.ssl.TrustManagerFactory;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.apache.kafka.clients.admin.AdminClient;
  18. import org.apache.kafka.clients.admin.AdminClientConfig;
  19. import org.springframework.util.ResourceUtils;
  20. import reactor.core.publisher.Flux;
  21. import reactor.core.publisher.Mono;
  22. @Slf4j
  23. public final class KafkaServicesValidation {
  24. private KafkaServicesValidation() {
  25. }
  26. private static Mono<ApplicationPropertyValidationDTO> valid() {
  27. return Mono.just(new ApplicationPropertyValidationDTO().error(false));
  28. }
  29. private static Mono<ApplicationPropertyValidationDTO> invalid(String errorMsg) {
  30. return Mono.just(new ApplicationPropertyValidationDTO().error(true).errorMessage(errorMsg));
  31. }
  32. private static Mono<ApplicationPropertyValidationDTO> invalid(Throwable th) {
  33. return Mono.just(new ApplicationPropertyValidationDTO().error(true).errorMessage(th.getMessage()));
  34. }
  35. /**
  36. * Returns error msg, if any.
  37. */
  38. public static Optional<String> validateTruststore(TruststoreConfig truststoreConfig) {
  39. if (truststoreConfig.getTruststoreLocation() != null && truststoreConfig.getTruststorePassword() != null) {
  40. try {
  41. KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
  42. trustStore.load(
  43. new FileInputStream((ResourceUtils.getFile(truststoreConfig.getTruststoreLocation()))),
  44. truststoreConfig.getTruststorePassword().toCharArray()
  45. );
  46. TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
  47. TrustManagerFactory.getDefaultAlgorithm()
  48. );
  49. trustManagerFactory.init(trustStore);
  50. } catch (Exception e) {
  51. return Optional.of(e.getMessage());
  52. }
  53. }
  54. return Optional.empty();
  55. }
  56. public static Mono<ApplicationPropertyValidationDTO> validateClusterConnection(String bootstrapServers,
  57. Properties clusterProps,
  58. @Nullable
  59. TruststoreConfig ssl) {
  60. Properties properties = new Properties();
  61. SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
  62. properties.putAll(clusterProps);
  63. properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  64. // editing properties to make validation faster
  65. properties.put(AdminClientConfig.RETRIES_CONFIG, 1);
  66. properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000);
  67. properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5_000);
  68. properties.put(AdminClientConfig.CLIENT_ID_CONFIG, "kui-admin-client-validation-" + System.currentTimeMillis());
  69. AdminClient adminClient = null;
  70. try {
  71. adminClient = AdminClient.create(properties);
  72. } catch (Exception e) {
  73. log.error("Error creating admin client during validation", e);
  74. return invalid("Error while creating AdminClient. See logs for details.");
  75. }
  76. return Mono.just(adminClient)
  77. .then(ReactiveAdminClient.toMono(adminClient.listTopics().names()))
  78. .then(valid())
  79. .doOnTerminate(adminClient::close)
  80. .onErrorResume(th -> {
  81. log.error("Error connecting to cluster", th);
  82. return KafkaServicesValidation.invalid("Error connecting to cluster. See logs for details.");
  83. });
  84. }
  85. public static Mono<ApplicationPropertyValidationDTO> validateSchemaRegistry(
  86. Supplier<ReactiveFailover<KafkaSrClientApi>> clientSupplier) {
  87. ReactiveFailover<KafkaSrClientApi> client;
  88. try {
  89. client = clientSupplier.get();
  90. } catch (Exception e) {
  91. log.error("Error creating Schema Registry client", e);
  92. return invalid("Error creating Schema Registry client: " + e.getMessage());
  93. }
  94. return client
  95. .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
  96. .then(valid())
  97. .onErrorResume(KafkaServicesValidation::invalid);
  98. }
  99. public static Mono<ApplicationPropertyValidationDTO> validateConnect(
  100. Supplier<ReactiveFailover<KafkaConnectClientApi>> clientSupplier) {
  101. ReactiveFailover<KafkaConnectClientApi> client;
  102. try {
  103. client = clientSupplier.get();
  104. } catch (Exception e) {
  105. log.error("Error creating Connect client", e);
  106. return invalid("Error creating Connect client: " + e.getMessage());
  107. }
  108. return client.flux(KafkaConnectClientApi::getConnectorPlugins)
  109. .collectList()
  110. .then(valid())
  111. .onErrorResume(KafkaServicesValidation::invalid);
  112. }
  113. public static Mono<ApplicationPropertyValidationDTO> validateKsql(
  114. Supplier<ReactiveFailover<KsqlApiClient>> clientSupplier) {
  115. ReactiveFailover<KsqlApiClient> client;
  116. try {
  117. client = clientSupplier.get();
  118. } catch (Exception e) {
  119. log.error("Error creating Ksql client", e);
  120. return invalid("Error creating Ksql client: " + e.getMessage());
  121. }
  122. return client.flux(c -> c.execute("SHOW VARIABLES;", Map.of()))
  123. .collectList()
  124. .flatMap(ksqlResults ->
  125. Flux.fromIterable(ksqlResults)
  126. .filter(KsqlApiClient.KsqlResponseTable::isError)
  127. .flatMap(err -> invalid("Error response from ksql: " + err))
  128. .next()
  129. .switchIfEmpty(valid())
  130. )
  131. .onErrorResume(KafkaServicesValidation::invalid);
  132. }
  133. }