SerdesInitializer.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package com.provectus.kafka.ui.serdes;
  2. import com.google.common.annotations.VisibleForTesting;
  3. import com.google.common.base.Preconditions;
  4. import com.google.common.base.Strings;
  5. import com.google.common.collect.ImmutableMap;
  6. import com.provectus.kafka.ui.config.ClustersProperties;
  7. import com.provectus.kafka.ui.config.ClustersProperties.SerdeConfig;
  8. import com.provectus.kafka.ui.exception.ValidationException;
  9. import com.provectus.kafka.ui.serde.api.PropertyResolver;
  10. import com.provectus.kafka.ui.serde.api.Serde;
  11. import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
  12. import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
  13. import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
  14. import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
  15. import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
  16. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  17. import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
  18. import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
  19. import com.provectus.kafka.ui.serdes.builtin.UuidBinarySerde;
  20. import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
  21. import java.util.LinkedHashMap;
  22. import java.util.Map;
  23. import java.util.Optional;
  24. import java.util.regex.Pattern;
  25. import javax.annotation.Nullable;
  26. import lombok.SneakyThrows;
  27. import lombok.extern.slf4j.Slf4j;
  28. import org.springframework.core.env.Environment;
  29. @Slf4j
  30. public class SerdesInitializer {
  31. private final Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses;
  32. private final CustomSerdeLoader customSerdeLoader;
  33. public SerdesInitializer() {
  34. this(
  35. ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
  36. .put(StringSerde.name(), StringSerde.class)
  37. .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
  38. .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
  39. .put(Int32Serde.name(), Int32Serde.class)
  40. .put(Int64Serde.name(), Int64Serde.class)
  41. .put(UInt32Serde.name(), UInt32Serde.class)
  42. .put(UInt64Serde.name(), UInt64Serde.class)
  43. .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
  44. .put(Base64Serde.name(), Base64Serde.class)
  45. .put(UuidBinarySerde.name(), UuidBinarySerde.class)
  46. .build(),
  47. new CustomSerdeLoader()
  48. );
  49. }
  50. @VisibleForTesting
  51. SerdesInitializer(Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses,
  52. CustomSerdeLoader customSerdeLoader) {
  53. this.builtInSerdeClasses = builtInSerdeClasses;
  54. this.customSerdeLoader = customSerdeLoader;
  55. }
  56. /**
  57. * Initialization algorithm:
  58. * First, we iterate over explicitly configured serdes from cluster config:
  59. * > if serde has name = one of built-in serde's names:
  60. * - if serde's properties are empty, we treat it as serde should be
  61. * auto-configured - we try to do that
  62. * - if serde's properties not empty, we treat it as an intention to
  63. * override default configuration, so we configuring it with specific config (calling configure(..))
  64. * <p/>
  65. * > if serde has className = one of built-in serde's classes:
  66. * - initializing it with specific config and with default classloader
  67. * <p/>
  68. * > if serde has custom className != one of built-in serde's classes:
  69. * - initializing it with specific config and with custom classloader (see CustomSerdeLoader)
  70. * <p/>
  71. * Second, we iterate over remaining built-in serdes (that we NOT explicitly configured by config)
  72. * trying to auto-configure them and registering with empty patterns - they will be present
  73. * in Serde selection in UI, but not assigned to any topic k/v.
  74. */
  75. public ClusterSerdes init(Environment env,
  76. ClustersProperties clustersProperties,
  77. int clusterIndex) {
  78. ClustersProperties.Cluster clusterProperties = clustersProperties.getClusters().get(clusterIndex);
  79. log.debug("Configuring serdes for cluster {}", clusterProperties.getName());
  80. var globalPropertiesResolver = new PropertyResolverImpl(env);
  81. var clusterPropertiesResolver = new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex);
  82. Map<String, SerdeInstance> registeredSerdes = new LinkedHashMap<>();
  83. // initializing serdes from config
  84. if (clusterProperties.getSerde() != null) {
  85. for (int i = 0; i < clusterProperties.getSerde().size(); i++) {
  86. SerdeConfig serdeConfig = clusterProperties.getSerde().get(i);
  87. if (Strings.isNullOrEmpty(serdeConfig.getName())) {
  88. throw new ValidationException("'name' property not set for serde: " + serdeConfig);
  89. }
  90. if (registeredSerdes.containsKey(serdeConfig.getName())) {
  91. throw new ValidationException("Multiple serdes with same name: " + serdeConfig.getName());
  92. }
  93. var instance = createSerdeFromConfig(
  94. serdeConfig,
  95. new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex + ".serde." + i + ".properties"),
  96. clusterPropertiesResolver,
  97. globalPropertiesResolver
  98. );
  99. registeredSerdes.put(serdeConfig.getName(), instance);
  100. }
  101. }
  102. // initializing remaining built-in serdes with empty selection patters
  103. builtInSerdeClasses.forEach((name, clazz) -> {
  104. if (!registeredSerdes.containsKey(name)) {
  105. BuiltInSerde serde = createSerdeInstance(clazz);
  106. if (autoConfigureSerde(serde, clusterPropertiesResolver, globalPropertiesResolver)) {
  107. registeredSerdes.put(name, new SerdeInstance(name, serde, null, null, null));
  108. }
  109. }
  110. });
  111. return new ClusterSerdes(
  112. registeredSerdes,
  113. Optional.ofNullable(clusterProperties.getDefaultKeySerde())
  114. .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
  115. .orElse(null),
  116. Optional.ofNullable(clusterProperties.getDefaultValueSerde())
  117. .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))
  118. .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
  119. .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
  120. .orElse(null),
  121. createFallbackSerde()
  122. );
  123. }
  124. private SerdeInstance createFallbackSerde() {
  125. StringSerde serde = new StringSerde();
  126. serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
  127. return new SerdeInstance("Fallback", serde, null, null, null);
  128. }
  129. @SneakyThrows
  130. private SerdeInstance createSerdeFromConfig(SerdeConfig serdeConfig,
  131. PropertyResolver serdeProps,
  132. PropertyResolver clusterProps,
  133. PropertyResolver globalProps) {
  134. if (builtInSerdeClasses.containsKey(serdeConfig.getName())) {
  135. return createSerdeWithBuiltInSerdeName(serdeConfig, serdeProps, clusterProps, globalProps);
  136. }
  137. if (serdeConfig.getClassName() != null) {
  138. var builtInSerdeClass = builtInSerdeClasses.values().stream()
  139. .filter(c -> c.getName().equals(serdeConfig.getClassName()))
  140. .findAny();
  141. // built-in serde type with custom name
  142. if (builtInSerdeClass.isPresent()) {
  143. return createSerdeWithBuiltInClass(builtInSerdeClass.get(), serdeConfig, serdeProps, clusterProps, globalProps);
  144. }
  145. }
  146. log.info("Loading custom serde {}", serdeConfig.getName());
  147. return loadAndInitCustomSerde(serdeConfig, serdeProps, clusterProps, globalProps);
  148. }
  149. private SerdeInstance createSerdeWithBuiltInSerdeName(SerdeConfig serdeConfig,
  150. PropertyResolver serdeProps,
  151. PropertyResolver clusterProps,
  152. PropertyResolver globalProps) {
  153. String name = serdeConfig.getName();
  154. if (serdeConfig.getClassName() != null) {
  155. throw new ValidationException("className can't be set for built-in serde");
  156. }
  157. if (serdeConfig.getFilePath() != null) {
  158. throw new ValidationException("filePath can't be set for built-in serde types");
  159. }
  160. var clazz = builtInSerdeClasses.get(name);
  161. BuiltInSerde serde = createSerdeInstance(clazz);
  162. if (serdeConfig.getProperties() == null || serdeConfig.getProperties().isEmpty()) {
  163. if (!autoConfigureSerde(serde, clusterProps, globalProps)) {
  164. // no properties provided and serde does not support auto-configuration
  165. throw new ValidationException(name + " serde is not configured");
  166. }
  167. } else {
  168. // configuring serde with explicitly set properties
  169. serde.configure(serdeProps, clusterProps, globalProps);
  170. }
  171. return new SerdeInstance(
  172. name,
  173. serde,
  174. nullablePattern(serdeConfig.getTopicKeysPattern()),
  175. nullablePattern(serdeConfig.getTopicValuesPattern()),
  176. null
  177. );
  178. }
  179. private boolean autoConfigureSerde(BuiltInSerde serde, PropertyResolver clusterProps, PropertyResolver globalProps) {
  180. if (serde.canBeAutoConfigured(clusterProps, globalProps)) {
  181. serde.autoConfigure(clusterProps, globalProps);
  182. return true;
  183. }
  184. return false;
  185. }
  186. @SneakyThrows
  187. private SerdeInstance createSerdeWithBuiltInClass(Class<? extends BuiltInSerde> clazz,
  188. SerdeConfig serdeConfig,
  189. PropertyResolver serdeProps,
  190. PropertyResolver clusterProps,
  191. PropertyResolver globalProps) {
  192. if (serdeConfig.getFilePath() != null) {
  193. throw new ValidationException("filePath can't be set for built-in serde type");
  194. }
  195. BuiltInSerde serde = createSerdeInstance(clazz);
  196. serde.configure(serdeProps, clusterProps, globalProps);
  197. return new SerdeInstance(
  198. serdeConfig.getName(),
  199. serde,
  200. nullablePattern(serdeConfig.getTopicKeysPattern()),
  201. nullablePattern(serdeConfig.getTopicValuesPattern()),
  202. null
  203. );
  204. }
  205. @SneakyThrows
  206. private <T extends Serde> T createSerdeInstance(Class<T> clazz) {
  207. return clazz.getDeclaredConstructor().newInstance();
  208. }
  209. private SerdeInstance loadAndInitCustomSerde(SerdeConfig serdeConfig,
  210. PropertyResolver serdeProps,
  211. PropertyResolver clusterProps,
  212. PropertyResolver globalProps) {
  213. if (Strings.isNullOrEmpty(serdeConfig.getClassName())) {
  214. throw new ValidationException(
  215. "'className' property not set for custom serde " + serdeConfig.getName());
  216. }
  217. if (Strings.isNullOrEmpty(serdeConfig.getFilePath())) {
  218. throw new ValidationException(
  219. "'filePath' property not set for custom serde " + serdeConfig.getName());
  220. }
  221. var loaded = customSerdeLoader.loadAndConfigure(
  222. serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
  223. return new SerdeInstance(
  224. serdeConfig.getName(),
  225. loaded.getSerde(),
  226. nullablePattern(serdeConfig.getTopicKeysPattern()),
  227. nullablePattern(serdeConfig.getTopicValuesPattern()),
  228. loaded.getClassLoader()
  229. );
  230. }
  231. @Nullable
  232. private Pattern nullablePattern(@Nullable String pattern) {
  233. return pattern == null ? null : Pattern.compile(pattern);
  234. }
  235. }